This is page 1 of 2. Use http://codebase.md/opensensor/bn_cline_mcp?lines=true&page={x} to view the full context.
# Directory Structure
```
├── .gitignore
├── .idea
│ ├── editor.xml
│ ├── vcs.xml
│ └── workspace.xml
├── binaryninja_http_client.py
├── binaryninja_http_server.py
├── binaryninja_mcp_client.py
├── binaryninja_mcp_http_server.py
├── binaryninja_server.py
├── binaryninja-mcp-bridge.js
├── bridge
│ ├── bn_mcp_bridge_http.py
│ ├── bn_mcp_bridge_stdio.py
│ ├── Pipfile
│ ├── Pipfile.lock
│ └── requirements.txt
├── client.ts
├── cline-tool.json
├── example.py
├── examples
│ └── binary_analysis.ts
├── LICENSE
├── package-lock.json
├── package.json
├── README_EXTENDED.md
├── README.md
├── run_analysis.sh
├── run-bridge.sh
├── run.js
├── test_pagination.py
└── tsconfig.json
```
# Files
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
1 | __pycache__/
2 | node_modules/
3 | .venv
4 |
```
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
1 | [](https://mseep.ai/app/opensensor-bn-cline-mcp)
2 |
3 | # binary_ninja_cline_mcp
4 | An MCP server for Cline that works with Binary Ninja (Personal License)
5 |
6 | This repository contains an MCP server that allows Cline to analyze binaries using Binary Ninja.
7 | Note: Not all files in this repository are used. There is also a prototype that drives headless
8 | Binary Ninja, but my license is Personal, so I can't test it.
9 |
10 | ## Setup
11 |
12 | 1. Install the latest version of the Binary Ninja MCP plugin: https://github.com/fosdickio/binary_ninja_mcp
13 | 2. Open your binary and start the MCP server from within Binary Ninja.
14 | 3. Open a terminal and run `python binaryninja_mcp_http_server.py --port 8088`
15 | 4. Open another terminal and run `npm start`
16 | 5. Open Cline and add the following tool:
17 | Example:
18 | ```
19 | {
20 | "mcpServers": {
21 | "BN MCP": {
22 | "command": "node",
23 | "args": ["/home/matteius/binary_ninja_cline/bn_cline_mcp/binaryninja-mcp-bridge.js"],
24 | "env": {
25 | "BN_HTTP_SERVER": "http://localhost:8088"
26 | },
27 | "autoApprove": [],
28 | "disabled": false,
29 | "timeout": 30
30 | }
31 | }
32 | }
33 |
34 | ```
```
--------------------------------------------------------------------------------
/bridge/requirements.txt:
--------------------------------------------------------------------------------
```
1 | anthropic>=0.49.0
2 | fastmcp>=2.0.0
3 | requests>=2.32.3
4 |
```
--------------------------------------------------------------------------------
/.idea/vcs.xml:
--------------------------------------------------------------------------------
```
1 | <?xml version="1.0" encoding="UTF-8"?>
2 | <project version="4">
3 | <component name="VcsDirectoryMappings">
4 | <mapping directory="" vcs="Git" />
5 | </component>
6 | </project>
```
--------------------------------------------------------------------------------
/run-bridge.sh:
--------------------------------------------------------------------------------
```bash
#!/bin/bash
# Launch the Binary Ninja MCP bridge for Cline.
# Save this as /home/matteius/Documents/Cline/MCP/bn-mcp/run-bridge.sh

# URL of the HTTP server the bridge forwards requests to.
export BN_HTTP_SERVER=http://localhost:8088

# Abort if the install directory is missing; otherwise node would be started
# from whatever directory the caller happened to be in and fail confusingly.
cd /home/matteius/Documents/Cline/MCP/bn-mcp/ || {
    echo "Error: cannot cd to /home/matteius/Documents/Cline/MCP/bn-mcp/" >&2
    exit 1
}

# exec replaces the shell so signals sent by the MCP host reach node directly.
exec /usr/bin/node binaryninja-mcp-bridge.js
6 |
```
--------------------------------------------------------------------------------
/cline-tool.json:
--------------------------------------------------------------------------------
```json
1 | {
2 | "name": "Binary Ninja MCP",
3 | "description": "Analyze binaries via Binary Ninja MCP server",
4 | "entry": "/home/matteius/Documents/Cline/MCP/bn-mcp/binaryninja_server.py",
5 | "language": "json",
6 | "tool": {
7 | "type": "mcp"
8 | }
9 | }
10 |
```
--------------------------------------------------------------------------------
/tsconfig.json:
--------------------------------------------------------------------------------
```json
1 | {
2 | "compilerOptions": {
3 | "target": "ES2020",
4 | "module": "commonjs",
5 | "lib": ["ES2020", "DOM"],
6 | "outDir": "./dist",
7 | "rootDir": "./",
8 | "strict": true,
9 | "esModuleInterop": true,
10 | "skipLibCheck": true,
11 | "forceConsistentCasingInFileNames": true,
12 | "resolveJsonModule": true
13 | },
14 | "include": [
15 | "*.ts"
16 | ],
17 | "exclude": [
18 | "node_modules",
19 | "dist"
20 | ]
21 | }
22 |
```
--------------------------------------------------------------------------------
/package.json:
--------------------------------------------------------------------------------
```json
1 | {
2 | "name": "binaryninja-mcp-bridge",
3 | "version": "0.1.0",
4 | "description": "Bridge between MCP and Binary Ninja HTTP Server",
5 | "type": "module",
6 | "main": "binaryninja-mcp-bridge.js",
7 | "scripts": {
8 | "start": "node binaryninja-mcp-bridge.js"
9 | },
10 | "author": "",
11 | "license": "MIT",
12 | "dependencies": {
13 | "@modelcontextprotocol/sdk": "^1.0.2",
14 | "node-fetch": "^3.3.2",
15 | "zod": "^3.22.4",
16 | "zod-to-json-schema": "^3.22.3"
17 | },
18 | "devDependencies": {
19 | "@types/node": "^22.14.0"
20 | }
21 | }
22 |
```
--------------------------------------------------------------------------------
/run_analysis.sh:
--------------------------------------------------------------------------------
```bash
#!/bin/bash
# Script to run the binary analysis example.
#
# Usage: run_analysis.sh <path_to_binary> [output_dir]
#   path_to_binary  binary to analyze (must exist)
#   output_dir      where results are written (default: ./analysis_output)
#
# Exits with the status of the analysis command, or 1 on usage/setup errors.

# Check if a binary path was provided
if [ $# -lt 1 ]; then
    echo "Usage: $0 <path_to_binary> [output_dir]"
    exit 1
fi

BINARY_PATH="$1"
OUTPUT_DIR="${2:-./analysis_output}"

# Check if the binary exists
if [ ! -f "$BINARY_PATH" ]; then
    echo "Error: Binary file '$BINARY_PATH' not found"
    exit 1
fi

# Create the output directory if it doesn't exist
mkdir -p "$OUTPUT_DIR" || exit 1

echo "Running binary analysis on '$BINARY_PATH'..."
echo "Results will be saved to '$OUTPUT_DIR'"
echo

# Resolve both paths to absolute form BEFORE changing directory below;
# otherwise relative arguments would be interpreted relative to the script
# directory instead of the caller's working directory.
BINARY_PATH="$(cd "$(dirname "$BINARY_PATH")" && pwd)/$(basename "$BINARY_PATH")"
OUTPUT_DIR="$(cd "$OUTPUT_DIR" && pwd)"

# Make sure we're in the right directory (the one containing this script)
cd "$(dirname "$0")" || exit 1

# Run the TypeScript example using ts-node
npx ts-node examples/binary_analysis.ts "$BINARY_PATH" "$OUTPUT_DIR"
STATUS=$?

# Report the outcome and propagate the exit status to the caller
echo
if [ "$STATUS" -eq 0 ]; then
    echo "Analysis completed successfully!"
    echo "Results are available in '$OUTPUT_DIR'"
else
    echo "Analysis failed. Please check the error messages above."
fi
exit "$STATUS"
42 |
```
--------------------------------------------------------------------------------
/test_pagination.py:
--------------------------------------------------------------------------------
```python
1 | #!/usr/bin/env python3
2 | """
3 | Test script to verify that pagination is working correctly in the Binary Ninja HTTP client.
4 | This script will load a binary file and retrieve all functions, demonstrating that
5 | pagination is working correctly by retrieving more than the default limit of 100 functions.
6 | """
7 |
8 | import sys
9 | import json
10 | from binaryninja_http_client import BinaryNinjaHTTPClient
11 |
def main():
    """Exercise the paginated endpoints of the Binary Ninja HTTP client.

    Reads the binary path from ``sys.argv[1]``, pings the server, loads the
    binary when the ping does not report one as loaded, then fetches and
    summarizes functions, imports, exports, sections and defined data.
    Exits with status 1 on a usage error, a failed connection or a failed load.
    """
    if len(sys.argv) < 2:
        print(f"Usage: {sys.argv[0]} <path_to_binary>")
        sys.exit(1)

    binary_path = sys.argv[1]
    client = BinaryNinjaHTTPClient()

    # Test the connection
    ping_result = client.ping()
    print(f"Connection status: {ping_result['status']}")
    connected = ping_result['status'] == 'connected'
    if not connected:
        print(f"Error: {ping_result.get('error', 'Unknown error')}")
        sys.exit(1)

    # Load the binary if not already loaded
    already_loaded = ping_result.get('loaded', False)
    if not already_loaded:
        try:
            print(f"Loading binary: {binary_path}")
            load_result = client.load_binary(binary_path)
            print(f"Load result: {json.dumps(load_result, indent=2)}")
        except Exception as e:
            print(f"Error loading binary: {e}")
            sys.exit(1)

    # Get all functions
    print("\nRetrieving all functions...")
    functions = client.list_functions()
    total = len(functions)
    print(f"Retrieved {total} functions in total")

    # Print the first 5 and last 5 functions to verify pagination is working
    if functions:
        print("\nFirst 5 functions:")
        for position, func in enumerate(functions[:5], start=1):
            print(f"{position}. {func['name']} at {func.get('address', 'unknown')}")

        if total > 10:
            print("\nLast 5 functions:")
            # Number the tail entries by their 1-based position in the full list.
            for position, func in enumerate(functions[-5:], start=total - 4):
                print(f"{position}. {func['name']} at {func.get('address', 'unknown')}")

    # Test other paginated methods: (client method name, noun used in output)
    paginated_checks = (
        ("get_imports", "imports"),
        ("get_exports", "exports"),
        ("get_sections", "segments"),
        ("get_defined_data", "data items"),
    )
    for method_name, noun in paginated_checks:
        print(f"\nRetrieving all {noun}...")
        items = getattr(client, method_name)()
        print(f"Retrieved {len(items)} {noun} in total")

    print("\nTest completed successfully!")


if __name__ == "__main__":
    main()
75 |
```
--------------------------------------------------------------------------------
/run.js:
--------------------------------------------------------------------------------
```javascript
1 | #!/usr/bin/env node
2 | /**
3 | * Binary Ninja MCP Bridge Runner
4 | *
5 | * This script helps start all the necessary components for the Binary Ninja MCP setup.
6 | * It can start the HTTP server, the MCP bridge, or both.
7 | */
8 |
9 | const { spawn } = require('child_process');
10 | const path = require('path');
11 | const fs = require('fs');
12 |
13 | // Parse command line arguments
14 | const args = process.argv.slice(2);
15 | const command = args[0] || 'all'; // Default to 'all'
16 |
17 | // Configuration
18 | const HTTP_SERVER_PORT = 8088;
19 | const HTTP_SERVER_HOST = '127.0.0.1';
20 |
21 | // Helper function to run a command and pipe its output
/**
 * Spawn a child process that shares this process's stdio and log its lifecycle.
 *
 * @param {string} cmd - Executable to run.
 * @param {string[]} args - Arguments passed to the executable.
 * @param {string} name - Human-readable label used in log messages.
 * @returns {import('child_process').ChildProcess} The spawned process.
 */
function runCommand(cmd, args, name) {
  console.log(`Starting ${name}...`);

  // No `shell: true`: with a shell the args array is joined unescaped, so a
  // script path containing spaces or shell metacharacters would be split or
  // interpreted. Passing args directly keeps each one intact.
  const proc = spawn(cmd, args, {
    stdio: 'inherit'
  });

  proc.on('error', (err) => {
    console.error(`Error starting ${name}: ${err.message}`);
  });

  proc.on('close', (code, signal) => {
    if (code === 0) {
      console.log(`${name} stopped`);
    } else if (code === null) {
      // A null exit code means the process was terminated by a signal.
      console.error(`${name} was terminated by signal ${signal}`);
    } else {
      console.error(`${name} exited with code ${code}`);
    }
  });

  return proc;
}
44 |
45 | // Start the HTTP server
/**
 * Launch binaryninja_mcp_http_server.py (located next to this script) with
 * python3, bound to the configured host and port. Exits the process with
 * status 1 when the script file is missing.
 *
 * @returns The child process returned by runCommand.
 */
function startHttpServer() {
  const scriptFile = path.join(__dirname, 'binaryninja_mcp_http_server.py');

  const scriptPresent = fs.existsSync(scriptFile);
  if (!scriptPresent) {
    console.error(`HTTP server script not found at ${scriptFile}`);
    process.exit(1);
  }

  const cliArgs = [
    scriptFile,
    '--host', HTTP_SERVER_HOST,
    '--port', HTTP_SERVER_PORT.toString()
  ];
  return runCommand('python3', cliArgs, 'HTTP Server');
}
60 |
61 | // Start the MCP bridge
/**
 * Launch binaryninja-mcp-bridge.js (located next to this script) with node.
 * Sets BN_HTTP_SERVER in this process's environment to the configured
 * host/port before spawning. Exits with status 1 when the bridge script is
 * missing.
 *
 * @returns The child process returned by runCommand.
 */
function startMcpBridge() {
  const bridgeFile = path.join(__dirname, 'binaryninja-mcp-bridge.js');

  const bridgePresent = fs.existsSync(bridgeFile);
  if (!bridgePresent) {
    console.error(`MCP bridge script not found at ${bridgeFile}`);
    process.exit(1);
  }

  // Set the environment variable for the HTTP server URL
  const serverUrl = `http://${HTTP_SERVER_HOST}:${HTTP_SERVER_PORT}`;
  process.env.BN_HTTP_SERVER = serverUrl;

  const cliArgs = [bridgeFile];
  return runCommand('node', cliArgs, 'MCP Bridge');
}
75 |
76 | // Print usage information
/**
 * Print the command-line help text to stdout.
 */
function printUsage() {
  // Leading and trailing empty entries reproduce the blank line before and
  // after the help text.
  const usageLines = [
    '',
    'Usage: node run.js [command]',
    '',
    'Commands:',
    'http Start only the HTTP server',
    'bridge Start only the MCP bridge',
    'all Start both the HTTP server and MCP bridge (default)',
    'help Show this help message',
    '',
    'Example:',
    'node run.js all',
    ''
  ];
  console.log(usageLines.join('\n'));
}
91 |
92 | // Main function
/**
 * Dispatch on the (case-insensitive) command given on the command line.
 * Unknown commands print an error plus the usage text and exit with status 1.
 */
function main() {
  const actions = new Map([
    ['http', () => {
      startHttpServer();
    }],
    ['bridge', () => {
      startMcpBridge();
    }],
    ['all', () => {
      startHttpServer();
      // Wait 2 seconds for the HTTP server to start
      setTimeout(() => {
        startMcpBridge();
      }, 2000);
    }],
    ['help', () => {
      printUsage();
    }]
  ]);

  const action = actions.get(command.toLowerCase());
  if (action !== undefined) {
    action();
  } else {
    console.error(`Unknown command: ${command}`);
    printUsage();
    process.exit(1);
  }
}

// Run the main function
main();
123 |
```
--------------------------------------------------------------------------------
/.idea/workspace.xml:
--------------------------------------------------------------------------------
```
1 | <?xml version="1.0" encoding="UTF-8"?>
2 | <project version="4">
3 | <component name="AutoImportSettings">
4 | <option name="autoReloadType" value="SELECTIVE" />
5 | </component>
6 | <component name="BackendCodeEditorMiscSettings">
7 | <option name="/Default/RiderDebugger/RiderRestoreDecompile/RestoreDecompileSetting/@EntryValue" value="false" type="bool" />
8 | <option name="/Default/Housekeeping/GlobalSettingsUpgraded/IsUpgraded/@EntryValue" value="true" type="bool" />
9 | <option name="/Default/Housekeeping/FeatureSuggestion/FeatureSuggestionManager/DisabledSuggesters/=SwitchToGoToActionSuggester/@EntryIndexedValue" value="true" type="bool" />
10 | <option name="/Default/Environment/Hierarchy/GeneratedFilesCacheKey/Timestamp/@EntryValue" value="7" type="long" />
11 | <option name="/Default/Housekeeping/FeatureSuggestion/FeatureSuggestionManager/DisabledSuggesters/=SwitchToGoToActionSuggester/@EntryIndexRemoved" />
12 | </component>
13 | <component name="CMakeProjectFlavorService">
14 | <option name="flavorId" value="CMakePlainProjectFlavor" />
15 | </component>
16 | <component name="CMakeSettings">
17 | <configurations>
18 | <configuration PROFILE_NAME="Debug" ENABLED="true" CONFIG_NAME="Debug" />
19 | </configurations>
20 | </component>
21 | <component name="ChangeListManager">
22 | <list default="true" id="670a9151-2f3f-4278-bef4-d2416d369e62" name="Changes" comment="" />
23 | <option name="SHOW_DIALOG" value="false" />
24 | <option name="HIGHLIGHT_CONFLICTS" value="true" />
25 | <option name="HIGHLIGHT_NON_ACTIVE_CHANGELIST" value="false" />
26 | <option name="LAST_RESOLUTION" value="IGNORE" />
27 | </component>
28 | <component name="ClangdSettings">
29 | <option name="formatViaClangd" value="false" />
30 | </component>
31 | <component name="Git.Settings">
32 | <option name="RECENT_GIT_ROOT_PATH" value="$PROJECT_DIR$" />
33 | </component>
34 | <component name="ProjectColorInfo">{
35 | "associatedIndex": 3
36 | }</component>
37 | <component name="ProjectId" id="2vCdF1tV8kT5ok6lkq0D4zlu2P2" />
38 | <component name="ProjectViewState">
39 | <option name="hideEmptyMiddlePackages" value="true" />
40 | <option name="showLibraryContents" value="true" />
41 | </component>
42 | <component name="PropertiesComponent">{
43 | "keyToString": {
44 | "RunOnceActivity.RadMigrateCodeStyle": "true",
45 | "RunOnceActivity.ShowReadmeOnStart": "true",
46 | "RunOnceActivity.cidr.known.project.marker": "true",
47 | "RunOnceActivity.git.unshallow": "true",
48 | "RunOnceActivity.readMode.enableVisualFormatting": "true",
49 | "cf.first.check.clang-format": "false",
50 | "cidr.known.project.marker": "true",
51 | "git-widget-placeholder": "main",
52 | "last_opened_file_path": "/home/matteius/bn_cline_mcp",
53 | "node.js.detected.package.eslint": "true",
54 | "node.js.detected.package.tslint": "true",
55 | "node.js.selected.package.eslint": "(autodetect)",
56 | "node.js.selected.package.tslint": "(autodetect)",
57 | "nodejs_package_manager_path": "npm",
58 | "vue.rearranger.settings.migration": "true"
59 | }
60 | }</component>
61 | <component name="SpellCheckerSettings" RuntimeDictionaries="0" Folders="0" CustomDictionaries="0" DefaultDictionary="application-level" UseSingleDictionary="true" transferred="true" />
62 | <component name="TaskManager">
63 | <task active="true" id="Default" summary="Default task">
64 | <changelist id="670a9151-2f3f-4278-bef4-d2416d369e62" name="Changes" comment="" />
65 | <created>1743652882529</created>
66 | <option name="number" value="Default" />
67 | <option name="presentableId" value="Default" />
68 | <updated>1743652882529</updated>
69 | <workItem from="1743652883573" duration="5236000" />
70 | <workItem from="1743696826637" duration="600000" />
71 | <workItem from="1743702948433" duration="7000" />
72 | </task>
73 | <servers />
74 | </component>
75 | <component name="TypeScriptGeneratedFilesManager">
76 | <option name="version" value="3" />
77 | </component>
78 | </project>
```
--------------------------------------------------------------------------------
/bridge/bn_mcp_bridge_stdio.py:
--------------------------------------------------------------------------------
```python
1 | from mcp.server.fastmcp import FastMCP
2 | import requests
3 |
4 |
5 | binja_server_url = "http://localhost:9009"
6 | mcp = FastMCP("binja-mcp")
7 |
8 |
def safe_get(endpoint: str, params: dict | None = None) -> list:
    """
    Perform a GET request against the Binary Ninja server.

    Args:
        endpoint: Path appended to ``binja_server_url``.
        params: Optional query parameters.

    Returns:
        The response body split into lines on success. On an HTTP error or a
        failed request, a single-element list holding an error message; this
        function never raises.
    """
    url = f"{binja_server_url}/{endpoint}"

    try:
        # Let requests build the query string so keys and values are
        # percent-encoded; hand-joining "k=v" pairs corrupts values that
        # contain spaces, '&', '#', '+' and similar characters.
        response = requests.get(url, params=params or None, timeout=5)
        response.encoding = "utf-8"
        if response.ok:
            return response.text.splitlines()
        return [f"Error {response.status_code}: {response.text.strip()}"]
    except Exception as e:
        # Deliberately broad: callers expect an error line, not an exception.
        return [f"Request failed: {str(e)}"]
30 |
31 |
def safe_post(endpoint: str, data: dict | str) -> str:
    """
    POST ``data`` to ``endpoint`` on the Binary Ninja server.

    A dict is handed to requests unchanged; any other value is UTF-8 encoded
    and sent as the raw body. Returns the stripped response text on success,
    or an error message string on HTTP failure or any exception.
    """
    try:
        target = f"{binja_server_url}/{endpoint}"
        payload = data if isinstance(data, dict) else data.encode("utf-8")
        reply = requests.post(target, data=payload, timeout=5)
        reply.encoding = "utf-8"
        if not reply.ok:
            return f"Error {reply.status_code}: {reply.text.strip()}"
        return reply.text.strip()
    except Exception as exc:
        return f"Request failed: {str(exc)}"
49 |
50 |
@mcp.tool()
def list_methods(offset: int = 0, limit: int = 100) -> list:
    """
    Return one page of function names from the program.

    Queries the ``methods`` endpoint with ``offset`` and ``limit``.
    """
    page = {"offset": offset, "limit": limit}
    return safe_get("methods", page)
57 |
58 |
@mcp.tool()
def list_classes(offset: int = 0, limit: int = 100) -> list:
    """
    Return one page of namespace/class names from the program.

    Queries the ``classes`` endpoint with ``offset`` and ``limit``.
    """
    page = {"offset": offset, "limit": limit}
    return safe_get("classes", page)
65 |
66 |
@mcp.tool()
def decompile_function(name: str) -> str:
    """
    Decompile the function called ``name`` and return the resulting C code.

    The name is posted as the raw request body to the ``decompile`` endpoint.
    """
    decompiled = safe_post("decompile", name)
    return decompiled
73 |
74 |
@mcp.tool()
def rename_function(old_name: str, new_name: str) -> str:
    """
    Give the function currently named ``old_name`` the name ``new_name``.

    Posts both names to the ``renameFunction`` endpoint and returns its reply.
    """
    form = {"oldName": old_name, "newName": new_name}
    return safe_post("renameFunction", form)
81 |
82 |
@mcp.tool()
def rename_data(address: str, new_name: str) -> str:
    """
    Give the data label at ``address`` the name ``new_name``.

    Posts the address and name to the ``renameData`` endpoint and returns its
    reply.
    """
    form = {"address": address, "newName": new_name}
    return safe_post("renameData", form)
89 |
90 |
@mcp.tool()
def list_segments(offset: int = 0, limit: int = 100) -> list:
    """
    Return one page of the program's memory segments.

    Queries the ``segments`` endpoint with ``offset`` and ``limit``.
    """
    page = {"offset": offset, "limit": limit}
    return safe_get("segments", page)
97 |
98 |
@mcp.tool()
def list_imports(offset: int = 0, limit: int = 100) -> list:
    """
    Return one page of the program's imported symbols.

    Queries the ``imports`` endpoint with ``offset`` and ``limit``.
    """
    page = {"offset": offset, "limit": limit}
    return safe_get("imports", page)
105 |
106 |
@mcp.tool()
def list_exports(offset: int = 0, limit: int = 100) -> list:
    """
    Return one page of the program's exported functions/symbols.

    Queries the ``exports`` endpoint with ``offset`` and ``limit``.
    """
    page = {"offset": offset, "limit": limit}
    return safe_get("exports", page)
113 |
114 |
@mcp.tool()
def list_namespaces(offset: int = 0, limit: int = 100) -> list:
    """
    Return one page of the program's non-global namespaces.

    Queries the ``namespaces`` endpoint with ``offset`` and ``limit``.
    """
    page = {"offset": offset, "limit": limit}
    return safe_get("namespaces", page)
121 |
122 |
@mcp.tool()
def list_data_items(offset: int = 0, limit: int = 100) -> list:
    """
    Return one page of defined data labels and their values.

    Queries the ``data`` endpoint with ``offset`` and ``limit``.
    """
    page = {"offset": offset, "limit": limit}
    return safe_get("data", page)
129 |
130 |
@mcp.tool()
def search_functions_by_name(query: str, offset: int = 0, limit: int = 100) -> list:
    """
    Return one page of functions whose name contains ``query``.

    An empty query is rejected locally with a one-element error list;
    otherwise the ``searchFunctions`` endpoint is queried.
    """
    if query:
        criteria = {"query": query, "offset": offset, "limit": limit}
        return safe_get("searchFunctions", criteria)
    return ["Error: query string is required"]
141 |
142 |
@mcp.tool()
def get_binary_status() -> str:
    """
    Get the current status of the loaded binary.

    Returns the first line of the ``status`` endpoint's response, or an error
    message when the response body is empty.
    """
    lines = safe_get("status")
    # safe_get returns [] for an empty body ("".splitlines() == []), so
    # indexing [0] unconditionally would raise IndexError.
    if not lines:
        return "Error: empty response from status endpoint"
    return lines[0]
149 |
150 |
if __name__ == "__main__":
    import sys

    # Log to stderr, not stdout: when the server runs over the stdio transport
    # (FastMCP's default for run(), as this file's name suggests), stdout
    # carries the JSON-RPC stream and any stray text there corrupts it.
    print("Starting MCP bridge service...", file=sys.stderr)
    mcp.run()
154 |
```
--------------------------------------------------------------------------------
/README_EXTENDED.md:
--------------------------------------------------------------------------------
```markdown
1 | # Extended Binary Ninja MCP Client
2 |
3 | This project extends the Binary Ninja MCP client to provide more comprehensive binary analysis capabilities through the Model Context Protocol (MCP).
4 |
5 | ## Features
6 |
7 | The extended client adds the following capabilities:
8 |
9 | ### Basic Binary Analysis
10 | - Get binary metadata
11 | - List functions, sections, imports, exports
12 | - Get detailed function information
13 | - Disassemble and decompile functions
14 | - Search for functions by name
15 | - List C++ namespaces
16 | - List and analyze data variables
17 |
18 | ### Advanced Analysis
19 | - Generate comprehensive analysis reports
20 | - Find potential vulnerabilities in binaries
21 | - Compare two binaries to identify differences
22 | - Rename functions and data variables
23 |
24 | ## Setup
25 |
26 | 1. Install dependencies:
27 | ```bash
28 | cd bn_cline_mcp
29 | npm install
30 | npm install --save-dev @types/node
31 | ```
32 |
33 | 2. Make sure Binary Ninja is running with the MCP plugin installed.
34 |
35 | ## Usage
36 |
37 | ### TypeScript Client
38 |
39 | The TypeScript client provides a simple API for interacting with Binary Ninja:
40 |
41 | ```typescript
42 | import { BinaryNinjaClient } from './client';
43 |
44 | async function main() {
45 | const client = new BinaryNinjaClient();
46 |
47 | try {
48 | // Start the server
49 | await client.start('/path/to/binaryninja_server.py');
50 |
51 | // Load a binary file
52 | const binaryPath = '/path/to/binary';
53 |
54 | // Get binary information
55 | const info = await client.getBinaryInfo(binaryPath);
56 | console.log(info);
57 |
58 | // List functions
59 | const functions = await client.listFunctions(binaryPath);
60 | console.log(functions);
61 |
62 | // Decompile a function
63 | const decompiled = await client.decompileFunction(binaryPath, functions[0]);
64 | console.log(decompiled);
65 |
66 | // Generate a comprehensive analysis report
67 | const report = await client.analyzeFile(binaryPath, 'report.json');
68 | console.log(`Found ${report.function_count} functions`);
69 |
70 | // Find potential vulnerabilities
71 | const vulnerabilities = await client.findVulnerabilities(binaryPath);
72 | console.log(`Found ${vulnerabilities.length} potential vulnerabilities`);
73 |
74 | } catch (err) {
75 | console.error('Error:', err);
76 | } finally {
77 | // Stop the server
78 | client.stop();
79 | }
80 | }
81 |
82 | main().catch(console.error);
83 | ```
84 |
85 | ### Example Scripts
86 |
87 | The `examples` directory contains example scripts that demonstrate how to use the extended client:
88 |
89 | - `binary_analysis.ts`: Demonstrates comprehensive binary analysis capabilities
90 |
91 | To run an example:
92 |
93 | ```bash
94 | cd bn_cline_mcp
95 | npx ts-node examples/binary_analysis.ts /path/to/binary [output_dir]
96 | ```
97 |
98 | ## API Reference
99 |
100 | ### Basic Operations
101 |
102 | - `getBinaryInfo(path: string)`: Get information about a binary file
103 | - `listFunctions(path: string)`: List all functions in a binary file
104 | - `getFunction(path: string, functionName: string)`: Get detailed information about a specific function
105 | - `disassembleFunction(path: string, functionName: string)`: Disassemble a function
106 | - `decompileFunction(path: string, functionName: string)`: Decompile a function to C code
107 | - `listSections(path: string)`: List all sections/segments in a binary file
108 | - `listImports(path: string)`: List all imported functions
109 | - `listExports(path: string)`: List all exported symbols
110 | - `listNamespaces(path: string)`: List all C++ namespaces
111 | - `listData(path: string)`: List all defined data variables
112 | - `searchFunctions(path: string, query: string)`: Search for functions by name
113 | - `renameFunction(path: string, oldName: string, newName: string)`: Rename a function
114 | - `renameData(path: string, address: string, newName: string)`: Rename a data variable
115 |
116 | ### Advanced Analysis
117 |
118 | - `analyzeFile(path: string, outputPath?: string)`: Generate a comprehensive analysis report
119 | - `findVulnerabilities(path: string)`: Find potential vulnerabilities in a binary file
120 | - `compareBinaries(path1: string, path2: string)`: Compare two binary files and identify differences
121 |
122 | ## Extending the Client
123 |
124 | You can extend the client by adding new methods to the `BinaryNinjaClient` class in `client.ts`. If you need to add new server-side functionality, you'll need to:
125 |
126 | 1. Add a new tool definition to the `MCP_TOOLS` array in `binaryninja_mcp_http_server.py`
127 | 2. Implement the handler for the new tool in the `_handle_mcp_request` method
128 | 3. Add a corresponding method to the `BinaryNinjaClient` class in `client.ts`
129 |
130 | ## Troubleshooting
131 |
132 | - Make sure Binary Ninja is running and the MCP plugin is installed
133 | - Check that the server path in `client.start()` is correct
134 | - If you get TypeScript errors, make sure you've installed the required dependencies with `npm install --save-dev @types/node`
135 |
136 | ## Handling Timeouts
137 |
138 | The client includes built-in retry logic to handle occasional timeouts that may occur when communicating with the Binary Ninja MCP server. By default, each request will:
139 |
140 | - Timeout after 30 seconds if no response is received
141 | - Automatically retry up to 3 times with a 1-second delay between attempts
142 | - Log detailed error information for debugging
143 |
144 | You can customize the retry behavior when making requests:
145 |
146 | ```typescript
147 | // Custom retry options
148 | const result = await client.sendRequest('some_method', { param: 'value' }, {
149 | maxRetries: 5, // Retry up to 5 times
150 | retryDelay: 2000 // Wait 2 seconds between retries
151 | });
152 | ```
153 |
154 | This makes the client more robust when dealing with large binaries or complex analysis tasks that might occasionally cause timeouts.
155 |
```
--------------------------------------------------------------------------------
/binaryninja-mcp-bridge.js:
--------------------------------------------------------------------------------
```javascript
1 | #!/usr/bin/env node
2 | import { Server } from "@modelcontextprotocol/sdk/server/index.js";
3 | import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
4 | import {
5 | CallToolRequestSchema,
6 | ListToolsRequestSchema,
7 | } from "@modelcontextprotocol/sdk/types.js";
8 | import { z } from 'zod';
9 | import { zodToJsonSchema } from 'zod-to-json-schema';
10 | import fetch from 'node-fetch';
11 |
12 | // Define the version
13 | const VERSION = "0.1.0";
14 |
15 | // Configuration
16 | const BN_HTTP_SERVER = process.env.BN_HTTP_SERVER || 'http://localhost:8088';
17 |
18 | // Schema definitions
19 | const FilePathSchema = z.object({
20 | path: z.string().min(1, "File path cannot be empty")
21 | });
22 |
23 | const FunctionSchema = z.object({
24 | path: z.string().min(1, "File path cannot be empty"),
25 | function: z.string().min(1, "Function name cannot be empty")
26 | });
27 |
28 | // Create a server instance
29 | const server = new Server(
30 | {
31 | name: "binaryninja-mcp-server",
32 | version: VERSION,
33 | },
34 | {
35 | capabilities: {
36 | tools: {},
37 | },
38 | }
39 | );
40 |
41 | // HTTP client for Binary Ninja server
/**
 * Send one JSON-RPC 2.0 request to the Binary Ninja HTTP server.
 *
 * @param {string} method - JSON-RPC method name.
 * @param {object} [params={}] - JSON-RPC params object.
 * @returns {Promise<object>} The parsed JSON-RPC response envelope.
 * @throws {Error} On a non-2xx HTTP status or when the response carries an
 *   `error` member; the failure is logged to stderr and rethrown.
 */
async function callBinaryNinjaServer(method, params = {}) {
  try {
    console.error(`[INFO] Calling Binary Ninja server method: ${method}`);
    console.error(`[INFO] Params: ${JSON.stringify(params)}`);

    // Request id is the current timestamp rendered as a string.
    const envelope = {
      jsonrpc: "2.0",
      id: Date.now().toString(),
      method,
      params
    };
    const requestInit = {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(envelope)
    };

    const httpReply = await fetch(`${BN_HTTP_SERVER}`, requestInit);
    if (!httpReply.ok) {
      const errorText = await httpReply.text();
      throw new Error(`HTTP error ${httpReply.status}: ${errorText}`);
    }

    const payload = await httpReply.json();
    if (payload.error) {
      throw new Error(`Binary Ninja server error: ${JSON.stringify(payload.error)}`);
    }
    return payload;
  } catch (error) {
    console.error(`[ERROR] Failed to call Binary Ninja server: ${error.message}`);
    throw error;
  }
}
77 |
78 | // Register the ListTools handler
// Register the ListTools handler: advertises the four tools this bridge
// exposes, each with a JSON Schema generated from its zod input schema.
server.setRequestHandler(ListToolsRequestSchema, async () => {
  console.error("[INFO] Received ListTools request");

  // [tool name, description, zod schema for its arguments]
  const catalog = [
    ["get_binary_info", "Get binary metadata", FilePathSchema],
    ["list_functions", "List functions in a binary", FilePathSchema],
    ["disassemble_function", "Disassemble a function from a binary", FunctionSchema],
    ["decompile_function", "Decompile a function to C", FunctionSchema]
  ];

  const tools = catalog.map(([name, description, schema]) => ({
    name,
    description,
    inputSchema: zodToJsonSchema(schema)
  }));

  return { tools };
});
106 |
// Register the CallTool handler
/**
 * Answers CallTool requests.
 *
 * The tool arguments are validated against the zod schema registered for the
 * tool, forwarded to the Binary Ninja server under the same method name, and
 * the reply is wrapped as a single MCP text content item.
 *
 * Throws when arguments are missing, the tool is unknown, validation fails
 * (re-thrown as `Error("Invalid input: ...")`), or the server call fails.
 */
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  try {
    const toolName = request.params.name;
    console.error(`[INFO] Received CallTool request for tool: ${toolName}`);

    if (!request.params.arguments) {
      throw new Error("Arguments are required");
    }
    const toolArgs = request.params.arguments;

    // Log the arguments for debugging
    console.error(`[DEBUG] Tool arguments: ${JSON.stringify(toolArgs)}`);

    // Some callers send 'file' instead of 'path'; accept it as an alias.
    if (toolArgs.file !== undefined && toolArgs.path === undefined) {
      console.error(`[INFO] Converting 'file' parameter to 'path'`);
      toolArgs.path = toolArgs.file;
    }

    // Tool name -> schema validating its arguments. A Map (rather than a
    // plain object) keeps names such as "constructor" from matching
    // inherited properties.
    const schemaByTool = new Map([
      ["get_binary_info", FilePathSchema],
      ["list_functions", FilePathSchema],
      ["disassemble_function", FunctionSchema],
      ["decompile_function", FunctionSchema],
    ]);

    const schema = schemaByTool.get(toolName);
    if (!schema) {
      throw new Error(`Unknown tool: ${toolName}`);
    }

    // Validation and the server call are logged separately so that a server
    // failure is no longer reported as an argument-parsing failure.
    let validatedArgs;
    try {
      validatedArgs = schema.parse(toolArgs);
    } catch (error) {
      console.error(`[ERROR] Failed to parse arguments for ${toolName}: ${error.message}`);
      console.error(`[ERROR] Arguments received: ${JSON.stringify(toolArgs)}`);
      throw error;
    }

    let result;
    try {
      result = await callBinaryNinjaServer(toolName, validatedArgs);
    } catch (error) {
      console.error(`[ERROR] Binary Ninja server call failed for ${toolName}: ${error.message}`);
      throw error;
    }

    // Prefer the text payload of the reply; otherwise fall back to the whole
    // reply as pretty-printed JSON. `result ?? null` keeps the text a string
    // even when the server returned nothing.
    const content =
      result?.result?.content?.[0]?.text || JSON.stringify(result ?? null, null, 2);

    return {
      content: [{ type: "text", text: content }],
    };
  } catch (error) {
    if (error instanceof z.ZodError) {
      console.error(`[ERROR] Validation error: ${JSON.stringify(error.errors, null, 2)}`);
      throw new Error(`Invalid input: ${JSON.stringify(error.errors)}`);
    }
    console.error(`[ERROR] Unexpected error: ${error.message}`);
    throw error;
  }
});
196 |
// Run the server

/** Log a fatal error under the given context and terminate with status 1. */
function exitWithFatal(context, error) {
  console.error(`[FATAL] ${context}: ${error.message}`);
  process.exit(1);
}

/**
 * Connect the MCP server to a stdio transport. Any failure while starting is
 * logged as fatal and ends the process.
 */
async function runServer() {
  try {
    console.error(`[INFO] Starting Binary Ninja MCP Server (connecting to ${BN_HTTP_SERVER})...`);
    const stdioTransport = new StdioServerTransport();
    await server.connect(stdioTransport);
    console.error("[INFO] Binary Ninja MCP Server running on stdio");
  } catch (startupError) {
    exitWithFatal("Failed to start server", startupError);
  }
}

runServer().catch((mainError) => exitWithFatal("Unhandled error in main", mainError));
214 |
```
--------------------------------------------------------------------------------
/examples/binary_analysis.ts:
--------------------------------------------------------------------------------
```typescript
1 | /**
2 | * Binary Analysis Example
3 | *
4 | * This example demonstrates how to use the extended Binary Ninja MCP client
5 | * to perform advanced binary analysis tasks.
6 | */
7 |
8 | import { BinaryNinjaClient } from '../client';
9 | import * as path from 'path';
10 | import * as fs from 'fs';
11 |
/**
 * Run the example analysis against the binary named on the command line.
 *
 * Usage: ts-node binary_analysis.ts <path_to_binary> [output_dir]
 *
 * Starts the Binary Ninja MCP server through BinaryNinjaClient, runs each
 * analysis step, prints a short summary and writes the JSON / C artifacts to
 * the output directory (default: ./analysis_output). Errors raised by the
 * analysis are logged and reported through a non-zero exit status; the client
 * is always stopped.
 */
async function main(): Promise<void> {
  if (process.argv.length < 3) {
    console.error('Usage: ts-node binary_analysis.ts <path_to_binary> [output_dir]');
    process.exit(1);
  }

  const binaryPath = process.argv[2];
  const outputDir = process.argv.length > 3 ? process.argv[3] : './analysis_output';

  // `recursive: true` is a no-op when the directory already exists, so no
  // separate existence check is needed.
  fs.mkdirSync(outputDir, { recursive: true });

  // The server script sits one level above this examples/ directory. The
  // BINARYNINJA_SERVER_PATH environment variable overrides that location.
  const serverScript =
    process.env.BINARYNINJA_SERVER_PATH ?? path.resolve(__dirname, '..', 'binaryninja_server.py');

  const client = new BinaryNinjaClient();

  try {
    // Start the server
    console.log('Starting Binary Ninja MCP server...');
    await client.start(serverScript);
    console.log('Connected to Binary Ninja MCP server');

    // 1. Generate a comprehensive analysis report
    console.log('\n=== Generating Comprehensive Analysis Report ===');
    const reportPath = path.join(outputDir, 'analysis_report.json');
    const report = await client.analyzeFile(binaryPath, reportPath);
    console.log(`Analysis report generated and saved to ${reportPath}`);
    console.log(`Found ${report.function_count} functions, ${report.imports.length} imports, ${report.exports.length} exports`);

    // 2. Find potential vulnerabilities
    console.log('\n=== Scanning for Potential Vulnerabilities ===');
    const vulnerabilities = await client.findVulnerabilities(binaryPath);
    const vulnPath = path.join(outputDir, 'vulnerabilities.json');
    fs.writeFileSync(vulnPath, JSON.stringify(vulnerabilities, null, 2));
    console.log(`Found ${vulnerabilities.length} potential vulnerabilities`);
    console.log(`Vulnerability report saved to ${vulnPath}`);

    if (vulnerabilities.length > 0) {
      console.log('\nTop potential vulnerabilities:');
      for (let i = 0; i < Math.min(vulnerabilities.length, 3); i++) {
        const vuln = vulnerabilities[i];
        console.log(`${i+1}. ${vuln.type} in function ${vuln.function_name} at ${vuln.address}`);
        console.log(`   Dangerous call: ${vuln.dangerous_call || 'N/A'}`);
      }
    }

    // 3. Demonstrate function search and renaming
    console.log('\n=== Function Search and Manipulation ===');

    // Search for functions containing "main"
    const mainFunctions = await client.searchFunctions(binaryPath, 'main');
    console.log(`Found ${mainFunctions.length} functions matching "main"`);

    if (mainFunctions.length > 0) {
      // Get the first match
      const mainFunc = mainFunctions[0];
      console.log(`\nFunction details for ${mainFunc.name}:`);
      console.log(`Address: ${mainFunc.address}`);

      // Get detailed information
      const funcInfo = await client.getFunction(binaryPath, mainFunc.name);
      console.log(`Symbol type: ${funcInfo.symbol?.type || 'N/A'}`);

      // Disassemble the function
      console.log('\nDisassembly:');
      const disasm = await client.disassembleFunction(binaryPath, mainFunc.name);
      for (let i = 0; i < Math.min(disasm.length, 5); i++) {
        console.log(`  ${disasm[i]}`);
      }
      if (disasm.length > 5) {
        console.log(`  ... and ${disasm.length - 5} more lines`);
      }

      // Decompile the function
      console.log('\nDecompiled code:');
      const decompiled = await client.decompileFunction(binaryPath, mainFunc.name);
      const decompLines = decompiled.split('\n');
      for (let i = 0; i < Math.min(decompLines.length, 5); i++) {
        console.log(`  ${decompLines[i]}`);
      }
      if (decompLines.length > 5) {
        console.log(`  ... and ${decompLines.length - 5} more lines`);
      }

      // Save the decompiled code to a file. Function names can contain
      // characters that are invalid or unsafe in a file name (e.g. "::" or
      // "/"), so anything outside a conservative set becomes "_".
      const safeName = String(mainFunc.name).replace(/[^A-Za-z0-9_.-]/g, '_');
      const decompPath = path.join(outputDir, `${safeName}.c`);
      fs.writeFileSync(decompPath, decompiled);
      console.log(`\nDecompiled code saved to ${decompPath}`);

      // Example of renaming (commented out to avoid modifying the binary)
      /*
      console.log('\nRenaming function...');
      const newName = `${mainFunc.name}_analyzed`;
      const renameResult = await client.renameFunction(binaryPath, mainFunc.name, newName);
      console.log(`Rename result: ${JSON.stringify(renameResult)}`);
      */
    }

    // 4. List and analyze data variables
    console.log('\n=== Data Variables Analysis ===');
    const dataVars = await client.listData(binaryPath);
    console.log(`Found ${dataVars.length} data variables`);

    if (dataVars.length > 0) {
      console.log('\nSample data variables:');
      for (let i = 0; i < Math.min(dataVars.length, 5); i++) {
        const dataVar = dataVars[i];
        console.log(`${i+1}. ${dataVar.name || '(unnamed)'} at ${dataVar.address}`);
        console.log(`   Type: ${dataVar.type || 'unknown'}`);
        console.log(`   Value: ${dataVar.value || 'N/A'}`);
      }

      // Save data variables to a file
      const dataPath = path.join(outputDir, 'data_variables.json');
      fs.writeFileSync(dataPath, JSON.stringify(dataVars, null, 2));
      console.log(`\nData variables saved to ${dataPath}`);
    }

    // 5. Analyze imports and exports
    console.log('\n=== Imports and Exports Analysis ===');
    const imports = await client.listImports(binaryPath);
    const exports = await client.listExports(binaryPath);

    console.log(`Found ${imports.length} imports and ${exports.length} exports`);

    // Save imports and exports to files
    const importsPath = path.join(outputDir, 'imports.json');
    const exportsPath = path.join(outputDir, 'exports.json');

    fs.writeFileSync(importsPath, JSON.stringify(imports, null, 2));
    fs.writeFileSync(exportsPath, JSON.stringify(exports, null, 2));

    console.log(`Imports saved to ${importsPath}`);
    console.log(`Exports saved to ${exportsPath}`);

    // 6. Analyze C++ namespaces (if any)
    console.log('\n=== C++ Namespaces Analysis ===');
    const namespaces = await client.listNamespaces(binaryPath);

    if (namespaces.length > 0) {
      console.log(`Found ${namespaces.length} C++ namespaces:`);
      for (let i = 0; i < Math.min(namespaces.length, 5); i++) {
        console.log(`  ${i+1}. ${namespaces[i]}`);
      }
      if (namespaces.length > 5) {
        console.log(`  ... and ${namespaces.length - 5} more namespaces`);
      }

      // Save namespaces to a file
      const namespacesPath = path.join(outputDir, 'namespaces.json');
      fs.writeFileSync(namespacesPath, JSON.stringify(namespaces, null, 2));
      console.log(`Namespaces saved to ${namespacesPath}`);
    } else {
      console.log('No C++ namespaces found in the binary');
    }

    console.log('\n=== Analysis Complete ===');
    console.log(`All analysis results have been saved to ${outputDir}`);

  } catch (err) {
    console.error('Error:', err);
    // Report the failure to the caller instead of exiting with status 0.
    process.exitCode = 1;
  } finally {
    // Stop the server
    client.stop();
  }
}

// Run the example if this file is executed directly
if (require.main === module) {
  main().catch((err) => {
    console.error(err);
    process.exitCode = 1;
  });
}
183 |
```
--------------------------------------------------------------------------------
/binaryninja_mcp_client.py:
--------------------------------------------------------------------------------
```python
1 | #!/usr/bin/env python3
2 | """
3 | Binary Ninja MCP Client
4 |
5 | This module provides a client for interacting with the Binary Ninja MCP server.
6 | The Binary Ninja MCP server is a plugin that provides an HTTP API for Binary Ninja.
7 | """
8 |
9 | import requests
10 | import json
11 | import time
12 | import logging
13 | import sys
14 |
# Root logging goes out at INFO with a timestamped format; this module logs
# through its own named logger.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger('BinaryNinjaMCPClient')
18 |
class BinaryNinjaMCPClient:
    """Client for interacting with the Binary Ninja MCP server.

    All calls go through one ``requests.Session``. The client can be used as
    a context manager so the session is closed when the block exits::

        with BinaryNinjaMCPClient() as client:
            client.list_functions()
    """

    def __init__(self, host='localhost', port=9009):
        """Initialize the client with the server address."""
        self.base_url = f"http://{host}:{port}"
        self.session = requests.Session()
        logger.info(f"Initialized Binary Ninja MCP client for {self.base_url}")

    def close(self):
        """Close the underlying HTTP session."""
        self.session.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.close()

    def _request(self, method, endpoint, data=None, params=None, timeout=60):
        """Make a request to the Binary Ninja MCP server.

        Args:
            method: 'GET' (sends ``params`` as the query string) or 'POST'
                (sends ``data`` as the JSON body).
            endpoint: Path relative to the base URL, without a leading slash.
            data: JSON-serializable body for POST requests.
            params: Query parameters for GET requests.
            timeout: Timeout in seconds passed to the HTTP call.

        Returns:
            The decoded JSON body of the response.

        Raises:
            ValueError: If ``method`` is neither 'GET' nor 'POST'.
            requests.exceptions.RequestException: On transport errors or a
                non-success HTTP status (logged, then re-raised).
        """
        url = f"{self.base_url}/{endpoint}"
        try:
            if method == 'GET':
                response = self.session.get(url, params=params, timeout=timeout)
            elif method == 'POST':
                response = self.session.post(url, json=data, timeout=timeout)
            else:
                raise ValueError(f"Unsupported HTTP method: {method}")

            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            logger.error(f"Error making request to {url}: {e}")
            raise

    def _get_page(self, endpoint, key, action, offset, limit, **query):
        """Fetch one page from a listing endpoint.

        Sends ``offset``/``limit`` (plus any extra ``query`` parameters) to
        ``endpoint`` and returns the list stored under ``key`` in the reply,
        or an empty list when the key is absent. ``action`` only completes
        the "Failed to ..." log message written before an error is re-raised.
        """
        try:
            params = {**query, "offset": offset, "limit": limit}
            response = self._request('GET', endpoint, params=params)
            return response.get(key, [])
        except Exception as e:
            logger.error(f"Failed to {action}: {e}")
            raise

    def ping(self):
        """Test the connection to the Binary Ninja server.

        Returns:
            ``{"status": "connected", "loaded": <bool>}`` when the server
            answers, otherwise ``{"status": "disconnected", "error": <str>}``.
            Never raises.
        """
        try:
            # Try to get the status of the current binary view
            response = self._request('GET', 'status')
            return {"status": "connected", "loaded": response.get("loaded", False)}
        except Exception as e:
            logger.debug(f"Status request failed ({e}); probing the root URL instead")
            # If that fails, try a simple request to the root URL
            try:
                response = self.session.get(f"{self.base_url}/", timeout=5)
                if response.status_code in (200, 404):
                    # Even a 404 means the server is running
                    return {"status": "connected", "loaded": False}
                logger.error(f"Failed to ping Binary Ninja server: {response.status_code}")
                return {"status": "disconnected", "error": f"HTTP error: {response.status_code}"}
            except Exception as e2:
                logger.error(f"Failed to ping Binary Ninja server: {e2}")
                return {"status": "disconnected", "error": str(e2)}

    def load_binary(self, file_path):
        """Load a binary file."""
        try:
            return self._request('POST', 'load', data={"filepath": file_path})
        except Exception as e:
            logger.error(f"Failed to load file {file_path}: {e}")
            raise

    def get_status(self):
        """Get the current status of the binary view."""
        try:
            return self._request('GET', 'status')
        except Exception as e:
            logger.error(f"Failed to get status: {e}")
            raise

    def list_functions(self, offset=0, limit=100):
        """List all functions in a binary file."""
        return self._get_page('functions', 'functions', 'list functions', offset, limit)

    def list_classes(self, offset=0, limit=100):
        """List all classes in a binary file."""
        return self._get_page('classes', 'classes', 'list classes', offset, limit)

    def list_segments(self, offset=0, limit=100):
        """List all segments in a binary file."""
        return self._get_page('segments', 'segments', 'list segments', offset, limit)

    def list_imports(self, offset=0, limit=100):
        """List all imported functions in a binary file."""
        return self._get_page('imports', 'imports', 'list imports', offset, limit)

    def list_exports(self, offset=0, limit=100):
        """List all exported symbols in a binary file."""
        return self._get_page('exports', 'exports', 'list exports', offset, limit)

    def list_namespaces(self, offset=0, limit=100):
        """List all namespaces in a binary file."""
        return self._get_page('namespaces', 'namespaces', 'list namespaces', offset, limit)

    def list_data(self, offset=0, limit=100):
        """List all data variables in a binary file."""
        return self._get_page('data', 'data', 'list data', offset, limit)

    def search_functions(self, query, offset=0, limit=100):
        """Search for functions by name."""
        return self._get_page(
            'searchFunctions', 'matches', 'search functions', offset, limit, query=query
        )

    def decompile_function(self, function_name):
        """Decompile a function by name."""
        try:
            return self._request('GET', 'decompile', params={"name": function_name})
        except Exception as e:
            logger.error(f"Failed to decompile function {function_name}: {e}")
            raise

    def rename_function(self, old_name, new_name):
        """Rename a function."""
        try:
            data = {"oldName": old_name, "newName": new_name}
            return self._request('POST', 'rename/function', data=data)
        except Exception as e:
            logger.error(f"Failed to rename function {old_name} to {new_name}: {e}")
            raise

    def rename_data(self, address, new_name):
        """Rename a data variable."""
        try:
            data = {"address": address, "newName": new_name}
            return self._request('POST', 'rename/data', data=data)
        except Exception as e:
            logger.error(f"Failed to rename data at {address} to {new_name}: {e}")
            raise
193 |
# Example usage: ping the server, load the given binary if none is loaded,
# then print the status, the first few functions and one decompilation.
if __name__ == "__main__":
    if len(sys.argv) < 2:
        print(f"Usage: {sys.argv[0]} <path_to_binary>")
        sys.exit(1)

    binary_path = sys.argv[1]
    client = BinaryNinjaMCPClient()

    # Test the connection
    ping_result = client.ping()
    print(f"Connection status: {ping_result['status']}")
    if ping_result['status'] == 'connected':
        print(f"Binary file loaded: {ping_result.get('loaded', False)}")

        # If no binary is loaded, try to load one
        if not ping_result.get('loaded', False):
            try:
                print(f"\nLoading binary: {binary_path}")
                load_result = client.load_binary(binary_path)
                print(f"Load result: {json.dumps(load_result, indent=2)}")
            except Exception as e:
                print(f"Error loading binary: {e}")
                sys.exit(1)

        # Get status
        try:
            status = client.get_status()
            print(f"\nBinary status: {json.dumps(status, indent=2)}")
        except Exception as e:
            print(f"Error getting status: {e}")

        # List functions. `functions` is bound up front so the decompile step
        # below is skipped, rather than raising NameError, when listing fails.
        functions = []
        try:
            functions = client.list_functions()
            print(f"\nFound {len(functions)} functions")
            for i, func in enumerate(functions[:5]):  # Show only first 5 functions
                print(f"{i+1}. {func['name']} at {func.get('address', 'unknown')}")
        except Exception as e:
            print(f"Error listing functions: {e}")

        # If there are functions, decompile the first one
        if functions:
            func = functions[0]
            try:
                print(f"\nDecompiling function: {func['name']}")
                decompiled = client.decompile_function(func['name'])
                print(f"Decompiled function: {func['name']}")
                print(decompiled.get('decompiled', 'No decompilation available'))
            except Exception as e:
                print(f"Error decompiling function: {e}")
    else:
        print(f"Error: {ping_result.get('error', 'Unknown error')}")
247 |
```
--------------------------------------------------------------------------------
/binaryninja_http_server.py:
--------------------------------------------------------------------------------
```python
1 | #!/usr/bin/env python3
2 | """
3 | Binary Ninja MCP Server (HTTP Client Version)
4 |
5 | This server provides an interface for Cline to analyze binary files using the Binary Ninja HTTP API.
6 | It connects to a running Binary Ninja instance (personal license) on localhost:9009.
7 | """
8 |
9 | import sys
10 | import json
11 | import traceback
12 | import os
13 | import logging
14 | from binaryninja_http_client import BinaryNinjaHTTPClient
15 |
# Root logging goes out at INFO with a timestamped format; this module logs
# through its own named logger.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger('BinaryNinjaMCPServer')
19 |
def read_json():
    """Read the next JSON object from stdin (one object per line).

    Blank lines are skipped instead of being handed to the JSON parser, where
    they would raise. Exits the process with status 0 when stdin reaches EOF.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    while True:
        line = sys.stdin.readline()
        if not line:
            # readline() returns '' only at EOF: the peer closed the pipe.
            sys.exit(0)
        if line.strip():
            return json.loads(line)
26 |
def write_json(response):
    """Serialize *response* as one line of JSON on stdout and flush it."""
    out = sys.stdout
    out.write(json.dumps(response) + "\n")
    out.flush()
30 |
def handle_request(request, client):
    """Handle an MCP request using the Binary Ninja HTTP client.

    Args:
        request: Decoded request object. Its "method" selects the operation
            and its optional "params" mapping carries the arguments.
        client: Binary Ninja HTTP client used to serve the request.

    Returns:
        A dict holding either a "result" key or an "error" key. Exceptions
        raised while serving the request are not propagated; they are logged
        and returned as ``{"error": ..., "traceback": ...}``.
    """
    try:
        method = request.get("method")
        # An explicit JSON null for "params" is treated like an omitted one.
        params = request.get("params") or {}

        if method == "ping":
            ping_result = client.ping()
            if ping_result["status"] == "connected":
                return {"result": "pong"}
            return {"error": f"Failed to connect to Binary Ninja server: {ping_result.get('error', 'Unknown error')}"}

        elif method == "list_functions":
            path = params.get("path")
            if not path:
                return {"error": "Path parameter is required"}

            functions = client.list_functions(path)
            return {"result": [f["name"] for f in functions]}

        elif method == "disassemble_function":
            path = params.get("path")
            func_name = params.get("function")
            if not path or not func_name:
                return {"error": "Path and function parameters are required"}

            disasm = client.get_disassembly(path, function_name=func_name)
            return {"result": disasm}

        elif method == "get_binary_info":
            path = params.get("path")
            if not path:
                return {"error": "Path parameter is required"}

            file_info = client.get_file_info(path)

            # Format the response to match the original API
            info = {
                "filename": file_info.get("filename", ""),
                "architecture": file_info.get("arch", {}).get("name", "unknown"),
                "platform": file_info.get("platform", {}).get("name", "unknown"),
                "entry_point": hex(file_info.get("entry_point", 0)),
                "file_size": file_info.get("file_size", 0),
                "is_executable": file_info.get("executable", False),
                "is_relocatable": file_info.get("relocatable", False),
                "address_size": file_info.get("address_size", 0)
            }
            return {"result": info}

        elif method == "list_sections":
            path = params.get("path")
            if not path:
                return {"error": "Path parameter is required"}

            sections_data = client.get_sections(path)

            # Format the response to match the original API
            sections = [
                {
                    "name": section.get("name", ""),
                    "start": hex(section.get("start", 0)),
                    "end": hex(section.get("end", 0)),
                    "size": section.get("length", 0),
                    "semantics": section.get("semantics", "")
                }
                for section in sections_data
            ]
            return {"result": sections}

        elif method == "get_xrefs":
            path = params.get("path")
            func_name = params.get("function")
            if not path or not func_name:
                return {"error": "Path and function parameters are required"}

            # First get the function info to get its address
            function = client.get_function(path, function_name=func_name)
            if not function:
                return {"error": f"Function '{func_name}' not found"}

            # Then get the xrefs to that address
            xrefs_data = client.get_xrefs(path, function.get("start", 0))

            # Format the response to match the original API
            refs = []
            for xref in xrefs_data:
                # Get the function that contains this xref
                caller_addr = xref.get("from", 0)
                try:
                    # This is a simplification - in a real implementation we would
                    # need to find the function that contains this address
                    caller_func = client.get_function(path, function_address=caller_addr)
                    refs.append({
                        "from_function": caller_func.get("name", "unknown"),
                        "from_address": hex(caller_addr),
                        "to_address": hex(xref.get("to", 0))
                    })
                except Exception as e:
                    # Best effort: an xref whose caller cannot be resolved is
                    # left out of the result, but the omission is logged.
                    logger.warning(f"Skipping xref from {caller_addr!r}: {e}")

            return {"result": refs}

        elif method == "get_strings":
            path = params.get("path")
            min_length = params.get("min_length", 4)
            if not path:
                return {"error": "Path parameter is required"}

            strings_data = client.get_strings(path, min_length=min_length)

            # Format the response to match the original API
            strings = [
                {
                    "value": string.get("value", ""),
                    "address": hex(string.get("address", 0)),
                    "length": len(string.get("value", "")),
                    "type": string.get("type", "")
                }
                for string in strings_data
            ]

            return {"result": strings}

        elif method == "decompile_function":
            path = params.get("path")
            func_name = params.get("function")
            if not path or not func_name:
                return {"error": "Path and function parameters are required"}

            # Get the function info
            function = client.get_function(path, function_name=func_name)
            if not function:
                return {"error": f"Function '{func_name}' not found"}

            # Get the decompiled code
            hlil = client.get_hlil(path, function_name=func_name)

            # Format the response to match the original API
            return {
                "result": {
                    "name": function.get("name", ""),
                    "signature": function.get("type", ""),
                    "decompiled_code": "\n".join(hlil) if isinstance(hlil, list) else str(hlil),
                    "address": hex(function.get("start", 0))
                }
            }

        elif method == "get_types":
            path = params.get("path")
            if not path:
                return {"error": "Path parameter is required"}

            types_data = client.get_types(path)

            # Format the response to match the original API
            # This is a simplified version - the actual implementation would need to
            # parse the types data from the Binary Ninja HTTP API
            types = []
            for type_name, type_info in types_data.items():
                type_obj = {
                    "name": type_name,
                    "type_class": type_info.get("type_class", "unknown"),
                    "type_string": type_info.get("type_string", "")
                }

                if type_info.get("type_class") == "structure":
                    type_obj["size"] = type_info.get("size", 0)
                    type_obj["members"] = [
                        {
                            "name": member.get("name", ""),
                            "type": member.get("type", ""),
                            "offset": member.get("offset", 0)
                        }
                        for member in type_info.get("members", [])
                    ]

                types.append(type_obj)

            return {"result": types}

        elif method in ("generate_header", "generate_source", "rebuild_driver"):
            # These operations are not implemented in the HTTP client version;
            # callers get an explicit error rather than a partial result.
            return {"error": "Method not implemented in HTTP client version"}

        return {"error": f"Unknown method: {method}"}

    except Exception as e:
        logger.error(f"Error handling request: {e}")
        logger.error(traceback.format_exc())
        return {
            "error": str(e),
            "traceback": traceback.format_exc()
        }
235 |
def main():
    """Run the MCP server loop over stdin/stdout.

    Creates the Binary Ninja HTTP client, exits with status 1 if the Binary
    Ninja server cannot be reached, then serves one request per input line
    until stdin is closed. A line that is not valid JSON is answered with an
    error response and the loop continues; any other failure while
    processing a request is logged and ends the process with status 1.
    """
    logger.info("Starting Binary Ninja MCP Server (HTTP Client Version)")

    # Create the Binary Ninja HTTP client
    client = BinaryNinjaHTTPClient()

    # Test the connection to the Binary Ninja server
    ping_result = client.ping()
    if ping_result["status"] != "connected":
        logger.error(f"Failed to connect to Binary Ninja server: {ping_result.get('error', 'Unknown error')}")
        sys.exit(1)

    logger.info(f"Connected to Binary Ninja server (binary loaded: {ping_result.get('loaded', False)})")

    # Process requests
    while True:
        try:
            try:
                req = read_json()
            except json.JSONDecodeError as e:
                # One malformed line should not take the whole server down.
                logger.error(f"Ignoring malformed request: {e}")
                write_json({"error": f"Invalid JSON: {e}", "id": None})
                continue

            res = handle_request(req, client)
            # Valid JSON that is not an object (e.g. a list) has no "id".
            res["id"] = req.get("id") if isinstance(req, dict) else None
            write_json(res)
        except Exception as e:
            logger.error(f"Error processing request: {e}")
            logger.error(traceback.format_exc())
            sys.exit(1)


if __name__ == "__main__":
    main()
265 |
```
--------------------------------------------------------------------------------
/example.py:
--------------------------------------------------------------------------------
```python
1 | #!/usr/bin/env python3
2 | """
3 | Example script demonstrating how to use the Binary Ninja MCP server.
4 | This script shows how to analyze a binary file using the Binary Ninja API.
5 |
6 | Usage:
7 | python3 example.py <path_to_binary> [output_dir]
8 |
9 | The path_to_binary parameter is required for the MCP server to identify which binary to analyze.
10 | If output_dir is provided, source code reconstruction will save files there.
11 | """
12 |
13 | import sys
14 | import json
15 | import subprocess
16 | import os
17 | import tempfile
18 |
def send_request(server_process, method, params=None):
    """Send one request to the Binary Ninja MCP server and return its reply.

    The request is written to the process's stdin as a single JSON line with
    a fixed id of 1; the reply is the next line read from its stdout, decoded
    from JSON.
    """
    payload = {
        "id": 1,
        "method": method,
        "params": {} if params is None else params,
    }

    to_server = server_process.stdin
    to_server.write(json.dumps(payload) + "\n")
    to_server.flush()

    reply_line = server_process.stdout.readline()
    return json.loads(reply_line)
35 |
def _try_request(server_process, method, params):
    """Send one request to the server and unwrap its reply.

    When the reply contains an ``"error"`` key, ``Error: <message>`` is printed.

    Returns:
        ``(True, result)`` on success, ``(False, None)`` on an error reply.
    """
    response = send_request(server_process, method, params)
    if "error" in response:
        print(f"Error: {response['error']}")
        return False, None
    return True, response["result"]


def _require(server_process, method, params):
    """Like ``_try_request`` but exit the script with status 1 on an error reply."""
    ok, result = _try_request(server_process, method, params)
    if not ok:
        sys.exit(1)
    return result


def _show_binary_info(server_process, binary_path):
    """Print the general properties the server reports for the binary."""
    print("\n=== Binary Information ===")
    info = _require(server_process, "get_binary_info", {"path": binary_path})
    print(f"Filename: {info['filename']}")
    print(f"Architecture: {info['architecture']}")
    print(f"Platform: {info['platform']}")
    print(f"Entry Point: {info['entry_point']}")
    print(f"File Size: {info['file_size']} bytes")
    print(f"Executable: {info['is_executable']}")
    print(f"Relocatable: {info['is_relocatable']}")
    print(f"Address Size: {info['address_size']} bits")


def _show_sections(server_process, binary_path):
    """Print one line per section reported by the server."""
    print("\n=== Sections ===")
    sections = _require(server_process, "list_sections", {"path": binary_path})
    for section in sections:
        print(f"{section['name']}: {section['start']} - {section['end']} ({section['size']} bytes) [{section['semantics']}]")


def _show_functions(server_process, binary_path):
    """Print the first 10 functions and return the complete function list."""
    print("\n=== Functions ===")
    functions = _require(server_process, "list_functions", {"path": binary_path})
    for i, func in enumerate(functions[:10], start=1):  # Show only first 10 functions
        print(f"{i}. {func}")

    if len(functions) > 10:
        print(f"... and {len(functions) - 10} more functions")
    return functions


def _show_first_function(server_process, binary_path, func_name):
    """Print the disassembly of *func_name* followed by its cross-references."""
    print(f"\n=== Disassembly of '{func_name}' ===")
    disasm = _require(server_process, "disassemble_function", {
        "path": binary_path,
        "function": func_name
    })
    for i, instr in enumerate(disasm, start=1):
        print(f"{i:3d}. {instr}")

    print(f"\n=== Cross-references to '{func_name}' ===")
    xrefs = _require(server_process, "get_xrefs", {
        "path": binary_path,
        "function": func_name
    })
    if xrefs:
        for xref in xrefs:
            print(f"From: {xref['from_function']} at {xref['from_address']} to {xref['to_address']}")
    else:
        print("No cross-references found")


def _show_strings(server_process, binary_path):
    """Print the first 10 strings of length >= 5 found in the binary."""
    print("\n=== Strings ===")
    strings = _require(server_process, "get_strings", {
        "path": binary_path,
        "min_length": 5
    })
    for i, string in enumerate(strings[:10], start=1):  # Show only first 10 strings
        print(f"{i}. {string['address']}: '{string['value']}'")

    if len(strings) > 10:
        print(f"... and {len(strings) - 10} more strings")


def _print_first_lines(text):
    """Print the first 10 lines of *text* followed by an ellipsis line."""
    print("\nFirst 10 lines:")
    for line in text.split("\n")[:10]:
        print(line)
    print("...")


def _reconstruct_sources(server_process, binary_path, output_dir, functions):
    """Run the source-reconstruction requests and save their output under *output_dir*.

    Unlike the analysis steps, an error reply here is only printed; the
    remaining steps still run.
    """
    os.makedirs(output_dir, exist_ok=True)

    # Decompile the first function
    if functions:
        func_name = functions[0]
        print(f"\n=== Decompiled C Code for '{func_name}' ===")
        ok, decompiled = _try_request(server_process, "decompile_function", {
            "path": binary_path,
            "function": func_name
        })
        if ok:
            print(f"Function: {decompiled['name']}")
            print(f"Signature: {decompiled['signature']}")
            print(f"Address: {decompiled['address']}")
            print("\nDecompiled Code:")
            print(decompiled['decompiled_code'])

            # Save decompiled code to file. An explicit encoding keeps the
            # write independent of the platform's default code page.
            decompiled_path = os.path.join(output_dir, f"{func_name}.c")
            with open(decompiled_path, "w", encoding="utf-8") as f:
                f.write(f"// Decompiled function: {func_name}\n")
                f.write(f"// Address: {decompiled['address']}\n\n")
                f.write(decompiled['decompiled_code'])
            print(f"Saved decompiled code to {decompiled_path}")

    # Extract types
    print("\n=== Data Types ===")
    ok, types = _try_request(server_process, "get_types", {"path": binary_path})
    if ok:
        print(f"Found {len(types)} types")

        # Show first 5 types
        for i, type_info in enumerate(types[:5], start=1):
            print(f"\n{i}. {type_info['name']} ({type_info['type_class']})")
            if type_info['type_class'] == 'structure':
                print(f" Size: {type_info['size']} bytes")
                print(" Members:")
                for member in type_info['members']:
                    print(f" - {member['name']}: {member['type']} (offset: {member['offset']})")

        if len(types) > 5:
            print(f"... and {len(types) - 5} more types")

        # Save types to file
        types_path = os.path.join(output_dir, "types.json")
        with open(types_path, "w", encoding="utf-8") as f:
            json.dump(types, f, indent=2)
        print(f"Saved types to {types_path}")

    # Generate header file
    print("\n=== Generated Header File ===")
    header_path = os.path.join(output_dir, "generated_header.h")
    ok, header_content = _try_request(server_process, "generate_header", {
        "path": binary_path,
        "output_path": header_path
    })
    if ok:
        print(f"Generated header file saved to {header_path}")
        _print_first_lines(header_content)

    # Generate source file
    print("\n=== Generated Source File ===")
    source_path = os.path.join(output_dir, "generated_source.c")
    ok, source_content = _try_request(server_process, "generate_source", {
        "path": binary_path,
        "output_path": source_path,
        "header_path": "generated_header.h"
    })
    if ok:
        print(f"Generated source file saved to {source_path}")
        _print_first_lines(source_content)

    # Rebuild driver (if it's a driver module): decided purely from the path text
    lowered = binary_path.lower()
    if binary_path.endswith(".ko") or "driver" in lowered or "module" in lowered:
        print("\n=== Rebuilding Driver Module ===")
        driver_dir = os.path.join(output_dir, "driver")
        ok, result = _try_request(server_process, "rebuild_driver", {
            "path": binary_path,
            "output_dir": driver_dir
        })
        if ok:
            print("Driver module rebuilt successfully!")
            print(f"Header file: {result['header_file']}")
            print(f"Source files: {len(result['source_files'])} files generated")
            print(f"Makefile: {result['makefile']}")
            print("\nTo build the driver, run:")
            print(f"cd {driver_dir} && make")


def main():
    """Run the demo analysis of the binary named on the command line.

    Usage: ``<script> <path_to_binary> [output_dir]``.

    Starts ``binaryninja_server.py`` (located next to this file) as a child
    process, pings it, then prints binary information, sections, functions,
    the disassembly and cross-references of the first function, and strings.
    When an output directory is given, the source-reconstruction requests are
    run as well and their results are written there.

    Exits with status 1 on bad arguments, a missing binary, a failed ping, or
    an error reply during the analysis steps. The server process is always
    terminated before returning.
    """
    if len(sys.argv) < 2:
        print(f"Usage: {sys.argv[0]} <path_to_binary> [output_dir]")
        sys.exit(1)

    binary_path = os.path.abspath(sys.argv[1])
    if not os.path.exists(binary_path):
        print(f"Error: Binary file '{binary_path}' not found")
        sys.exit(1)

    # Start the Binary Ninja MCP server
    server_path = os.path.join(os.path.dirname(__file__), "binaryninja_server.py")
    server_process = subprocess.Popen(
        ["python3", server_path],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        # stderr is never read by this script; a pipe nobody drains can fill
        # up and block the server, so discard the stream instead.
        stderr=subprocess.DEVNULL,
        text=True,
        bufsize=1
    )

    try:
        # Test the server connection
        response = send_request(server_process, "ping")
        if response.get("result") != "pong":
            print("Error: Failed to connect to the Binary Ninja MCP server")
            sys.exit(1)

        print("Connected to Binary Ninja MCP server")

        _show_binary_info(server_process, binary_path)
        _show_sections(server_process, binary_path)
        functions = _show_functions(server_process, binary_path)

        # If there are functions, disassemble the first one
        if functions:
            _show_first_function(server_process, binary_path, functions[0])

        _show_strings(server_process, binary_path)

        # Source Code Reconstruction
        if len(sys.argv) > 2:
            _reconstruct_sources(server_process, binary_path, sys.argv[2], functions)
        else:
            print("\nTo see source code reconstruction examples, provide an output directory:")
            print(f"python3 {sys.argv[0]} {binary_path} /path/to/output/dir")
    finally:
        # Terminate the server process; escalate to kill if it does not exit
        # so the script cannot hang in wait().
        server_process.terminate()
        try:
            server_process.wait(timeout=5)
        except subprocess.TimeoutExpired:
            server_process.kill()
            server_process.wait()
276 |
277 | if __name__ == "__main__":  # run the demo only when executed as a script, not on import
278 | main()
279 |
```
--------------------------------------------------------------------------------
/binaryninja_http_client.py:
--------------------------------------------------------------------------------
```python
1 | #!/usr/bin/env python3
2 | """
3 | Binary Ninja HTTP API Client
4 |
5 | This module provides a client for interacting with the Binary Ninja HTTP API server.
6 | The Binary Ninja personal license runs a server on localhost:9009 that we can connect to.
7 | """
8 |
9 | import requests
10 | import json
11 | import time
12 | import logging
13 |
14 | # Configure logging: INFO level, with timestamp, logger name and level on every record.
15 | logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
16 | logger = logging.getLogger('BinaryNinjaClient')  # module-wide logger used by BinaryNinjaHTTPClient
17 |
class BinaryNinjaHTTPClient:
    """Client for interacting with the Binary Ninja HTTP API server.

    Every call goes through ``_request``, which issues a GET or POST against
    ``http://<host>:<port>/<endpoint>`` and returns the decoded JSON body.
    List-style endpoints are read page by page via ``_fetch_all``.

    Several methods accept a ``file_path`` argument that is not used: the
    server always operates on the binary it currently has open.
    """

    def __init__(self, host='localhost', port=9009):
        """Initialize the client with the server address."""
        self.base_url = f"http://{host}:{port}"
        self.session = requests.Session()
        logger.info(f"Initialized Binary Ninja HTTP client for {self.base_url}")

    def _request(self, method, endpoint, data=None, params=None, timeout=60):
        """Make a request to the Binary Ninja HTTP API.

        Args:
            method: ``'GET'`` or ``'POST'``.
            endpoint: Path appended to the base URL.
            data: JSON body for POST requests.
            params: Query parameters for GET requests.
            timeout: Timeout in seconds passed to ``requests``.

        Returns:
            The decoded JSON response.

        Raises:
            ValueError: If *method* is neither GET nor POST.
            requests.exceptions.RequestException: On a transport or HTTP
                status error (logged, then re-raised).
        """
        if method not in ('GET', 'POST'):
            raise ValueError(f"Unsupported HTTP method: {method}")

        url = f"{self.base_url}/{endpoint}"
        try:
            if method == 'GET':
                response = self.session.get(url, params=params, timeout=timeout)
            else:
                response = self.session.post(url, json=data, timeout=timeout)

            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            logger.error(f"Error making request to {url}: {e}")
            raise

    def _fetch_all(self, endpoint, key, noun, failure, limit=100, extra_params=None):
        """Collect every page of a paginated GET endpoint.

        Pages of *limit* items are requested starting at offset 0 until a
        page is empty or shorter than *limit*.

        Args:
            endpoint: Endpoint to query.
            key: Key of the item list inside each response.
            noun: Item name used in the summary log line.
            failure: Prefix of the error log line written before re-raising.
            limit: Page size; must be at least 1, otherwise the offset would
                never advance.
            extra_params: Additional query parameters sent with every page.

        Returns:
            A list with the items of all pages, in server order.
        """
        try:
            if limit < 1:
                raise ValueError("limit must be a positive integer")

            items = []
            offset = 0
            while True:
                params = dict(extra_params or {})
                params["offset"] = offset
                params["limit"] = limit
                response = self._request('GET', endpoint, params=params)
                page = response.get(key, [])

                if not page:
                    break

                items.extend(page)

                # A short page means the server has nothing further to send
                if len(page) < limit:
                    break

                offset += limit

            logger.info(f"Retrieved {len(items)} {noun} in total")
            return items
        except Exception as e:
            logger.error(f"{failure}: {e}")
            raise

    def ping(self):
        """Test the connection to the Binary Ninja server.

        NOTE: when the status request fails, a fake "connected" response is
        returned on purpose so the MCP server can be tested without a running
        Binary Ninja instance. A warning is logged in that case.
        """
        try:
            status = self._request('GET', 'status')
            return {
                "status": "connected",
                "loaded": status.get("loaded", False),
                "filename": status.get("filename", "")
            }
        except Exception as e:
            logger.warning(f"Failed to connect to Binary Ninja server: {e}")
            logger.warning("Returning fake response for testing purposes")
            return {
                "status": "connected",
                "loaded": True,
                "filename": "test.bndb"
            }

    def get_status(self):
        """Get the current status of the binary view.

        NOTE: like ``ping``, this returns a fake status (and logs a warning)
        when the server cannot be reached.
        """
        try:
            return self._request('GET', 'status')
        except Exception as e:
            logger.warning(f"Failed to get status from Binary Ninja server: {e}")
            logger.warning("Returning fake status for testing purposes")
            return {
                "loaded": True,
                "filename": "test.bndb"
            }

    def get_file_info(self, file_path):
        """Get information about the currently open file.

        Only the filename comes from the server; every other field is a
        fixed placeholder.
        """
        try:
            status = self.get_status()

            return {
                "filename": status.get("filename", ""),
                "arch": {"name": "unknown"},  # We don't have access to this info
                "platform": {"name": "unknown"},  # We don't have access to this info
                "entry_point": 0,  # We don't have access to this info
                "file_size": 0,  # We don't have access to this info
                "executable": True,  # Assume it's executable
                "relocatable": False,  # Assume it's not relocatable
                "address_size": 64  # Assume 64-bit
            }
        except Exception as e:
            logger.error(f"Failed to get file info: {e}")
            raise

    def list_functions(self, file_path=None):
        """List all functions in the currently open binary file."""
        return self._fetch_all('functions', 'functions', 'functions', 'Failed to list functions')

    def get_function(self, file_path=None, function_name=None, function_address=None):
        """Get information about a specific function.

        The full function list is fetched and searched, first by name, then
        by address (matched against each entry's ``address`` or ``start``).

        Returns:
            The matching function entry, or ``None`` when nothing matches.
        """
        try:
            functions = self.list_functions()

            if function_name:
                for func in functions:
                    if func.get("name") == function_name:
                        return func

            # Compare against None so that address 0 is still looked up
            if function_address is not None:
                for func in functions:
                    if func.get("address") == function_address or func.get("start") == function_address:
                        return func

            return None
        except Exception as e:
            logger.error(f"Failed to get function info: {e}")
            raise

    def get_disassembly(self, file_path=None, function_name=None, function_address=None):
        """Get the disassembly of a specific function.

        There is no disassembly endpoint, so the result is built from the
        first ``searchFunctions`` match: its name, its address and, when
        available, its decompiled code as indented lines.

        Returns:
            A list of text lines. Failures are reported as a single
            explanatory line instead of raising.
        """
        identifier = function_name if function_name else function_address
        if identifier is None:
            return ["No function identifier provided"]

        if not isinstance(identifier, str):
            identifier = str(identifier)

        try:
            response = self._request('GET', 'searchFunctions', params={"query": identifier})
            matches = response.get("matches", [])

            if not matches:
                return [f"Function '{identifier}' not found"]

            func = matches[0]
            disasm = [
                f"Function: {func.get('name', 'unknown')}",
                f"Address: {func.get('address', '0x0')}",
            ]

            # The decompiled code is optional extra detail; losing it must not
            # discard the lines gathered so far.
            try:
                decompiled = self.get_hlil(file_path, function_name=func.get('name'))
                if decompiled and decompiled != "No decompilation available":
                    disasm.append("Decompiled code:")
                    for line in decompiled.split("\n"):
                        disasm.append(f" {line}")
            except Exception as e:
                logger.debug(f"Skipping decompiled code in disassembly: {e}")

            return disasm
        except Exception as e:
            logger.warning(f"Failed to get function info: {e}")
            return [f"Error getting disassembly: {e}"]

    def get_hlil(self, file_path=None, function_name=None, function_address=None):
        """Get the high-level IL (decompiled code) of a specific function.

        Returns:
            The decompiled text, or a ``//`` comment string describing the
            error when the server reports one or the request fails.
        """
        identifier = function_name if function_name else function_address
        if identifier is None:
            return "No function identifier provided"

        if not isinstance(identifier, str):
            identifier = str(identifier)

        try:
            response = self._request('GET', 'decompile', params={"name": identifier})
            if "error" in response:
                return f"// {response.get('error')}\n// {response.get('reason', '')}"
            return response.get("decompiled", "No decompilation available")
        except Exception as e:
            logger.warning(f"Failed to get decompilation: {e}")
            return f"// Decompilation failed: {e}"

    def get_types(self, file_path=None):
        """Get all types defined in a binary file.

        Types are not available through this API; an empty placeholder is
        returned.
        """
        return {}

    def get_sections(self, file_path=None):
        """Get all sections in a binary file (read from the ``segments`` endpoint)."""
        return self._fetch_all('segments', 'segments', 'segments', 'Failed to get sections')

    def get_strings(self, file_path=None, min_length=4):
        """Get all strings in a binary file.

        Strings are not available through this API; an empty placeholder is
        returned.
        """
        return []

    def get_xrefs(self, file_path=None, address=None):
        """Get cross-references to a specific address.

        Cross-references are not available through this API; an empty
        placeholder is returned.
        """
        return []

    def get_imports(self, offset=0, limit=100):
        """Get list of imported functions.

        All pages are fetched; *limit* is the page size. *offset* is accepted
        but not used: reading always starts at the first item.
        """
        return self._fetch_all('imports', 'imports', 'imports', 'Failed to get imports', limit=limit)

    def get_exports(self, offset=0, limit=100):
        """Get list of exported symbols.

        All pages are fetched; *limit* is the page size. *offset* is accepted
        but not used: reading always starts at the first item.
        """
        return self._fetch_all('exports', 'exports', 'exports', 'Failed to get exports', limit=limit)

    def get_namespaces(self, offset=0, limit=100):
        """Get list of C++ namespaces.

        All pages are fetched; *limit* is the page size. *offset* is accepted
        but not used: reading always starts at the first item.
        """
        return self._fetch_all('namespaces', 'namespaces', 'namespaces', 'Failed to get namespaces', limit=limit)

    def get_defined_data(self, offset=0, limit=100):
        """Get list of defined data variables.

        All pages are fetched; *limit* is the page size. *offset* is accepted
        but not used: reading always starts at the first item.
        """
        return self._fetch_all('data', 'data', 'data items', 'Failed to get defined data', limit=limit)

    def search_functions(self, query, offset=0, limit=100):
        """Search functions by name.

        All pages of matches are fetched; *limit* is the page size. *offset*
        is accepted but not used: reading always starts at the first match.
        """
        return self._fetch_all(
            'searchFunctions', 'matches', 'matching functions', 'Failed to search functions',
            limit=limit, extra_params={"query": query},
        )

    def load_binary(self, file_path):
        """Load a binary file and return the server's response."""
        try:
            return self._request('POST', 'load', data={"filepath": file_path})
        except Exception as e:
            logger.error(f"Failed to load binary: {e}")
            raise

    def rename_function(self, old_name, new_name):
        """Rename a function; returns the response's ``success`` flag (False if absent)."""
        try:
            response = self._request('POST', 'rename/function', data={"oldName": old_name, "newName": new_name})
            return response.get("success", False)
        except Exception as e:
            logger.error(f"Failed to rename function: {e}")
            raise

    def rename_data(self, address, new_name):
        """Rename a data variable; returns the response's ``success`` flag (False if absent)."""
        try:
            response = self._request('POST', 'rename/data', data={"address": address, "newName": new_name})
            return response.get("success", False)
        except Exception as e:
            logger.error(f"Failed to rename data: {e}")
            raise
470 |
# Example usage: connect to the server, then print file info, the first few
# functions and the start of the first function's disassembly.
if __name__ == "__main__":
    import sys

    if len(sys.argv) < 2:
        print(f"Usage: {sys.argv[0]} <path_to_binary>")
        sys.exit(1)

    binary_path = sys.argv[1]
    client = BinaryNinjaHTTPClient()

    # Test the connection
    ping_result = client.ping()
    print(f"Connection status: {ping_result['status']}")
    if ping_result['status'] == 'connected':
        loaded = ping_result.get('loaded', False)
        print(f"Binary file loaded: {loaded}")

        # Get file info
        file_info = client.get_file_info(binary_path)
        pretty_info = json.dumps(file_info, indent=2)
        print(f"\nFile info: {pretty_info}")

        # List functions
        functions = client.list_functions(binary_path)
        print(f"\nFound {len(functions)} functions")
        for number, func in enumerate(functions[:5], start=1):  # Show only first 5 functions
            print(f"{number}. {func['name']} at {hex(func['start'])}")

        # Get disassembly of the first function
        if functions:
            func = functions[0]
            disasm = client.get_disassembly(binary_path, function_name=func['name'])
            print(f"\nDisassembly of {func['name']}:")
            for line in disasm[:10]:  # Show only first 10 lines
                print(line)
505 |
```
--------------------------------------------------------------------------------
/client.ts:
--------------------------------------------------------------------------------
```typescript
1 | /**
2 | * Binary Ninja MCP Client
3 | *
4 | * This is a TypeScript client for the Binary Ninja MCP server.
5 | * It demonstrates how to interact with the server using the Model Context Protocol.
6 | *
7 | * The client supports both raw binary files and Binary Ninja database files (.bndb).
8 | * Binary Ninja database files contain analysis results, annotations, and other information
9 | * that can speed up analysis and provide more accurate results.
10 | */
11 |
12 | import { spawn, ChildProcess } from 'child_process';
13 | import * as readline from 'readline';
14 |
15 | interface McpRequest { // one request, written to the server's stdin as a single JSON line
16 | id: number; // taken from the client's incrementing requestId counter
17 | method: string; // server method name, e.g. 'ping' or 'list_functions'
18 | params?: Record<string, any>; // optional arguments for the method
19 | }
20 |
21 | interface McpResponse { // one JSON line parsed from the server's stdout
22 | id: number; // used to look up the matching entry in pendingRequests
23 | result?: any; // value the pending promise resolves with when `error` is not set
24 | error?: string; // when truthy, the pending promise is rejected with this message
25 | traceback?: string; // appended to the rejection message after `error`, if present
26 | }
27 |
28 | class BinaryNinjaClient {
29 | private serverProcess: ChildProcess | null = null;
30 | private rl: readline.Interface | null = null;
31 | private requestId = 1;
32 | private pendingRequests: Map<number, { resolve: Function; reject: Function }> = new Map();
33 |
34 | /**
35 | * Start the Binary Ninja MCP server
36 | */
async start(serverPath: string): Promise<void> {
  // Resolves once the server answers the initial ping; rejects when the
  // process cannot be spawned or the ping fails.
  return new Promise((resolve, reject) => {
    try {
      const child = spawn('python3', [serverPath], {
        stdio: ['pipe', 'pipe', 'pipe']
      });
      this.serverProcess = child;

      // A failed spawn (for example python3 missing from PATH) is reported
      // through the 'error' event. Without a listener Node treats it as an
      // unhandled error and terminates the whole process.
      child.on('error', (err) => {
        this.cleanup();
        reject(err);
      });

      // Writes to a server that has already gone away surface as an 'error'
      // event on stdin; listen so they are logged instead of crashing.
      child.stdin!.on('error', (err) => {
        console.error(`Server stdin error: ${err.message}`);
      });

      this.rl = readline.createInterface({
        input: child.stdout!,
        terminal: false
      });

      // Each stdout line is one JSON response; route it to the request
      // waiting under the same id.
      this.rl.on('line', (line) => {
        try {
          const response = JSON.parse(line) as McpResponse;
          const pending = this.pendingRequests.get(response.id);

          if (pending) {
            if (response.error) {
              pending.reject(new Error(`${response.error}\n${response.traceback || ''}`));
            } else {
              pending.resolve(response.result);
            }
            this.pendingRequests.delete(response.id);
          }
        } catch (err) {
          console.error('Error parsing server response:', err);
        }
      });

      child.stderr!.on('data', (data) => {
        console.error(`Server error: ${data.toString()}`);
      });

      child.on('close', (code) => {
        console.log(`Server process exited with code ${code}`);
        this.cleanup();
      });

      // Test the connection with a ping
      this.sendRequest('ping')
        .then(() => resolve())
        .catch(reject);
    } catch (err) {
      reject(err);
    }
  });
}
85 |
86 | /**
87 | * Send a request to the Binary Ninja MCP server with retry capability
88 | */
async sendRequest(
  method: string,
  params?: Record<string, any>,
  options: { maxRetries?: number, retryDelay?: number, timeout?: number } = {}
): Promise<any> {
  // options.maxRetries: extra attempts after the first one (default 3)
  // options.retryDelay: pause between attempts in ms (default 1000)
  // options.timeout:    time to wait for a response in ms (default 30000)
  if (!this.serverProcess || !this.rl) {
    throw new Error('Server not started');
  }

  const maxRetries = options.maxRetries ?? 3;
  const retryDelay = options.retryDelay ?? 1000;
  const timeoutMs = options.timeout ?? 30000;
  let lastError: Error | null = null;

  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    // The server may have exited while we waited to retry; writing to it
    // again cannot succeed, so surface the error that caused the retry.
    const stdin = this.serverProcess ? this.serverProcess.stdin : null;
    if (!stdin) {
      throw lastError ?? new Error('Server not started');
    }

    try {
      return await new Promise((resolve, reject) => {
        const id = this.requestId++;
        const request: McpRequest = { id, method, params };

        // Set a timeout to handle cases where the server doesn't respond
        const timeoutId = setTimeout(() => {
          if (this.pendingRequests.has(id)) {
            this.pendingRequests.delete(id);
            reject(new Error(`Request timed out after ${timeoutMs / 1000} seconds: ${method}`));
          }
        }, timeoutMs);

        this.pendingRequests.set(id, {
          resolve: (value: any) => {
            clearTimeout(timeoutId);
            resolve(value);
          },
          reject: (error: any) => {
            clearTimeout(timeoutId);
            reject(error);
          }
        });

        try {
          stdin.write(JSON.stringify(request) + '\n');
        } catch (writeError) {
          // Do not leave a live timer and a stale pending entry behind
          clearTimeout(timeoutId);
          this.pendingRequests.delete(id);
          reject(writeError);
        }
      });
    } catch (error: unknown) {
      const err = error instanceof Error ? error : new Error(String(error));
      lastError = err;

      // If this was the last retry, throw the error
      if (attempt === maxRetries) {
        throw err;
      }

      // Log the retry attempt
      console.error(`Request failed (attempt ${attempt + 1}/${maxRetries + 1}): ${err.message}`);
      console.error(`Retrying in ${retryDelay}ms...`);

      // Wait before retrying
      await new Promise(resolve => setTimeout(resolve, retryDelay));
    }
  }

  // This should never be reached due to the throw in the loop, but TypeScript doesn't know that
  throw lastError || new Error('Unknown error');
}
150 |
151 | /**
152 | * Stop the Binary Ninja MCP server
153 | */
154 | stop(): void {
155 | this.cleanup(); // closes the readline interface, kills the server process and rejects pending requests
156 | }
157 |
private cleanup(): void {
  // Stop reading server output.
  const reader = this.rl;
  if (reader) {
    reader.close();
    this.rl = null;
  }

  // Kill the child process and forget it.
  const child = this.serverProcess;
  if (child) {
    child.kill();
    this.serverProcess = null;
  }

  // Fail every request that is still waiting for a response.
  this.pendingRequests.forEach((pending, id) => {
    pending.reject(new Error('Server connection closed'));
    this.pendingRequests.delete(id);
  });
}
175 |
176 | /**
177 | * Get information about a binary file
178 | */
async getBinaryInfo(path: string): Promise<any> {
  // Forwards the file path to the server's `get_binary_info` method.
  const params = { path };
  return this.sendRequest('get_binary_info', params);
}
182 |
183 | /**
184 | * List all functions in a binary file
185 | */
async listFunctions(path: string): Promise<string[]> {
  // Forwards the file path to the server's `list_functions` method.
  const params = { path };
  return this.sendRequest('list_functions', params);
}
189 |
190 | /**
191 | * Get detailed information about a specific function
192 | */
async getFunction(path: string, functionName: string): Promise<any> {
  // The server expects the function name under the `function` key.
  const params = { path, function: functionName };
  return this.sendRequest('get_function', params);
}
196 |
197 | /**
198 | * Disassemble a function in a binary file
199 | */
async disassembleFunction(path: string, functionName: string): Promise<string[]> {
  // The server expects the function name under the `function` key.
  const params = { path, function: functionName };
  return this.sendRequest('disassemble_function', params);
}
203 |
204 | /**
205 | * List all sections/segments in a binary file
206 | */
async listSections(path: string): Promise<any[]> {
  // Forwards the file path to the server's `list_sections` method.
  const params = { path };
  return this.sendRequest('list_sections', params);
}
210 |
211 | /**
212 | * List all imported functions in a binary file
213 | */
async listImports(path: string): Promise<any[]> {
  // Forwards the file path to the server's `list_imports` method.
  const params = { path };
  return this.sendRequest('list_imports', params);
}
217 |
218 | /**
219 | * List all exported symbols in a binary file
220 | */
async listExports(path: string): Promise<any[]> {
  // Forwards the file path to the server's `list_exports` method.
  const params = { path };
  return this.sendRequest('list_exports', params);
}
224 |
225 | /**
226 | * List all C++ namespaces in a binary file
227 | */
async listNamespaces(path: string): Promise<string[]> {
  // Forwards the file path to the server's `list_namespaces` method.
  const params = { path };
  return this.sendRequest('list_namespaces', params);
}
231 |
232 | /**
233 | * List all defined data variables in a binary file
234 | */
async listData(path: string): Promise<any[]> {
  // Forwards the file path to the server's `list_data` method.
  const params = { path };
  return this.sendRequest('list_data', params);
}
238 |
239 | /**
240 | * Search for functions by name
241 | */
async searchFunctions(path: string, query: string): Promise<any[]> {
  // Sends the file path and the search text to `search_functions`.
  const params = { path, query };
  return this.sendRequest('search_functions', params);
}
245 |
246 | /**
247 | * Rename a function
248 | */
async renameFunction(path: string, oldName: string, newName: string): Promise<any> {
  // The server uses snake_case keys for the two names.
  const params = { path, old_name: oldName, new_name: newName };
  return this.sendRequest('rename_function', params);
}
252 |
253 | /**
254 | * Rename a data variable
255 | */
async renameData(path: string, address: string, newName: string): Promise<any> {
  // The new name is sent under the snake_case key `new_name`.
  const params = { path, address, new_name: newName };
  return this.sendRequest('rename_data', params);
}
259 |
260 | /**
261 | * Get cross-references to a function in a binary file
262 | */
async getXrefs(path: string, functionName: string): Promise<any[]> {
  // The server expects the function name under the `function` key.
  const params = { path, function: functionName };
  return this.sendRequest('get_xrefs', params);
}
266 |
/**
 * Fetch the control flow graph of `functionName` in the binary at `path`.
 *
 * The function name is forwarded under the `function` key of a
 * `get_function_graph` request.
 */
async getFunctionGraph(path: string, functionName: string): Promise<any[]> {
  const params = { path, function: functionName };
  return this.sendRequest('get_function_graph', params);
}
273 |
/**
 * Fetch strings found in the binary at `path`.
 *
 * `minLength` (default 4) is forwarded as `min_length` in a `get_strings`
 * request.
 */
async getStrings(path: string, minLength: number = 4): Promise<any[]> {
  const params = { path, min_length: minLength };
  return this.sendRequest('get_strings', params);
}
280 |
/**
 * Request the C decompilation of `functionName` from the binary at `path`.
 *
 * The function name is forwarded under the `function` key of a
 * `decompile_function` request.
 */
async decompileFunction(path: string, functionName: string): Promise<any> {
  const params = { path, function: functionName };
  return this.sendRequest('decompile_function', params);
}
287 |
/**
 * Fetch the data structures and types extracted from the binary at `path`.
 *
 * Issues a `get_types` request through `sendRequest`.
 */
async getTypes(path: string): Promise<any[]> {
  const params = { path };
  return this.sendRequest('get_types', params);
}
294 |
/**
 * Request a header file with function prototypes and type definitions.
 *
 * @param path             Binary to process.
 * @param outputPath       Optional destination, forwarded as `output_path`.
 * @param includeFunctions Forwarded as `include_functions` (default true).
 * @param includeTypes     Forwarded as `include_types` (default true).
 */
async generateHeader(path: string, outputPath?: string, includeFunctions: boolean = true, includeTypes: boolean = true): Promise<string> {
  const params = {
    path,
    output_path: outputPath,
    include_functions: includeFunctions,
    include_types: includeTypes
  };
  return this.sendRequest('generate_header', params);
}
306 |
/**
 * Request a source file with function implementations.
 *
 * @param path       Binary to process.
 * @param outputPath Optional destination, forwarded as `output_path`.
 * @param headerPath Forwarded as `header_path`
 *                   (default 'generated_header.h').
 */
async generateSource(path: string, outputPath?: string, headerPath: string = 'generated_header.h'): Promise<string> {
  const params = {
    path,
    output_path: outputPath,
    header_path: headerPath
  };
  return this.sendRequest('generate_source', params);
}
317 |
/**
 * Request a rebuild of an entire driver module from the binary at `path`.
 *
 * `outputDir` is forwarded as `output_dir` in a `rebuild_driver` request.
 */
async rebuildDriver(path: string, outputDir: string): Promise<any> {
  const params = { path, output_dir: outputDir };
  return this.sendRequest('rebuild_driver', params);
}
327 |
/**
 * Build a combined analysis report for the binary at `path`.
 *
 * The report aggregates several API calls (binary info, sections, imports,
 * exports, namespaces, data variables) plus detailed information and the
 * decompilation for the first 10 listed functions. Functions whose lookup
 * or decompilation fails are logged and left out of the report.
 *
 * @param path       Binary to analyze.
 * @param outputPath When given, the report is also written there as
 *                   pretty-printed JSON.
 * @returns The report object.
 */
async analyzeFile(path: string, outputPath?: string): Promise<any> {
  // Requests are awaited one at a time, in the order of the report fields.
  const fileInfo = await this.getBinaryInfo(path);
  const sectionList = await this.listSections(path);
  const importList = await this.listImports(path);
  const exportList = await this.listExports(path);
  const namespaceList = await this.listNamespaces(path);
  const dataList = await this.listData(path);

  const report: any = {
    file_info: fileInfo,
    sections: sectionList,
    functions: [],
    imports: importList,
    exports: exportList,
    namespaces: namespaceList,
    data_variables: dataList,
    timestamp: new Date().toISOString()
  };

  const allFunctions = await this.listFunctions(path);
  report.function_count = allFunctions.length;

  // Only the first 10 functions get the detailed treatment.
  for (const name of allFunctions.slice(0, 10)) {
    try {
      const details = await this.getFunction(path, name);
      const decompiled = await this.decompileFunction(path, name);
      report.functions.push({ ...details, decompiled });
    } catch (err) {
      console.error(`Error analyzing function ${name}: ${err}`);
    }
  }

  if (outputPath) {
    require('fs').writeFileSync(outputPath, JSON.stringify(report, null, 2));
  }

  return report;
}
372 |
/**
 * Look for potential vulnerabilities in the binary at `path`.
 *
 * Two heuristics are applied:
 *  1. Every function whose name matches a well-known risky routine
 *     (strcpy, system, malloc, ...) is reported as `dangerous_function`.
 *  2. Every function matching `printf` whose decompiled text contains a
 *     `printf(` call that is not immediately followed by a `"%` literal is
 *     reported as `format_string`.
 *
 * Errors are logged and never thrown; whatever was collected up to that
 * point is returned.
 *
 * @param path Binary to scan.
 * @returns Findings, each carrying the matched function name, its address
 *          and the decompilation result.
 */
async findVulnerabilities(path: string): Promise<any[]> {
  const vulnerabilities: any[] = [];

  // decompileFunction may resolve to a plain string or to an object that
  // carries the text in `decompiled_code`. Normalise to a string so the
  // text heuristics below never call string methods on an object.
  const codeText = (decompiled: any): string => {
    if (typeof decompiled === 'string') {
      return decompiled;
    }
    if (decompiled && typeof decompiled.decompiled_code === 'string') {
      return decompiled.decompiled_code;
    }
    return '';
  };

  try {
    // The result is not needed; the call is kept so that a binary that
    // cannot be queried aborts the scan through the outer catch.
    await this.listFunctions(path);

    // Look for potentially vulnerable functions
    const dangerousFunctions = [
      'strcpy', 'strcat', 'sprintf', 'gets', 'memcpy', 'system',
      'exec', 'popen', 'scanf', 'malloc', 'free', 'realloc'
    ];

    // Search for each dangerous function; one failing search does not
    // stop the remaining ones.
    for (const dangerFunc of dangerousFunctions) {
      try {
        const matches = await this.searchFunctions(path, dangerFunc);

        for (const match of matches) {
          const decompiled = await this.decompileFunction(path, match.name);

          vulnerabilities.push({
            type: 'dangerous_function',
            function_name: match.name,
            dangerous_call: dangerFunc,
            address: match.address,
            decompiled: decompiled
          });
        }
      } catch (err) {
        console.error(`Error searching for ${dangerFunc}: ${err}`);
      }
    }

    // Look for string format vulnerabilities
    try {
      const printfMatches = await this.searchFunctions(path, 'printf');
      for (const match of printfMatches) {
        const decompiled = await this.decompileFunction(path, match.name);
        const code = codeText(decompiled);

        // Simple heuristic: printf is called with something other than a
        // format literal starting with '%' as its first argument.
        if (code.includes('printf(') && !code.includes('printf("%')) {
          vulnerabilities.push({
            type: 'format_string',
            function_name: match.name,
            address: match.address,
            decompiled: decompiled
          });
        }
      }
    } catch (err) {
      console.error(`Error analyzing format string vulnerabilities: ${err}`);
    }
  } catch (err) {
    console.error(`Error finding vulnerabilities: ${err}`);
  }

  return vulnerabilities;
}
437 |
/**
 * Compare two binaries and report their differences.
 *
 * Functions are matched by name and sections by `name`. A common function
 * counts as modified when its decompiled text differs; a common section
 * counts as modified when its `size` or `start` differs.
 *
 * @param path1 First binary.
 * @param path2 Second binary.
 * @returns An object with `file1` / `file2` info and a `differences` tree
 *          (`only_in_file1`, `only_in_file2`, `modified`) for functions and
 *          sections.
 */
async compareBinaries(path1: string, path2: string): Promise<any> {
  const comparison: any = {
    file1: await this.getBinaryInfo(path1),
    file2: await this.getBinaryInfo(path2),
    differences: {
      functions: {
        only_in_file1: [],
        only_in_file2: [],
        modified: []
      },
      sections: {
        only_in_file1: [],
        only_in_file2: [],
        modified: []
      }
    }
  };

  // Decompile results may be strings or objects holding `decompiled_code`.
  // Compare by text: two distinct objects are never `===`, which would flag
  // every common function as modified.
  const codeText = (decompiled: any): string => {
    if (typeof decompiled === 'string') {
      return decompiled;
    }
    if (decompiled && typeof decompiled.decompiled_code === 'string') {
      return decompiled.decompiled_code;
    }
    return JSON.stringify(decompiled);
  };

  // Compare functions
  const functions1 = await this.listFunctions(path1);
  const functions2 = await this.listFunctions(path2);

  // Sets give constant-time membership tests instead of repeated scans.
  const functionSet1 = new Set(functions1);
  const functionSet2 = new Set(functions2);

  for (const func of functions1) {
    if (!functionSet2.has(func)) {
      comparison.differences.functions.only_in_file1.push(func);
    }
  }

  for (const func of functions2) {
    if (!functionSet1.has(func)) {
      comparison.differences.functions.only_in_file2.push(func);
    }
  }

  // Compare common functions
  const commonFunctions = functions1.filter(f => functionSet2.has(f));
  for (const func of commonFunctions) {
    try {
      const decompiled1 = await this.decompileFunction(path1, func);
      const decompiled2 = await this.decompileFunction(path2, func);

      if (codeText(decompiled1) !== codeText(decompiled2)) {
        comparison.differences.functions.modified.push({
          name: func,
          file1_code: decompiled1,
          file2_code: decompiled2
        });
      }
    } catch (err) {
      console.error(`Error comparing function ${func}: ${err}`);
    }
  }

  // Compare sections
  const sections1 = await this.listSections(path1);
  const sections2 = await this.listSections(path2);

  // Index sections by name, keeping the first occurrence of each name.
  const indexByName = (sections: any[]): Map<any, any> => {
    const index = new Map<any, any>();
    for (const section of sections) {
      if (!index.has(section.name)) {
        index.set(section.name, section);
      }
    }
    return index;
  };
  const sectionIndex1 = indexByName(sections1);
  const sectionIndex2 = indexByName(sections2);

  for (const section of sections1) {
    if (!sectionIndex2.has(section.name)) {
      comparison.differences.sections.only_in_file1.push(section);
    }
  }

  for (const section of sections2) {
    if (!sectionIndex1.has(section.name)) {
      comparison.differences.sections.only_in_file2.push(section);
    }
  }

  // Compare common sections; each shared name is inspected once.
  sectionIndex1.forEach((section1, sectionName) => {
    const section2 = sectionIndex2.get(sectionName);
    if (section2 === undefined) {
      return;
    }

    if (section1.size !== section2.size || section1.start !== section2.start) {
      comparison.differences.sections.modified.push({
        name: sectionName,
        file1_section: section1,
        file2_section: section2
      });
    }
  });

  return comparison;
}
535 | }
536 |
// Example usage
/**
 * Command-line demo: `ts-node client.ts <path_to_binary> [output_dir]`.
 *
 * Starts the client against the server script, prints binary information,
 * sections, functions, the disassembly and cross-references of the first
 * function, and strings. When an output directory is given it also runs
 * the source-reconstruction examples (decompile, types, header, source and,
 * for driver-like paths, a driver rebuild) and saves their output there.
 *
 * The server script defaults to `binaryninja_server.py` next to this file;
 * set the `BN_MCP_SERVER_SCRIPT` environment variable to use another one.
 */
async function main() {
  if (process.argv.length < 3) {
    console.error('Usage: ts-node client.ts <path_to_binary>');
    process.exit(1);
  }

  const fs = require('fs');
  const path = require('path');

  const binaryPath = process.argv[2];
  const client = new BinaryNinjaClient();

  // Resolve the server script relative to this file rather than relying on
  // a machine-specific absolute path.
  const serverScript = process.env.BN_MCP_SERVER_SCRIPT
    || path.join(__dirname, 'binaryninja_server.py');

  try {
    // Start the server
    await client.start(serverScript);
    console.log('Connected to Binary Ninja MCP server');

    // Get binary information
    console.log('\n=== Binary Information ===');
    const info = await client.getBinaryInfo(binaryPath);
    console.log(`Filename: ${info.filename}`);
    console.log(`Architecture: ${info.architecture}`);
    console.log(`Platform: ${info.platform}`);
    console.log(`Entry Point: ${info.entry_point}`);
    console.log(`File Size: ${info.file_size} bytes`);
    console.log(`Executable: ${info.is_executable}`);
    console.log(`Relocatable: ${info.is_relocatable}`);
    console.log(`Address Size: ${info.address_size} bits`);

    // List sections
    console.log('\n=== Sections ===');
    const sections = await client.listSections(binaryPath);
    for (const section of sections) {
      console.log(`${section.name}: ${section.start} - ${section.end} (${section.size} bytes) [${section.semantics}]`);
    }

    // List functions (first 10 only)
    console.log('\n=== Functions ===');
    const functions = await client.listFunctions(binaryPath);
    for (let i = 0; i < Math.min(functions.length, 10); i++) {
      console.log(`${i+1}. ${functions[i]}`);
    }
    if (functions.length > 10) {
      console.log(`... and ${functions.length - 10} more functions`);
    }

    // If there are functions, disassemble the first one
    if (functions.length > 0) {
      const funcName = functions[0];
      console.log(`\n=== Disassembly of '${funcName}' ===`);
      const disasm = await client.disassembleFunction(binaryPath, funcName);
      for (let i = 0; i < disasm.length; i++) {
        console.log(`${i+1}. ${disasm[i]}`);
      }

      // Get cross-references to this function
      console.log(`\n=== Cross-references to '${funcName}' ===`);
      const xrefs = await client.getXrefs(binaryPath, funcName);
      if (xrefs.length > 0) {
        for (const xref of xrefs) {
          console.log(`From: ${xref.from_function} at ${xref.from_address} to ${xref.to_address}`);
        }
      } else {
        console.log('No cross-references found');
      }
    }

    // Get strings (first 10 only)
    console.log('\n=== Strings ===');
    const strings = await client.getStrings(binaryPath, 5);
    for (let i = 0; i < Math.min(strings.length, 10); i++) {
      console.log(`${i+1}. ${strings[i].address}: '${strings[i].value}'`);
    }
    if (strings.length > 10) {
      console.log(`... and ${strings.length - 10} more strings`);
    }

    // Source Code Reconstruction
    if (process.argv.length > 3) {
      const outputDir = process.argv[3];

      // Create output directory
      if (!fs.existsSync(outputDir)) {
        fs.mkdirSync(outputDir, { recursive: true });
      }

      // Decompile the first function
      if (functions.length > 0) {
        const funcName = functions[0];
        console.log(`\n=== Decompiled C Code for '${funcName}' ===`);
        try {
          const decompiled = await client.decompileFunction(binaryPath, funcName);
          console.log(`Function: ${decompiled.name}`);
          console.log(`Signature: ${decompiled.signature}`);
          console.log(`Address: ${decompiled.address}`);
          console.log("\nDecompiled Code:");
          console.log(decompiled.decompiled_code);

          // Save decompiled code to file
          const decompilePath = path.join(outputDir, `${funcName}.c`);
          fs.writeFileSync(decompilePath,
            `// Decompiled function: ${funcName}\n` +
            `// Address: ${decompiled.address}\n\n` +
            decompiled.decompiled_code
          );
          console.log(`Saved decompiled code to ${decompilePath}`);
        } catch (err) {
          console.error(`Error decompiling function: ${err}`);
        }
      }

      // Extract types
      console.log("\n=== Data Types ===");
      try {
        const types = await client.getTypes(binaryPath);
        console.log(`Found ${types.length} types`);

        // Show first 5 types
        for (let i = 0; i < Math.min(types.length, 5); i++) {
          const type = types[i];
          console.log(`\n${i+1}. ${type.name} (${type.type_class})`);
          if (type.type_class === 'structure') {
            console.log(` Size: ${type.size} bytes`);
            console.log(" Members:");
            // A structure without a members list must not abort the listing.
            for (const member of type.members || []) {
              console.log(` - ${member.name}: ${member.type} (offset: ${member.offset})`);
            }
          }
        }

        if (types.length > 5) {
          console.log(`... and ${types.length - 5} more types`);
        }

        // Save types to file
        const typesPath = path.join(outputDir, "types.json");
        fs.writeFileSync(typesPath, JSON.stringify(types, null, 2));
        console.log(`Saved types to ${typesPath}`);
      } catch (err) {
        console.error(`Error getting types: ${err}`);
      }

      // Generate header file
      console.log("\n=== Generated Header File ===");
      try {
        const headerPath = path.join(outputDir, "generated_header.h");
        const headerContent = await client.generateHeader(binaryPath, headerPath);
        console.log(`Generated header file saved to ${headerPath}`);
        console.log("\nFirst 10 lines:");
        const headerLines = headerContent.split("\n");
        for (let i = 0; i < Math.min(headerLines.length, 10); i++) {
          console.log(headerLines[i]);
        }
        console.log("...");
      } catch (err) {
        console.error(`Error generating header: ${err}`);
      }

      // Generate source file
      console.log("\n=== Generated Source File ===");
      try {
        const sourcePath = path.join(outputDir, "generated_source.c");
        const sourceContent = await client.generateSource(binaryPath, sourcePath, "generated_header.h");
        console.log(`Generated source file saved to ${sourcePath}`);
        console.log("\nFirst 10 lines:");
        const sourceLines = sourceContent.split("\n");
        for (let i = 0; i < Math.min(sourceLines.length, 10); i++) {
          console.log(sourceLines[i]);
        }
        console.log("...");
      } catch (err) {
        console.error(`Error generating source: ${err}`);
      }

      // Rebuild driver (only when the path looks like a driver module)
      if (binaryPath.endsWith(".ko") || binaryPath.toLowerCase().includes("driver") || binaryPath.toLowerCase().includes("module")) {
        console.log("\n=== Rebuilding Driver Module ===");
        try {
          const driverDir = path.join(outputDir, "driver");
          const result = await client.rebuildDriver(binaryPath, driverDir);
          console.log("Driver module rebuilt successfully!");
          console.log(`Header file: ${result.header_file}`);
          console.log(`Source files: ${result.source_files.length} files generated`);
          console.log(`Makefile: ${result.makefile}`);
          console.log(`\nTo build the driver, run:`);
          console.log(`cd ${driverDir} && make`);
        } catch (err) {
          console.error(`Error rebuilding driver: ${err}`);
        }
      }
    } else {
      console.log("\nTo see source code reconstruction examples, provide an output directory:");
      console.log(`ts-node client.ts ${binaryPath} /path/to/output/dir`);
    }
  } catch (err) {
    console.error('Error:', err);
  } finally {
    // Stop the server
    client.stop();
  }
}
738 |
// Run the example only when this file is the program entry point,
// not when it is imported as a module.
if (require.main === module) {
  main().catch((err) => console.error(err));
}
743 |
744 | export { BinaryNinjaClient };
745 |
```
--------------------------------------------------------------------------------
/binaryninja_mcp_http_server.py:
--------------------------------------------------------------------------------
```python
1 | #!/usr/bin/env python3
2 | import sys
3 | import json
4 | import traceback
5 | import os
6 | import logging
7 | import argparse
8 | from http.server import HTTPServer, BaseHTTPRequestHandler
9 | from urllib.parse import urlparse
10 | import time
11 | from socketserver import ThreadingMixIn
12 | import requests
13 |
14 | # Assuming this is your BinaryNinja client implementation
15 | # You would need to adjust this import to match your actual implementation
16 | from binaryninja_http_client import BinaryNinjaHTTPClient
17 |
18 | logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
19 | logger = logging.getLogger("BinaryNinjaMCP")
20 |
# Define the MCP tools - note the use of "path" instead of "file"
def _string_tool(name, description, *params):
    """Build one tool descriptor whose parameters are all required strings.

    The key order (name, description, streaming, inputSchema) and the order
    of ``params`` are preserved in the resulting dictionaries.
    """
    return {
        "name": name,
        "description": description,
        "streaming": False,
        "inputSchema": {
            "type": "object",
            "properties": {param: {"type": "string"} for param in params},
            "required": list(params),
        },
    }


# Every tool takes the binary "path"; some take further string arguments.
MCP_TOOLS = [
    _string_tool("get_binary_info", "Get binary metadata", "path"),
    _string_tool("list_functions", "List functions", "path"),
    _string_tool("get_function", "Get information about a specific function", "path", "function"),
    _string_tool("disassemble_function", "Disassemble function", "path", "function"),
    _string_tool("decompile_function", "Decompile to C", "path", "function"),
    _string_tool("list_sections", "List sections/segments in the binary", "path"),
    _string_tool("list_imports", "List imported functions", "path"),
    _string_tool("list_exports", "List exported symbols", "path"),
    _string_tool("list_namespaces", "List C++ namespaces", "path"),
    _string_tool("list_data", "List defined data variables", "path"),
    _string_tool("search_functions", "Search functions by name", "path", "query"),
    _string_tool("rename_function", "Rename a function", "path", "old_name", "new_name"),
    _string_tool("rename_data", "Rename a data variable", "path", "address", "new_name"),
]
174 |
class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
    """HTTPServer combined with ThreadingMixIn so requests run in threads."""

    # Mark worker threads as daemon threads (see socketserver.ThreadingMixIn).
    daemon_threads = True
177 |
178 | class BinaryNinjaMCPHandler(BaseHTTPRequestHandler):
def __init__(self, *args, **kwargs):
    """Create the Binary Ninja client, then run the base handler setup.

    All positional and keyword arguments are passed through unchanged to
    the parent constructor.
    """
    # Assigned before delegating to the parent constructor so the attribute
    # already exists when the base class starts working with this handler.
    self.client = BinaryNinjaHTTPClient()
    super().__init__(*args, **kwargs)
182 |
183 | def _set_headers(self, content_type='application/json', status_code=200):
184 | self.send_response(status_code)
185 | self.send_header('Content-Type', content_type)
186 | self.send_header('Access-Control-Allow-Origin', '*')
187 | self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')
188 | self.send_header('Access-Control-Allow-Headers', 'Content-Type')
189 | self.end_headers()
190 |
191 | def do_OPTIONS(self):
192 | self._set_headers()
193 |
def log_message(self, format, *args):
    """Send the handler's log lines to the module logger at INFO level.

    Args:
        format: %-style format string.
        *args: Values interpolated into ``format``.
    """
    rendered = format % args
    logger.info(rendered)
196 |
def do_GET(self):
    """Serve GET requests.

    Routes:
        ``/ping``  -> ``{"status": "ok"}``.
        ``/``      -> a server-sent-event stream when the ``Accept`` header
                      contains ``text/event-stream`` (a ``connected`` event
                      followed by a heartbeat every 10 seconds), otherwise
                      a JSON description of the server and its tools.
        any other  -> 404 with ``{"error": "Not found"}``.
    """
    logger.debug(f"GET request received: {self.path}")
    logger.debug(f"Headers: {dict(self.headers)}")

    route = urlparse(self.path).path

    if route == '/ping':
        self._set_headers()
        self.wfile.write(json.dumps({"status": "ok"}).encode())
        return

    if route != '/':
        self._set_headers(status_code=404)
        self.wfile.write(json.dumps({"error": "Not found"}).encode())
        return

    wants_event_stream = 'text/event-stream' in self.headers.get('Accept', '')
    if not wants_event_stream:
        # Plain request to the root: describe the server and its tools.
        tool_listing = {
            "jsonrpc": "2.0",
            "id": "root-list",
            "result": {
                "name": "binaryninja-mcp",
                "version": "0.1.0",
                "tools": MCP_TOOLS
            }
        }
        self._set_headers()
        body = json.dumps(tool_listing)
        logger.debug(f"Returning tool list: {body[:100]}...")
        self.wfile.write(body.encode())
        return

    # Server-sent events: headers are written by hand because the stream
    # uses its own content type and stays open.
    self.send_response(200)
    self.send_header('Content-Type', 'text/event-stream')
    self.send_header('Cache-Control', 'no-cache')
    self.send_header('Connection', 'keep-alive')
    self.send_header('Access-Control-Allow-Origin', '*')
    self.end_headers()
    try:
        logger.debug("Starting SSE connection")
        self.wfile.write(b"event: connected\ndata: {\"status\": \"ready\"}\n\n")
        self.wfile.flush()

        # Use a shorter heartbeat interval
        heartbeat_interval = 10  # seconds

        # Runs until a write fails; the exception ends the loop below.
        while True:
            beat = {
                "jsonrpc": "2.0",
                "method": "heartbeat",
                "params": {"timestamp": int(time.time())}
            }
            frame = f"event: mcp-event\ndata: {json.dumps(beat)}\n\n"
            logger.debug(f"Sending heartbeat: {frame}")
            self.wfile.write(frame.encode())
            self.wfile.flush()
            time.sleep(heartbeat_interval)
    except Exception as e:
        logger.warning(f"SSE error: {e}")
251 |
def do_POST(self):
    """Handle a JSON-RPC request sent via POST.

    The body is read according to ``Content-Length``, parsed as JSON and
    passed to ``_handle_mcp_request``; the result is written back as JSON.

    Error responses follow the JSON-RPC 2.0 error codes:
        * body that is not valid UTF-8 JSON (or an unusable
          ``Content-Length``) -> HTTP 400, code -32700 (parse error);
        * JSON that is not an object -> HTTP 400, code -32600
          (invalid request);
        * any other failure -> HTTP 500, code -32603 (internal error),
          echoing the request id when it is known.
    """
    request_id = None
    try:
        try:
            # A negative length would make read() block until EOF, so
            # clamp it to zero.
            content_length = max(int(self.headers.get('Content-Length', 0)), 0)
            raw_data = self.rfile.read(content_length).decode('utf-8')

            logger.debug(f"POST received: {raw_data[:200]}...")

            request = json.loads(raw_data)
        except ValueError as e:
            # ValueError also covers UnicodeDecodeError and
            # json.JSONDecodeError.
            logger.error(f"POST parse error: {e}")
            status_code = 400
            response = {
                "jsonrpc": "2.0",
                "id": None,
                "error": {"code": -32700, "message": f"Parse error: {e}"}
            }
        else:
            if isinstance(request, dict):
                request_id = request.get("id")
                status_code = 200
                response = self._handle_mcp_request(request)
            else:
                logger.error(f"Invalid request type: {type(request)}")
                status_code = 400
                response = {
                    "jsonrpc": "2.0",
                    "id": None,
                    "error": {"code": -32600, "message": "Invalid Request: expected a JSON object"}
                }

        # Serialise before sending headers so a non-serialisable result
        # becomes a clean 500 instead of a half-written 200.
        response_str = json.dumps(response)
        self._set_headers(status_code=status_code)
        logger.debug(f"Responding with: {response_str[:200]}...")
        self.wfile.write(response_str.encode())
    except Exception as e:
        logger.error(f"POST error: {e}")
        logger.error(traceback.format_exc())
        self._set_headers(status_code=500)
        self.wfile.write(json.dumps({
            "jsonrpc": "2.0",
            "id": request_id,
            "error": {"code": -32603, "message": str(e)}
        }).encode())
275 |
276 | def _wrap_result(self, request_id, text):
277 | return {
278 | "jsonrpc": "2.0",
279 | "id": request_id,
280 | "result": {
281 | "content": [{"type": "text", "text": text}],
282 | "isError": False
283 | }
284 | }
285 |
286 | def _handle_mcp_request(self, request):
287 | request_id = request.get("id")
288 | method = request.get("method")
289 | params = request.get("params", {})
290 |
291 | logger.debug(f"Handling MCP request: id={request_id}, method={method}, params={params}")
292 |
293 | try:
294 | if method == "list_tools":
295 | return {
296 | "jsonrpc": "2.0", "id": request_id,
297 | "result": {"tools": MCP_TOOLS}
298 | }
299 | elif method == "call_tool":
300 | name = params.get("name")
301 | args = params.get("arguments", {})
302 | return self._handle_mcp_request({"jsonrpc": "2.0", "id": request_id, "method": name, "params": args})
303 |
304 | elif method == "get_binary_info":
305 | path = params.get("path")
306 | if not path:
307 | logger.error("Missing 'path' parameter")
308 | return self._error_response(request_id, -32602, "Missing 'path' parameter")
309 | if not isinstance(path, str):
310 | logger.error(f"Invalid path type: {type(path)}")
311 | return self._error_response(request_id, -32602, "Parameter 'path' must be a string")
312 |
313 | logger.debug(f"Getting info for file: {path}")
314 | info = self.client.get_file_info(path)
315 | return self._wrap_result(request_id, json.dumps(info, indent=2))
316 |
317 | elif method == "list_functions":
318 | path = params.get("path")
319 | if not path:
320 | logger.error("Missing 'path' parameter")
321 | return self._error_response(request_id, -32602, "Missing 'path' parameter")
322 | if not isinstance(path, str):
323 | logger.error(f"Invalid path type: {type(path)}")
324 | return self._error_response(request_id, -32602, "Parameter 'path' must be a string")
325 |
326 | logger.debug(f"Listing functions for file: {path}")
327 | funcs = self.client.list_functions(path)
328 | return self._wrap_result(request_id, json.dumps([f["name"] for f in funcs], indent=2))
329 |
330 | elif method == "disassemble_function":
331 | path = params.get("path")
332 | func = params.get("function")
333 | if not path:
334 | logger.error("Missing 'path' parameter")
335 | return self._error_response(request_id, -32602, "Missing 'path' parameter")
336 | if not func:
337 | logger.error("Missing 'function' parameter")
338 | return self._error_response(request_id, -32602, "Missing 'function' parameter")
339 | if not isinstance(path, str):
340 | logger.error(f"Invalid path type: {type(path)}")
341 | return self._error_response(request_id, -32602, "Parameter 'path' must be a string")
342 | if not isinstance(func, str):
343 | logger.error(f"Invalid function type: {type(func)}")
344 | return self._error_response(request_id, -32602, "Parameter 'function' must be a string")
345 |
346 | logger.debug(f"Disassembling function {func} in file: {path}")
347 | code = self.client.get_disassembly(path, function_name=func)
348 | return self._wrap_result(request_id, "\n".join(code))
349 |
350 | elif method == "decompile_function":
351 | path = params.get("path")
352 | func = params.get("function")
353 | if not path:
354 | logger.error("Missing 'path' parameter")
355 | return self._error_response(request_id, -32602, "Missing 'path' parameter")
356 | if not func:
357 | logger.error("Missing 'function' parameter")
358 | return self._error_response(request_id, -32602, "Missing 'function' parameter")
359 | if not isinstance(path, str):
360 | logger.error(f"Invalid path type: {type(path)}")
361 | return self._error_response(request_id, -32602, "Parameter 'path' must be a string")
362 | if not isinstance(func, str):
363 | logger.error(f"Invalid function type: {type(func)}")
364 | return self._error_response(request_id, -32602, "Parameter 'function' must be a string")
365 |
366 | logger.debug(f"Decompiling function {func} in file: {path}")
367 | hlil = self.client.get_hlil(path, function_name=func)
368 | return self._wrap_result(request_id, "\n".join(hlil) if isinstance(hlil, list) else str(hlil))
369 |
370 | elif method == "get_function":
371 | path = params.get("path")
372 | func = params.get("function")
373 | if not path:
374 | logger.error("Missing 'path' parameter")
375 | return self._error_response(request_id, -32602, "Missing 'path' parameter")
376 | if not func:
377 | logger.error("Missing 'function' parameter")
378 | return self._error_response(request_id, -32602, "Missing 'function' parameter")
379 | if not isinstance(path, str):
380 | logger.error(f"Invalid path type: {type(path)}")
381 | return self._error_response(request_id, -32602, "Parameter 'path' must be a string")
382 | if not isinstance(func, str):
383 | logger.error(f"Invalid function type: {type(func)}")
384 | return self._error_response(request_id, -32602, "Parameter 'function' must be a string")
385 |
386 | logger.debug(f"Getting function info for {func} in file: {path}")
387 | func_info = self.client.get_function(path, function_name=func)
388 | if func_info:
389 | return self._wrap_result(request_id, json.dumps(func_info, indent=2))
390 | else:
391 | return self._error_response(request_id, -32602, f"Function '{func}' not found")
392 |
393 | elif method == "list_sections":
394 | path = params.get("path")
395 | if not path:
396 | logger.error("Missing 'path' parameter")
397 | return self._error_response(request_id, -32602, "Missing 'path' parameter")
398 | if not isinstance(path, str):
399 | logger.error(f"Invalid path type: {type(path)}")
400 | return self._error_response(request_id, -32602, "Parameter 'path' must be a string")
401 |
402 | logger.debug(f"Listing sections for file: {path}")
403 | sections = self.client.get_sections(path)
404 | return self._wrap_result(request_id, json.dumps(sections, indent=2))
405 |
406 | elif method == "list_imports":
407 | path = params.get("path")
408 | if not path:
409 | logger.error("Missing 'path' parameter")
410 | return self._error_response(request_id, -32602, "Missing 'path' parameter")
411 | if not isinstance(path, str):
412 | logger.error(f"Invalid path type: {type(path)}")
413 | return self._error_response(request_id, -32602, "Parameter 'path' must be a string")
414 |
415 | logger.debug(f"Listing imports for file: {path}")
416 | imports = self.client.get_imports()
417 | return self._wrap_result(request_id, json.dumps(imports, indent=2))
418 |
419 | elif method == "list_exports":
420 | path = params.get("path")
421 | if not path:
422 | logger.error("Missing 'path' parameter")
423 | return self._error_response(request_id, -32602, "Missing 'path' parameter")
424 | if not isinstance(path, str):
425 | logger.error(f"Invalid path type: {type(path)}")
426 | return self._error_response(request_id, -32602, "Parameter 'path' must be a string")
427 |
428 | logger.debug(f"Listing exports for file: {path}")
429 | exports = self.client.get_exports()
430 | return self._wrap_result(request_id, json.dumps(exports, indent=2))
431 |
432 | elif method == "list_namespaces":
433 | path = params.get("path")
434 | if not path:
435 | logger.error("Missing 'path' parameter")
436 | return self._error_response(request_id, -32602, "Missing 'path' parameter")
437 | if not isinstance(path, str):
438 | logger.error(f"Invalid path type: {type(path)}")
439 | return self._error_response(request_id, -32602, "Parameter 'path' must be a string")
440 |
441 | logger.debug(f"Listing namespaces for file: {path}")
442 | namespaces = self.client.get_namespaces()
443 | return self._wrap_result(request_id, json.dumps(namespaces, indent=2))
444 |
445 | elif method == "list_data":
446 | path = params.get("path")
447 | if not path:
448 | logger.error("Missing 'path' parameter")
449 | return self._error_response(request_id, -32602, "Missing 'path' parameter")
450 | if not isinstance(path, str):
451 | logger.error(f"Invalid path type: {type(path)}")
452 | return self._error_response(request_id, -32602, "Parameter 'path' must be a string")
453 |
454 | logger.debug(f"Listing data variables for file: {path}")
455 | data_items = self.client.get_defined_data()
456 | return self._wrap_result(request_id, json.dumps(data_items, indent=2))
457 |
458 | elif method == "search_functions":
459 | path = params.get("path")
460 | query = params.get("query")
461 | if not path:
462 | logger.error("Missing 'path' parameter")
463 | return self._error_response(request_id, -32602, "Missing 'path' parameter")
464 | if not query:
465 | logger.error("Missing 'query' parameter")
466 | return self._error_response(request_id, -32602, "Missing 'query' parameter")
467 | if not isinstance(path, str):
468 | logger.error(f"Invalid path type: {type(path)}")
469 | return self._error_response(request_id, -32602, "Parameter 'path' must be a string")
470 | if not isinstance(query, str):
471 | logger.error(f"Invalid query type: {type(query)}")
472 | return self._error_response(request_id, -32602, "Parameter 'query' must be a string")
473 |
474 | logger.debug(f"Searching functions with query '{query}' in file: {path}")
475 | matches = self.client.search_functions(query)
476 | return self._wrap_result(request_id, json.dumps(matches, indent=2))
477 |
478 | elif method == "rename_function":
479 | path = params.get("path")
480 | old_name = params.get("old_name")
481 | new_name = params.get("new_name")
482 | if not path:
483 | logger.error("Missing 'path' parameter")
484 | return self._error_response(request_id, -32602, "Missing 'path' parameter")
485 | if not old_name:
486 | logger.error("Missing 'old_name' parameter")
487 | return self._error_response(request_id, -32602, "Missing 'old_name' parameter")
488 | if not new_name:
489 | logger.error("Missing 'new_name' parameter")
490 | return self._error_response(request_id, -32602, "Missing 'new_name' parameter")
491 | if not isinstance(path, str):
492 | logger.error(f"Invalid path type: {type(path)}")
493 | return self._error_response(request_id, -32602, "Parameter 'path' must be a string")
494 | if not isinstance(old_name, str):
495 | logger.error(f"Invalid old_name type: {type(old_name)}")
496 | return self._error_response(request_id, -32602, "Parameter 'old_name' must be a string")
497 | if not isinstance(new_name, str):
498 | logger.error(f"Invalid new_name type: {type(new_name)}")
499 | return self._error_response(request_id, -32602, "Parameter 'new_name' must be a string")
500 |
501 | logger.debug(f"Renaming function from '{old_name}' to '{new_name}' in file: {path}")
502 | success = self.client.rename_function(old_name, new_name)
503 | if success:
504 | return self._wrap_result(request_id, json.dumps({"success": True, "message": f"Function renamed from '{old_name}' to '{new_name}'"}, indent=2))
505 | else:
506 | return self._error_response(request_id, -32602, f"Failed to rename function '{old_name}' to '{new_name}'")
507 |
508 | elif method == "rename_data":
509 | path = params.get("path")
510 | address = params.get("address")
511 | new_name = params.get("new_name")
512 | if not path:
513 | logger.error("Missing 'path' parameter")
514 | return self._error_response(request_id, -32602, "Missing 'path' parameter")
515 | if not address:
516 | logger.error("Missing 'address' parameter")
517 | return self._error_response(request_id, -32602, "Missing 'address' parameter")
518 | if not new_name:
519 | logger.error("Missing 'new_name' parameter")
520 | return self._error_response(request_id, -32602, "Missing 'new_name' parameter")
521 | if not isinstance(path, str):
522 | logger.error(f"Invalid path type: {type(path)}")
523 | return self._error_response(request_id, -32602, "Parameter 'path' must be a string")
524 | if not isinstance(address, str):
525 | logger.error(f"Invalid address type: {type(address)}")
526 | return self._error_response(request_id, -32602, "Parameter 'address' must be a string")
527 | if not isinstance(new_name, str):
528 | logger.error(f"Invalid new_name type: {type(new_name)}")
529 | return self._error_response(request_id, -32602, "Parameter 'new_name' must be a string")
530 |
531 | logger.debug(f"Renaming data at address '{address}' to '{new_name}' in file: {path}")
532 | success = self.client.rename_data(address, new_name)
533 | if success:
534 | return self._wrap_result(request_id, json.dumps({"success": True, "message": f"Data at address '{address}' renamed to '{new_name}'"}, indent=2))
535 | else:
536 | return self._error_response(request_id, -32602, f"Failed to rename data at address '{address}' to '{new_name}'")
537 |
538 | elif method == "cancel":
539 | logger.debug("Cancel requested — not implemented.")
540 | return self._error_response(request_id, -32601, "Cancel not implemented")
541 |
542 | logger.error(f"Unknown method: {method}")
543 | return self._error_response(request_id, -32601, f"Unknown method: {method}")
544 |
545 | except Exception as e:
546 | logger.error(f"Error in MCP handler: {e}\n{traceback.format_exc()}")
547 | return self._error_response(request_id, -32603, str(e))
548 |
549 | def _error_response(self, request_id, code, message):
550 | return {
551 | "jsonrpc": "2.0",
552 | "id": request_id,
553 | "error": {
554 | "code": code,
555 | "message": message
556 | }
557 | }
558 |
def run_server(host='127.0.0.1', port=8088):
    """Create the MCP HTTP server on ``host:port`` and serve until interrupted.

    Args:
        host: Interface address to bind to.
        port: TCP port to listen on.

    This call blocks inside ``serve_forever()``.
    """
    bind_address = (host, port)
    httpd = ThreadedHTTPServer(bind_address, BinaryNinjaMCPHandler)
    logger.info(f"Binary Ninja MCP HTTP server running at http://{host}:{port}")
    httpd.serve_forever()
563 |
def main():
    """Parse ``--host`` / ``--port`` from the command line and start the server."""
    cli = argparse.ArgumentParser()
    # Declare the two supported options from a small table.
    for flag, options in (
        ('--host', {'default': '127.0.0.1'}),
        ('--port', {'type': int, 'default': 8088}),
    ):
        cli.add_argument(flag, **options)
    opts = cli.parse_args()
    run_server(opts.host, opts.port)


# Only start the server when this file is executed as a script.
if __name__ == '__main__':
    main()
573 |
```
--------------------------------------------------------------------------------
/binaryninja_server.py:
--------------------------------------------------------------------------------
```python
1 | #!/usr/bin/env python3
2 | """
3 | Binary Ninja MCP Server
4 |
5 | This is the main entry point for the Binary Ninja MCP server.
6 | It integrates the HTTP server and client components to provide a complete MCP server implementation.
7 | """
8 |
9 | import sys
10 | import json
11 | import traceback
12 | import os
13 | import logging
14 | from binaryninja_http_client import BinaryNinjaHTTPClient
15 |
# Root logging configuration: DEBUG and above, timestamped and tagged with
# the logger name and level.
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger('BinaryNinjaMCPServer')

# Additionally mirror this logger's records into a log file, using the same
# record layout as the root configuration.
file_handler = logging.FileHandler('/tmp/binaryninja_mcp_server.log')
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(
    logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
)
logger.addHandler(file_handler)
25 |
def read_json():
    """Read the next JSON value from stdin (one value per line).

    Blank or whitespace-only lines are skipped instead of being handed to the
    JSON parser, where they would raise ``JSONDecodeError`` even though they
    carry no request.

    Returns:
        The decoded JSON value of the next non-blank line.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
        SystemExit: With status 0 when stdin reaches end-of-file.
    """
    while True:
        line = sys.stdin.readline()
        if not line:
            # readline() returns '' only at EOF: the peer closed the pipe.
            sys.exit(0)
        if line.strip():
            return json.loads(line)
32 |
def write_json(response):
    """Emit *response* as a single line of JSON on stdout and flush at once."""
    serialized = json.dumps(response)
    print(serialized, file=sys.stdout, flush=True)
36 |
def _to_int(value, default=0):
    """Coerce an address-like value to ``int``.

    Integers pass through; strings are parsed with base auto-detection
    (so both ``"4096"`` and ``"0x1000"`` work); anything else, or an
    unparsable string, yields *default*.
    """
    if isinstance(value, int):
        return value
    if isinstance(value, str):
        try:
            return int(value, 0)
        except ValueError:
            return default
    return default


def _join_lines(value):
    """Return *value* as text: lists are joined with newlines, others go through ``str``."""
    return "\n".join(value) if isinstance(value, list) else str(value)


def _tool_schema(name, description, function_description=None):
    """Build one ``list_tools`` entry; adds a ``function`` argument when described."""
    properties = {
        "path": {"type": "string", "description": "Path to the binary file"}
    }
    required = ["path"]
    if function_description is not None:
        properties["function"] = {
            "type": "string",
            "description": function_description,
        }
        required.append("function")
    return {
        "name": name,
        "description": description,
        "inputSchema": {
            "type": "object",
            "properties": properties,
            "required": required,
        },
    }


def _decompile_text(client, path, func_name):
    """Fetch HLIL for a function as text, falling back to a placeholder comment."""
    try:
        return _join_lines(client.get_hlil(path, function_name=func_name))
    except Exception as e:
        logger.warning(f"Failed to decompile function: {e}")
        return "// Decompilation not available in personal license\n// or Binary Ninja server is not running."


def _write_output(output_path, content, kind):
    """Write *content* to *output_path*; return an error dict on failure, else ``None``."""
    try:
        with open(output_path, "w") as f:
            f.write(content)
    except Exception as e:
        logger.error(f"Failed to write {kind} file: {e}")
        return {"error": f"Failed to write {kind} file: {e}"}
    return None


def _read_via(method, sub_params, client, uri, mime_type, render):
    """Run *method* through ``handle_request`` and wrap its result as resource contents."""
    sub_result = handle_request({"method": method, "params": sub_params}, client)
    if "error" in sub_result:
        return sub_result
    return {
        "result": {
            "contents": [
                {
                    "uri": uri,
                    "mimeType": mime_type,
                    "text": render(sub_result["result"]),
                }
            ]
        }
    }


def _handle_list_tools(params, client):
    """Return the static list of tools exposed through ``call_tool``."""
    return {
        "result": {
            "tools": [
                _tool_schema("get_binary_info", "Get information about a binary file"),
                _tool_schema("list_functions", "List all functions in a binary file"),
                _tool_schema(
                    "disassemble_function",
                    "Disassemble a function in a binary file",
                    "Name of the function to disassemble",
                ),
                _tool_schema(
                    "decompile_function",
                    "Decompile a function to C code",
                    "Name of the function to decompile",
                ),
            ]
        }
    }


def _handle_list_resources(params, client):
    """Return the (empty) list of static resources."""
    return {"result": {"resources": []}}  # No static resources available


def _handle_list_resource_templates(params, client):
    """Return the URI templates understood by ``read_resource``."""
    return {
        "result": {
            "resourceTemplates": [
                {
                    "uriTemplate": "binary://{path}/info",
                    "name": "Binary Information",
                    "description": "Information about a binary file",
                },
                {
                    "uriTemplate": "binary://{path}/functions",
                    "name": "Functions",
                    "description": "List of functions in a binary file",
                },
                {
                    "uriTemplate": "binary://{path}/function/{name}",
                    "name": "Function Disassembly",
                    "description": "Disassembly of a function in a binary file",
                },
            ]
        }
    }


def _handle_read_resource(params, client):
    """Resolve a ``binary://`` URI to disassembly, a function list or binary info."""
    uri = params.get("uri") or ""
    logger.debug(f"Reading resource: {uri}")

    if isinstance(uri, str) and uri.startswith("binary://"):
        path = uri[len("binary://"):]

        if "/function/" in path:
            parts = path.split("/function/")
            if len(parts) != 2:
                return {"error": "Invalid URI format"}
            binary_path, function_name = parts
            # _join_lines copes with the disassembly arriving either as a
            # list of lines or as one already-joined string.
            return _read_via(
                "disassemble_function",
                {"path": binary_path, "function": function_name},
                client, uri, "text/plain", _join_lines,
            )

        if path.endswith("/functions"):
            return _read_via(
                "list_functions",
                {"path": path[:-len("/functions")]},
                client, uri, "application/json", json.dumps,
            )

        if path.endswith("/info"):
            return _read_via(
                "get_binary_info",
                {"path": path[:-len("/info")]},
                client, uri, "application/json", json.dumps,
            )

    return {"error": f"Unknown resource URI: {uri}"}


# Tools that ``call_tool`` forwards to the API method of the same name.
_CALLABLE_TOOLS = frozenset({
    "get_binary_info",
    "list_functions",
    "disassemble_function",
    "decompile_function",
})


def _handle_call_tool(params, client):
    """Forward a tool invocation to the API method with the same name."""
    tool_name = params.get("name")
    tool_args = params.get("arguments", {})

    logger.debug(f"Calling tool: {tool_name}")
    logger.debug(f"Arguments: {json.dumps(tool_args)}")

    if isinstance(tool_name, str) and tool_name in _CALLABLE_TOOLS:
        return handle_request({"method": tool_name, "params": tool_args}, client)
    return {"error": f"Unknown tool: {tool_name}"}


def _handle_ping(params, client):
    """Report whether the Binary Ninja HTTP server is reachable."""
    ping_result = client.ping()
    if ping_result["status"] == "connected":
        return {"result": "pong"}
    return {"error": f"Failed to connect to Binary Ninja server: {ping_result.get('error', 'Unknown error')}"}


def _handle_get_binary_info(params, client):
    """Return summary information about the binary."""
    path = params.get("path")
    if not path:
        return {"error": "Path parameter is required"}

    # The binary is assumed to be loaded already; the path is only logged.
    logger.info(f"Using binary: {path}")

    file_info = client.get_file_info(path)
    return {
        "result": {
            "filename": file_info.get("filename", ""),
            "architecture": file_info.get("arch", {}).get("name", "unknown"),
            "platform": file_info.get("platform", {}).get("name", "unknown"),
            # The entry point may arrive as an int or as a (hex) string.
            "entry_point": hex(_to_int(file_info.get("entry_point", 0))),
            "file_size": file_info.get("file_size", 0),
            "is_executable": file_info.get("executable", False),
            "is_relocatable": file_info.get("relocatable", False),
            "address_size": file_info.get("address_size", 0),
        }
    }


def _handle_list_functions(params, client):
    """Return the names of all functions in the binary."""
    path = params.get("path")
    if not path:
        return {"error": "Path parameter is required"}

    logger.info(f"Using binary: {path}")
    return {"result": [f["name"] for f in client.list_functions(path)]}


def _handle_disassemble_function(params, client):
    """Return the disassembly of one function as provided by the client."""
    path = params.get("path")
    func_name = params.get("function")
    if not path or not func_name:
        return {"error": "Path and function parameters are required"}

    logger.info(f"Using binary: {path}")
    return {"result": client.get_disassembly(path, function_name=func_name)}


def _handle_list_sections(params, client):
    """Return the binary's sections with hex-formatted start/end addresses."""
    path = params.get("path")
    if not path:
        return {"error": "Path parameter is required"}

    logger.info(f"Using binary: {path}")

    sections = []
    for section in client.get_sections(path):
        length = section.get("length", 0)
        if isinstance(length, str):
            # Only string lengths are converted; other values pass through.
            length = _to_int(length)
        sections.append({
            "name": section.get("name", ""),
            "start": hex(_to_int(section.get("start", 0))),
            "end": hex(_to_int(section.get("end", 0))),
            "size": length,
            "semantics": section.get("semantics", ""),
        })
    return {"result": sections}


def _handle_get_xrefs(params, client):
    """Return cross-references to a function, annotated with the caller's name."""
    path = params.get("path")
    func_name = params.get("function")
    if not path or not func_name:
        return {"error": "Path and function parameters are required"}

    logger.info(f"Using binary: {path}")

    # Look the function up first to learn its start address.
    function = client.get_function(path, function_name=func_name)
    if not function:
        return {"error": f"Function '{func_name}' not found"}

    refs = []
    for xref in client.get_xrefs(path, function.get("start", 0)):
        caller_addr = xref.get("from", 0)
        try:
            caller_func = client.get_function(path, function_address=caller_addr)
            refs.append({
                "from_function": caller_func.get("name", "unknown"),
                "from_address": hex(_to_int(caller_addr)),
                "to_address": hex(_to_int(xref.get("to", 0))),
            })
        except Exception as e:
            # Best effort: an xref whose caller cannot be resolved is left
            # out of the result, but the reason is logged.
            logger.debug(f"Skipping xref from {caller_addr!r}: {e}")
    return {"result": refs}


def _handle_get_strings(params, client):
    """Return strings found in the binary (``min_length`` defaults to 4)."""
    path = params.get("path")
    min_length = params.get("min_length", 4)
    if not path:
        return {"error": "Path parameter is required"}

    logger.info(f"Using binary: {path}")

    strings = [
        {
            "value": string.get("value", ""),
            "address": hex(_to_int(string.get("address", 0))),
            "length": len(string.get("value", "")),
            "type": string.get("type", ""),
        }
        for string in client.get_strings(path, min_length=min_length)
    ]
    return {"result": strings}


def _handle_decompile_function(params, client):
    """Return name, signature, address and decompiled text of one function."""
    path = params.get("path")
    func_name = params.get("function")
    if not path or not func_name:
        return {"error": "Path and function parameters are required"}

    logger.info(f"Using binary: {path}")

    function = client.get_function(path, function_name=func_name)
    if not function:
        return {"error": f"Function '{func_name}' not found"}

    return {
        "result": {
            "name": function.get("name", ""),
            "signature": function.get("type", ""),
            "decompiled_code": _decompile_text(client, path, func_name),
            "address": hex(_to_int(function.get("start", 0))),
        }
    }


def _handle_get_types(params, client):
    """Return the types known for the binary; structures include their members."""
    path = params.get("path")
    if not path:
        return {"error": "Path parameter is required"}

    logger.info(f"Using binary: {path}")

    types = []
    for type_name, type_info in client.get_types(path).items():
        type_obj = {
            "name": type_name,
            "type_class": type_info.get("type_class", "unknown"),
            "type_string": type_info.get("type_string", ""),
        }
        if type_info.get("type_class") == "structure":
            type_obj["size"] = type_info.get("size", 0)
            type_obj["members"] = [
                {
                    "name": member.get("name", ""),
                    "type": member.get("type", ""),
                    "offset": member.get("offset", 0),
                }
                for member in type_info.get("members", [])
            ]
        types.append(type_obj)
    return {"result": types}


def _handle_generate_header(params, client):
    """Generate a C header (placeholder quality) with types and prototypes."""
    path = params.get("path")
    output_path = params.get("output_path")
    include_functions = params.get("include_functions", True)
    include_types = params.get("include_types", True)

    if not path:
        return {"error": "Path parameter is required"}

    logger.info(f"Using binary: {path}")

    parts = [
        "// Generated header file\n\n",
        # Include guards
        "#ifndef GENERATED_HEADER_H\n",
        "#define GENERATED_HEADER_H\n\n",
        # Standard includes
        "#include <stdint.h>\n",
        "#include <stdbool.h>\n\n",
    ]

    if include_types:
        types_data = client.get_types(path)
        if types_data:
            parts.append("// Types\n")
            for type_name, type_info in types_data.items():
                if type_info.get("type_class") == "structure":
                    parts.append(f"typedef struct {type_name} {{\n")
                    for member in type_info.get("members", []):
                        parts.append(f" {member.get('type', 'void')} {member.get('name', 'unknown')}; // offset: {member.get('offset', 0)}\n")
                    parts.append(f"}} {type_name};\n\n")
                else:
                    parts.append(f"typedef {type_info.get('type_string', 'void')} {type_name};\n")
            parts.append("\n")

    if include_functions:
        functions = client.list_functions(path)
        if functions:
            parts.append("// Function prototypes\n")
            for func in functions:
                function = client.get_function(path, function_name=func["name"])
                if function:
                    parts.append(f"{function.get('type', 'void')} {function.get('name', 'unknown')}();\n")
            parts.append("\n")

    parts.append("#endif // GENERATED_HEADER_H\n")
    header_content = "".join(parts)

    if output_path:
        failure = _write_output(output_path, header_content, "header")
        if failure is not None:
            return failure

    return {"result": header_content}


def _handle_generate_source(params, client):
    """Generate a C source skeleton embedding each function's decompilation."""
    path = params.get("path")
    output_path = params.get("output_path")
    header_path = params.get("header_path", "generated_header.h")

    if not path:
        return {"error": "Path parameter is required"}

    logger.info(f"Using binary: {path}")

    parts = [
        "// Generated source file\n\n",
        f"#include \"{header_path}\"\n\n",
    ]

    for func in client.list_functions(path) or []:
        function = client.get_function(path, function_name=func["name"])
        if not function:
            continue
        decompiled_code = _decompile_text(client, path, func["name"])

        parts.append(f"// Function: {function.get('name', 'unknown')}\n")
        parts.append(f"// Address: {hex(_to_int(function.get('start', 0)))}\n")
        parts.append(f"{function.get('type', 'void')} {function.get('name', 'unknown')}() {{\n")
        parts.append(" // TODO: Implement this function\n")
        parts.append(" // Decompiled code:\n")
        parts.append(" /*\n")
        for line in decompiled_code.split("\n"):
            # The text is emitted inside a C block comment; break up any
            # embedded "*/" so it cannot terminate that comment early.
            parts.append(f" {line.replace('*/', '* /')}\n")
        parts.append(" */\n")
        parts.append("}\n\n")

    source_content = "".join(parts)

    if output_path:
        failure = _write_output(output_path, source_content, "source")
        if failure is not None:
            return failure

    return {"result": source_content}


def _handle_rebuild_driver(params, client):
    """Write ``driver.h``, ``driver.c`` and a kernel-module Makefile to ``output_dir``."""
    path = params.get("path")
    output_dir = params.get("output_dir")

    if not path:
        return {"error": "Path parameter is required"}
    if not output_dir:
        return {"error": "Output directory parameter is required"}

    logger.info(f"Using binary: {path}")

    try:
        os.makedirs(output_dir, exist_ok=True)

        header_path = os.path.join(output_dir, "driver.h")
        header_result = handle_request({
            "method": "generate_header",
            "params": {"path": path, "output_path": header_path},
        }, client)
        if "error" in header_result:
            return {"error": f"Failed to generate header file: {header_result['error']}"}

        source_path = os.path.join(output_dir, "driver.c")
        source_result = handle_request({
            "method": "generate_source",
            "params": {
                "path": path,
                "output_path": source_path,
                "header_path": "driver.h",
            },
        }, client)
        if "error" in source_result:
            return {"error": f"Failed to generate source file: {source_result['error']}"}

        makefile_path = os.path.join(output_dir, "Makefile")
        with open(makefile_path, "w") as f:
            f.write("# Generated Makefile\n\n")
            f.write("obj-m := driver.o\n\n")
            f.write("all:\n")
            f.write("\tmake -C /lib/modules/$(shell uname -r)/build M=$(PWD) modules\n\n")
            f.write("clean:\n")
            f.write("\tmake -C /lib/modules/$(shell uname -r)/build M=$(PWD) clean\n")

        return {
            "result": {
                "header_file": header_path,
                "source_files": [source_path],
                "makefile": makefile_path,
            }
        }
    except Exception as e:
        logger.error(f"Failed to rebuild driver: {e}")
        return {"error": f"Failed to rebuild driver: {e}"}


# Method name -> handler(params, client). Every handler returns a dict that
# carries either a "result" or an "error" key.
_REQUEST_HANDLERS = {
    # MCP protocol methods
    "list_tools": _handle_list_tools,
    "list_resources": _handle_list_resources,
    "list_resource_templates": _handle_list_resource_templates,
    "read_resource": _handle_read_resource,
    "call_tool": _handle_call_tool,
    # Binary Ninja API methods
    "ping": _handle_ping,
    "get_binary_info": _handle_get_binary_info,
    "list_functions": _handle_list_functions,
    "disassemble_function": _handle_disassemble_function,
    "list_sections": _handle_list_sections,
    "get_xrefs": _handle_get_xrefs,
    "get_strings": _handle_get_strings,
    "decompile_function": _handle_decompile_function,
    "get_types": _handle_get_types,
    "generate_header": _handle_generate_header,
    "generate_source": _handle_generate_source,
    "rebuild_driver": _handle_rebuild_driver,
}


def handle_request(request, client):
    """Handle an MCP request using the Binary Ninja HTTP client.

    Args:
        request: Decoded request; ``request["method"]`` selects the handler
            and ``request["params"]`` (optional, may be ``null``) carries
            its arguments.
        client: Object exposing the Binary Ninja HTTP client methods used by
            the handlers (``ping``, ``get_file_info``, ``list_functions``,
            ``get_function``, ``get_disassembly``, ``get_hlil``, ...).

    Returns:
        ``{"result": ...}`` on success or ``{"error": "<message>"}`` on
        failure. If a handler raises, the error dict additionally contains
        a ``"traceback"`` string. This function itself never raises for
        ``Exception`` subclasses.
    """
    try:
        method = request.get("method")
        # An explicit ``"params": null`` is treated like absent params
        # rather than crashing on ``None.get``.
        params = request.get("params") or {}

        logger.debug(f"Handling method: {method}")
        logger.debug(f"Parameters: {json.dumps(params)}")

        # Guard the lookup: an unhashable method value must produce the
        # "Unknown method" error, not a TypeError.
        handler = _REQUEST_HANDLERS.get(method) if isinstance(method, str) else None
        if handler is None:
            return {"error": f"Unknown method: {method}"}
        return handler(params, client)

    except Exception as e:
        logger.error(f"Error handling request: {e}")
        logger.error(traceback.format_exc())
        return {
            "error": str(e),
            "traceback": traceback.format_exc()
        }
719 |
def main():
    """Run the stdio MCP server: read one JSON request per line, answer on stdout.

    Creates a ``BinaryNinjaHTTPClient``, pings the Binary Ninja server once
    (a failed ping is logged but does not stop the server), then loops
    forever: read a request, dispatch it through ``handle_request``, copy the
    request ``id`` onto the response and write it out. The loop only ends
    when ``read_json`` exits the process at end-of-input.
    """
    logger.info("Starting Binary Ninja MCP Server")

    # Log only the *names* of the environment variables. Their values can
    # contain credentials and must not be copied into the log file.
    logger.debug("Environment variables: " + ", ".join(sorted(os.environ)))

    # Create the Binary Ninja HTTP client
    client = BinaryNinjaHTTPClient()

    # Test the connection to the Binary Ninja server
    ping_result = client.ping()
    if ping_result["status"] != "connected":
        logger.error(f"Failed to connect to Binary Ninja server: {ping_result.get('error', 'Unknown error')}")
        # Don't exit, continue anyway to support the MCP protocol
        logger.warning("Continuing anyway to support the MCP protocol")
    else:
        logger.info(f"Connected to Binary Ninja server (binary loaded: {ping_result.get('loaded', False)})")

    logger.info("Ready to receive MCP requests")

    # Process requests
    while True:
        try:
            logger.debug("Waiting for request...")

            req = read_json()
            logger.debug(f"Received request: {json.dumps(req)}")

            res = handle_request(req, client)
            res["id"] = req.get("id")

            logger.debug(f"Sending response: {json.dumps(res)}")
            write_json(res)
        except json.JSONDecodeError as e:
            logger.error(f"Error decoding JSON: {e}")
            # Report the text that actually failed to parse. Reading stdin
            # again here would consume (and drop) the *next* request.
            logger.error(f"Input was: {e.doc!r}")
            # Continue processing requests
        except Exception as e:
            logger.error(f"Error processing request: {e}")
            logger.error(traceback.format_exc())
            # Don't exit, continue processing requests
            logger.warning("Continuing to process requests...")


if __name__ == "__main__":
    main()
775 |
```