#
tokens: 36412/50000 23/24 files (page 1/2)
lines: off (toggle) GitHub
raw markdown copy
This is page 1 of 2. Use http://codebase.md/opensensor/bn_cline_mcp?page={x} to view the full context.

# Directory Structure

```
├── .gitignore
├── .idea
│   ├── editor.xml
│   ├── vcs.xml
│   └── workspace.xml
├── binaryninja_http_client.py
├── binaryninja_http_server.py
├── binaryninja_mcp_client.py
├── binaryninja_mcp_http_server.py
├── binaryninja_server.py
├── binaryninja-mcp-bridge.js
├── bridge
│   ├── bn_mcp_bridge_http.py
│   ├── bn_mcp_bridge_stdio.py
│   ├── Pipfile
│   ├── Pipfile.lock
│   └── requirements.txt
├── client.ts
├── cline-tool.json
├── example.py
├── examples
│   └── binary_analysis.ts
├── LICENSE
├── package-lock.json
├── package.json
├── README_EXTENDED.md
├── README.md
├── run_analysis.sh
├── run-bridge.sh
├── run.js
├── test_pagination.py
└── tsconfig.json
```

# Files

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
__pycache__/
node_modules/
.venv

```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
[![MseeP.ai Security Assessment Badge](https://mseep.net/pr/opensensor-bn-cline-mcp-badge.png)](https://mseep.ai/app/opensensor-bn-cline-mcp)

# binary_ninja_cline_mcp
An MCP server for Cline that works with Binary Ninja (Personal License)

This repository contains an MCP server that allows Cline to analyze binaries using Binary Ninja.
Note:  Not all files will be used, there is also prototype of using headless Binary Ninja but my 
license is Personal so I can't test it.

## Setup

1. Install the latest of Binary Ninja MCP Plugin https://github.com/fosdickio/binary_ninja_mcp
2. Open your binary and start the MCP server from within Binary Ninja.
3. Open a terminal and run python binary_ninja_mcp_http_server.py --port 8088
4. Open another terminal and run `npm start`
5. Open Cline and add the following tool:{
Example:
```
{
  "mcpServers": {
    "BN MCP": {
      "command": "node",
      "args": ["/home/matteius/binary_ninja_cline/bn_cline_mcp/binaryninja-mcp-bridge.js"],
      "env": {
        "BN_HTTP_SERVER": "http://localhost:8088"
      },
      "autoApprove": [],
      "disabled": false,
      "timeout": 30
    }
  }
}

```
```

--------------------------------------------------------------------------------
/bridge/requirements.txt:
--------------------------------------------------------------------------------

```
anthropic>=0.49.0
fastmcp>=2.0.0
requests>=2.32.3

```

--------------------------------------------------------------------------------
/.idea/vcs.xml:
--------------------------------------------------------------------------------

```
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
  <component name="VcsDirectoryMappings">
    <mapping directory="" vcs="Git" />
  </component>
</project>
```

--------------------------------------------------------------------------------
/run-bridge.sh:
--------------------------------------------------------------------------------

```bash
#!/bin/bash
# Save this as /home/matteius/Documents/Cline/MCP/bn-mcp/run-bridge.sh
export BN_HTTP_SERVER=http://localhost:8088
cd /home/matteius/Documents/Cline/MCP/bn-mcp/
/usr/bin/node binaryninja-mcp-bridge.js

```

--------------------------------------------------------------------------------
/cline-tool.json:
--------------------------------------------------------------------------------

```json
{
  "name": "Binary Ninja MCP",
  "description": "Analyze binaries via Binary Ninja MCP server",
  "entry": "/home/matteius/Documents/Cline/MCP/bn-mcp/binaryninja_server.py",
  "language": "json",
  "tool": {
    "type": "mcp"
  }
}

```

--------------------------------------------------------------------------------
/tsconfig.json:
--------------------------------------------------------------------------------

```json
{
  "compilerOptions": {
    "target": "ES2020",
    "module": "commonjs",
    "lib": ["ES2020", "DOM"],
    "outDir": "./dist",
    "rootDir": "./",
    "strict": true,
    "esModuleInterop": true,
    "skipLibCheck": true,
    "forceConsistentCasingInFileNames": true,
    "resolveJsonModule": true
  },
  "include": [
    "*.ts"
  ],
  "exclude": [
    "node_modules",
    "dist"
  ]
}

```

--------------------------------------------------------------------------------
/package.json:
--------------------------------------------------------------------------------

```json
{
  "name": "binaryninja-mcp-bridge",
  "version": "0.1.0",
  "description": "Bridge between MCP and Binary Ninja HTTP Server",
  "type": "module",
  "main": "binaryninja-mcp-bridge.js",
  "scripts": {
    "start": "node binaryninja-mcp-bridge.js"
  },
  "author": "",
  "license": "MIT",
  "dependencies": {
    "@modelcontextprotocol/sdk": "^1.0.2",
    "node-fetch": "^3.3.2",
    "zod": "^3.22.4",
    "zod-to-json-schema": "^3.22.3"
  },
  "devDependencies": {
    "@types/node": "^22.14.0"
  }
}

```

--------------------------------------------------------------------------------
/run_analysis.sh:
--------------------------------------------------------------------------------

```bash
#!/bin/bash
# Script to run the binary analysis example

# Check if a binary path was provided
if [ $# -lt 1 ]; then
  echo "Usage: $0 <path_to_binary> [output_dir]"
  exit 1
fi

BINARY_PATH="$1"
OUTPUT_DIR="${2:-./analysis_output}"

# Check if the binary exists
if [ ! -f "$BINARY_PATH" ]; then
  echo "Error: Binary file '$BINARY_PATH' not found"
  exit 1
fi

# Create the output directory if it doesn't exist
mkdir -p "$OUTPUT_DIR"

# Run the analysis script
echo "Running binary analysis on '$BINARY_PATH'..."
echo "Results will be saved to '$OUTPUT_DIR'"
echo

# Make sure we're in the right directory
cd "$(dirname "$0")"

# Run the TypeScript example using ts-node
npx ts-node examples/binary_analysis.ts "$BINARY_PATH" "$OUTPUT_DIR"

# Check if the analysis was successful
if [ $? -eq 0 ]; then
  echo
  echo "Analysis completed successfully!"
  echo "Results are available in '$OUTPUT_DIR'"
else
  echo
  echo "Analysis failed. Please check the error messages above."
fi

```

--------------------------------------------------------------------------------
/test_pagination.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Test script to verify that pagination is working correctly in the Binary Ninja HTTP client.
This script will load a binary file and retrieve all functions, demonstrating that
pagination is working correctly by retrieving more than the default limit of 100 functions.
"""

import sys
import json
from binaryninja_http_client import BinaryNinjaHTTPClient

def main():
    if len(sys.argv) < 2:
        print(f"Usage: {sys.argv[0]} <path_to_binary>")
        sys.exit(1)
        
    binary_path = sys.argv[1]
    client = BinaryNinjaHTTPClient()
    
    # Test the connection
    ping_result = client.ping()
    print(f"Connection status: {ping_result['status']}")
    
    if ping_result['status'] != 'connected':
        print(f"Error: {ping_result.get('error', 'Unknown error')}")
        sys.exit(1)
        
    # Load the binary if not already loaded
    if not ping_result.get('loaded', False):
        try:
            print(f"Loading binary: {binary_path}")
            load_result = client.load_binary(binary_path)
            print(f"Load result: {json.dumps(load_result, indent=2)}")
        except Exception as e:
            print(f"Error loading binary: {e}")
            sys.exit(1)
    
    # Get all functions
    print("\nRetrieving all functions...")
    functions = client.list_functions()
    print(f"Retrieved {len(functions)} functions in total")
    
    # Print the first 5 and last 5 functions to verify pagination is working
    if functions:
        print("\nFirst 5 functions:")
        for i, func in enumerate(functions[:5]):
            print(f"{i+1}. {func['name']} at {func.get('address', 'unknown')}")
            
        if len(functions) > 10:
            print("\nLast 5 functions:")
            for i, func in enumerate(functions[-5:]):
                print(f"{len(functions)-4+i}. {func['name']} at {func.get('address', 'unknown')}")
    
    # Test other paginated methods
    print("\nRetrieving all imports...")
    imports = client.get_imports()
    print(f"Retrieved {len(imports)} imports in total")
    
    print("\nRetrieving all exports...")
    exports = client.get_exports()
    print(f"Retrieved {len(exports)} exports in total")
    
    print("\nRetrieving all segments...")
    segments = client.get_sections()
    print(f"Retrieved {len(segments)} segments in total")
    
    print("\nRetrieving all data items...")
    data_items = client.get_defined_data()
    print(f"Retrieved {len(data_items)} data items in total")
    
    print("\nTest completed successfully!")

if __name__ == "__main__":
    main()

```

--------------------------------------------------------------------------------
/run.js:
--------------------------------------------------------------------------------

```javascript
#!/usr/bin/env node
/**
 * Binary Ninja MCP Bridge Runner
 * 
 * This script helps start all the necessary components for the Binary Ninja MCP setup.
 * It can start the HTTP server, the MCP bridge, or both.
 */

const { spawn } = require('child_process');
const path = require('path');
const fs = require('fs');

// Parse command line arguments
const args = process.argv.slice(2);
const command = args[0] || 'all'; // Default to 'all'

// Configuration
const HTTP_SERVER_PORT = 8088;
const HTTP_SERVER_HOST = '127.0.0.1';

// Helper function to run a command and pipe its output
function runCommand(cmd, args, name) {
  console.log(`Starting ${name}...`);
  
  const proc = spawn(cmd, args, {
    stdio: 'inherit',
    shell: true
  });
  
  proc.on('error', (err) => {
    console.error(`Error starting ${name}: ${err.message}`);
  });
  
  proc.on('close', (code) => {
    if (code !== 0) {
      console.error(`${name} exited with code ${code}`);
    } else {
      console.log(`${name} stopped`);
    }
  });
  
  return proc;
}

// Start the HTTP server
function startHttpServer() {
  const serverPath = path.join(__dirname, 'binaryninja_mcp_http_server.py');
  
  if (!fs.existsSync(serverPath)) {
    console.error(`HTTP server script not found at ${serverPath}`);
    process.exit(1);
  }
  
  return runCommand('python3', [
    serverPath,
    '--host', HTTP_SERVER_HOST,
    '--port', HTTP_SERVER_PORT.toString()
  ], 'HTTP Server');
}

// Start the MCP bridge
function startMcpBridge() {
  const bridgePath = path.join(__dirname, 'binaryninja-mcp-bridge.js');
  
  if (!fs.existsSync(bridgePath)) {
    console.error(`MCP bridge script not found at ${bridgePath}`);
    process.exit(1);
  }
  
  // Set the environment variable for the HTTP server URL
  process.env.BN_HTTP_SERVER = `http://${HTTP_SERVER_HOST}:${HTTP_SERVER_PORT}`;
  
  return runCommand('node', [bridgePath], 'MCP Bridge');
}

// Print usage information
function printUsage() {
  console.log(`
Usage: node run.js [command]

Commands:
  http     Start only the HTTP server
  bridge   Start only the MCP bridge
  all      Start both the HTTP server and MCP bridge (default)
  help     Show this help message

Example:
  node run.js all
`);
}

// Main function
function main() {
  switch (command.toLowerCase()) {
    case 'http':
      startHttpServer();
      break;
      
    case 'bridge':
      startMcpBridge();
      break;
      
    case 'all':
      startHttpServer();
      setTimeout(() => {
        startMcpBridge();
      }, 2000); // Wait 2 seconds for the HTTP server to start
      break;
      
    case 'help':
      printUsage();
      break;
      
    default:
      console.error(`Unknown command: ${command}`);
      printUsage();
      process.exit(1);
  }
}

// Run the main function
main();

```

--------------------------------------------------------------------------------
/.idea/workspace.xml:
--------------------------------------------------------------------------------

```
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
  <component name="AutoImportSettings">
    <option name="autoReloadType" value="SELECTIVE" />
  </component>
  <component name="BackendCodeEditorMiscSettings">
    <option name="/Default/RiderDebugger/RiderRestoreDecompile/RestoreDecompileSetting/@EntryValue" value="false" type="bool" />
    <option name="/Default/Housekeeping/GlobalSettingsUpgraded/IsUpgraded/@EntryValue" value="true" type="bool" />
    <option name="/Default/Housekeeping/FeatureSuggestion/FeatureSuggestionManager/DisabledSuggesters/=SwitchToGoToActionSuggester/@EntryIndexedValue" value="true" type="bool" />
    <option name="/Default/Environment/Hierarchy/GeneratedFilesCacheKey/Timestamp/@EntryValue" value="7" type="long" />
    <option name="/Default/Housekeeping/FeatureSuggestion/FeatureSuggestionManager/DisabledSuggesters/=SwitchToGoToActionSuggester/@EntryIndexRemoved" />
  </component>
  <component name="CMakeProjectFlavorService">
    <option name="flavorId" value="CMakePlainProjectFlavor" />
  </component>
  <component name="CMakeSettings">
    <configurations>
      <configuration PROFILE_NAME="Debug" ENABLED="true" CONFIG_NAME="Debug" />
    </configurations>
  </component>
  <component name="ChangeListManager">
    <list default="true" id="670a9151-2f3f-4278-bef4-d2416d369e62" name="Changes" comment="" />
    <option name="SHOW_DIALOG" value="false" />
    <option name="HIGHLIGHT_CONFLICTS" value="true" />
    <option name="HIGHLIGHT_NON_ACTIVE_CHANGELIST" value="false" />
    <option name="LAST_RESOLUTION" value="IGNORE" />
  </component>
  <component name="ClangdSettings">
    <option name="formatViaClangd" value="false" />
  </component>
  <component name="Git.Settings">
    <option name="RECENT_GIT_ROOT_PATH" value="$PROJECT_DIR$" />
  </component>
  <component name="ProjectColorInfo">{
  &quot;associatedIndex&quot;: 3
}</component>
  <component name="ProjectId" id="2vCdF1tV8kT5ok6lkq0D4zlu2P2" />
  <component name="ProjectViewState">
    <option name="hideEmptyMiddlePackages" value="true" />
    <option name="showLibraryContents" value="true" />
  </component>
  <component name="PropertiesComponent">{
  &quot;keyToString&quot;: {
    &quot;RunOnceActivity.RadMigrateCodeStyle&quot;: &quot;true&quot;,
    &quot;RunOnceActivity.ShowReadmeOnStart&quot;: &quot;true&quot;,
    &quot;RunOnceActivity.cidr.known.project.marker&quot;: &quot;true&quot;,
    &quot;RunOnceActivity.git.unshallow&quot;: &quot;true&quot;,
    &quot;RunOnceActivity.readMode.enableVisualFormatting&quot;: &quot;true&quot;,
    &quot;cf.first.check.clang-format&quot;: &quot;false&quot;,
    &quot;cidr.known.project.marker&quot;: &quot;true&quot;,
    &quot;git-widget-placeholder&quot;: &quot;main&quot;,
    &quot;last_opened_file_path&quot;: &quot;/home/matteius/bn_cline_mcp&quot;,
    &quot;node.js.detected.package.eslint&quot;: &quot;true&quot;,
    &quot;node.js.detected.package.tslint&quot;: &quot;true&quot;,
    &quot;node.js.selected.package.eslint&quot;: &quot;(autodetect)&quot;,
    &quot;node.js.selected.package.tslint&quot;: &quot;(autodetect)&quot;,
    &quot;nodejs_package_manager_path&quot;: &quot;npm&quot;,
    &quot;vue.rearranger.settings.migration&quot;: &quot;true&quot;
  }
}</component>
  <component name="SpellCheckerSettings" RuntimeDictionaries="0" Folders="0" CustomDictionaries="0" DefaultDictionary="application-level" UseSingleDictionary="true" transferred="true" />
  <component name="TaskManager">
    <task active="true" id="Default" summary="Default task">
      <changelist id="670a9151-2f3f-4278-bef4-d2416d369e62" name="Changes" comment="" />
      <created>1743652882529</created>
      <option name="number" value="Default" />
      <option name="presentableId" value="Default" />
      <updated>1743652882529</updated>
      <workItem from="1743652883573" duration="5236000" />
      <workItem from="1743696826637" duration="600000" />
      <workItem from="1743702948433" duration="7000" />
    </task>
    <servers />
  </component>
  <component name="TypeScriptGeneratedFilesManager">
    <option name="version" value="3" />
  </component>
</project>
```

--------------------------------------------------------------------------------
/bridge/bn_mcp_bridge_stdio.py:
--------------------------------------------------------------------------------

```python
from mcp.server.fastmcp import FastMCP
import requests


binja_server_url = "http://localhost:9009"
mcp = FastMCP("binja-mcp")


def safe_get(endpoint: str, params: dict = None) -> list:
    """
    Perform a GET request. If 'params' is given, we convert it to a query string.
    """
    if params is None:
        params = {}
    qs = [f"{k}={v}" for k, v in params.items()]
    query_string = "&".join(qs)
    url = f"{binja_server_url}/{endpoint}"
    if query_string:
        url += "?" + query_string

    try:
        response = requests.get(url, timeout=5)
        response.encoding = "utf-8"
        if response.ok:
            return response.text.splitlines()
        else:
            return [f"Error {response.status_code}: {response.text.strip()}"]
    except Exception as e:
        return [f"Request failed: {str(e)}"]


def safe_post(endpoint: str, data: dict | str) -> str:
    try:
        if isinstance(data, dict):
            response = requests.post(
                f"{binja_server_url}/{endpoint}", data=data, timeout=5
            )
        else:
            response = requests.post(
                f"{binja_server_url}/{endpoint}", data=data.encode("utf-8"), timeout=5
            )
        response.encoding = "utf-8"
        if response.ok:
            return response.text.strip()
        else:
            return f"Error {response.status_code}: {response.text.strip()}"
    except Exception as e:
        return f"Request failed: {str(e)}"


@mcp.tool()
def list_methods(offset: int = 0, limit: int = 100) -> list:
    """
    List all function names in the program with pagination.
    """
    return safe_get("methods", {"offset": offset, "limit": limit})


@mcp.tool()
def list_classes(offset: int = 0, limit: int = 100) -> list:
    """
    List all namespace/class names in the program with pagination.
    """
    return safe_get("classes", {"offset": offset, "limit": limit})


@mcp.tool()
def decompile_function(name: str) -> str:
    """
    Decompile a specific function by name and return the decompiled C code.
    """
    return safe_post("decompile", name)


@mcp.tool()
def rename_function(old_name: str, new_name: str) -> str:
    """
    Rename a function by its current name to a new user-defined name.
    """
    return safe_post("renameFunction", {"oldName": old_name, "newName": new_name})


@mcp.tool()
def rename_data(address: str, new_name: str) -> str:
    """
    Rename a data label at the specified address.
    """
    return safe_post("renameData", {"address": address, "newName": new_name})


@mcp.tool()
def list_segments(offset: int = 0, limit: int = 100) -> list:
    """
    List all memory segments in the program with pagination.
    """
    return safe_get("segments", {"offset": offset, "limit": limit})


@mcp.tool()
def list_imports(offset: int = 0, limit: int = 100) -> list:
    """
    List imported symbols in the program with pagination.
    """
    return safe_get("imports", {"offset": offset, "limit": limit})


@mcp.tool()
def list_exports(offset: int = 0, limit: int = 100) -> list:
    """
    List exported functions/symbols with pagination.
    """
    return safe_get("exports", {"offset": offset, "limit": limit})


@mcp.tool()
def list_namespaces(offset: int = 0, limit: int = 100) -> list:
    """
    List all non-global namespaces in the program with pagination.
    """
    return safe_get("namespaces", {"offset": offset, "limit": limit})


@mcp.tool()
def list_data_items(offset: int = 0, limit: int = 100) -> list:
    """
    List defined data labels and their values with pagination.
    """
    return safe_get("data", {"offset": offset, "limit": limit})


@mcp.tool()
def search_functions_by_name(query: str, offset: int = 0, limit: int = 100) -> list:
    """
    Search for functions whose name contains the given substring.
    """
    if not query:
        return ["Error: query string is required"]
    return safe_get(
        "searchFunctions", {"query": query, "offset": offset, "limit": limit}
    )


@mcp.tool()
def get_binary_status() -> str:
    """
    Get the current status of the loaded binary.
    """
    return safe_get("status")[0]


if __name__ == "__main__":
    print("Starting MCP bridge service...")
    mcp.run()

```

--------------------------------------------------------------------------------
/README_EXTENDED.md:
--------------------------------------------------------------------------------

```markdown
# Extended Binary Ninja MCP Client

This project extends the Binary Ninja MCP client to provide more comprehensive binary analysis capabilities through the Model Context Protocol (MCP).

## Features

The extended client adds the following capabilities:

### Basic Binary Analysis
- Get binary metadata
- List functions, sections, imports, exports
- Get detailed function information
- Disassemble and decompile functions
- Search for functions by name
- List C++ namespaces
- List and analyze data variables

### Advanced Analysis
- Generate comprehensive analysis reports
- Find potential vulnerabilities in binaries
- Compare two binaries to identify differences
- Rename functions and data variables

## Setup

1. Install dependencies:
```bash
cd bn_cline_mcp
npm install
npm install --save-dev @types/node
```

2. Make sure Binary Ninja is running with the MCP plugin installed.

## Usage

### TypeScript Client

The TypeScript client provides a simple API for interacting with Binary Ninja:

```typescript
import { BinaryNinjaClient } from './client';

async function main() {
  const client = new BinaryNinjaClient();
  
  try {
    // Start the server
    await client.start('/path/to/binaryninja_server.py');
    
    // Load a binary file
    const binaryPath = '/path/to/binary';
    
    // Get binary information
    const info = await client.getBinaryInfo(binaryPath);
    console.log(info);
    
    // List functions
    const functions = await client.listFunctions(binaryPath);
    console.log(functions);
    
    // Decompile a function
    const decompiled = await client.decompileFunction(binaryPath, functions[0]);
    console.log(decompiled);
    
    // Generate a comprehensive analysis report
    const report = await client.analyzeFile(binaryPath, 'report.json');
    console.log(`Found ${report.function_count} functions`);
    
    // Find potential vulnerabilities
    const vulnerabilities = await client.findVulnerabilities(binaryPath);
    console.log(`Found ${vulnerabilities.length} potential vulnerabilities`);
    
  } catch (err) {
    console.error('Error:', err);
  } finally {
    // Stop the server
    client.stop();
  }
}

main().catch(console.error);
```

### Example Scripts

The `examples` directory contains example scripts that demonstrate how to use the extended client:

- `binary_analysis.ts`: Demonstrates comprehensive binary analysis capabilities

To run an example:

```bash
cd bn_cline_mcp
npx ts-node examples/binary_analysis.ts /path/to/binary [output_dir]
```

## API Reference

### Basic Operations

- `getBinaryInfo(path: string)`: Get information about a binary file
- `listFunctions(path: string)`: List all functions in a binary file
- `getFunction(path: string, functionName: string)`: Get detailed information about a specific function
- `disassembleFunction(path: string, functionName: string)`: Disassemble a function
- `decompileFunction(path: string, functionName: string)`: Decompile a function to C code
- `listSections(path: string)`: List all sections/segments in a binary file
- `listImports(path: string)`: List all imported functions
- `listExports(path: string)`: List all exported symbols
- `listNamespaces(path: string)`: List all C++ namespaces
- `listData(path: string)`: List all defined data variables
- `searchFunctions(path: string, query: string)`: Search for functions by name
- `renameFunction(path: string, oldName: string, newName: string)`: Rename a function
- `renameData(path: string, address: string, newName: string)`: Rename a data variable

### Advanced Analysis

- `analyzeFile(path: string, outputPath?: string)`: Generate a comprehensive analysis report
- `findVulnerabilities(path: string)`: Find potential vulnerabilities in a binary file
- `compareBinaries(path1: string, path2: string)`: Compare two binary files and identify differences

## Extending the Client

You can extend the client by adding new methods to the `BinaryNinjaClient` class in `client.ts`. If you need to add new server-side functionality, you'll need to:

1. Add a new tool definition to the `MCP_TOOLS` array in `binaryninja_mcp_http_server.py`
2. Implement the handler for the new tool in the `_handle_mcp_request` method
3. Add a corresponding method to the `BinaryNinjaClient` class in `client.ts`

## Troubleshooting

- Make sure Binary Ninja is running and the MCP plugin is installed
- Check that the server path in `client.start()` is correct
- If you get TypeScript errors, make sure you've installed the required dependencies with `npm install --save-dev @types/node`

## Handling Timeouts

The client includes built-in retry logic to handle occasional timeouts that may occur when communicating with the Binary Ninja MCP server. By default, each request will:

- Timeout after 30 seconds if no response is received
- Automatically retry up to 3 times with a 1-second delay between attempts
- Log detailed error information for debugging

You can customize the retry behavior when making requests:

```typescript
// Custom retry options
const result = await client.sendRequest('some_method', { param: 'value' }, {
  maxRetries: 5,    // Retry up to 5 times
  retryDelay: 2000  // Wait 2 seconds between retries
});
```

This makes the client more robust when dealing with large binaries or complex analysis tasks that might occasionally cause timeouts.

```

--------------------------------------------------------------------------------
/binaryninja-mcp-bridge.js:
--------------------------------------------------------------------------------

```javascript
#!/usr/bin/env node
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
  CallToolRequestSchema,
  ListToolsRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";
import { z } from 'zod';
import { zodToJsonSchema } from 'zod-to-json-schema';
import fetch from 'node-fetch';

// Define the version
const VERSION = "0.1.0";

// Configuration
const BN_HTTP_SERVER = process.env.BN_HTTP_SERVER || 'http://localhost:8088';

// Schema definitions
const FilePathSchema = z.object({
  path: z.string().min(1, "File path cannot be empty")
});

const FunctionSchema = z.object({
  path: z.string().min(1, "File path cannot be empty"),
  function: z.string().min(1, "Function name cannot be empty")
});

// Create a server instance
const server = new Server(
  {
    name: "binaryninja-mcp-server",
    version: VERSION,
  },
  {
    capabilities: {
      tools: {},
    },
  }
);

// HTTP client for Binary Ninja server
async function callBinaryNinjaServer(method, params = {}) {
  try {
    console.error(`[INFO] Calling Binary Ninja server method: ${method}`);
    console.error(`[INFO] Params: ${JSON.stringify(params)}`);
    
    const response = await fetch(`${BN_HTTP_SERVER}`, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json'
      },
      body: JSON.stringify({
        jsonrpc: "2.0",
        id: Date.now().toString(),
        method: method,
        params: params
      })
    });
    
    if (!response.ok) {
      const errorText = await response.text();
      throw new Error(`HTTP error ${response.status}: ${errorText}`);
    }
    
    const result = await response.json();
    
    if (result.error) {
      throw new Error(`Binary Ninja server error: ${JSON.stringify(result.error)}`);
    }
    
    return result;
  } catch (error) {
    console.error(`[ERROR] Failed to call Binary Ninja server: ${error.message}`);
    throw error;
  }
}

// Register the ListTools handler
server.setRequestHandler(ListToolsRequestSchema, async () => {
  console.error("[INFO] Received ListTools request");
  return {
    tools: [
      {
        name: "get_binary_info",
        description: "Get binary metadata",
        inputSchema: zodToJsonSchema(FilePathSchema),
      },
      {
        name: "list_functions",
        description: "List functions in a binary",
        inputSchema: zodToJsonSchema(FilePathSchema),
      },
      {
        name: "disassemble_function",
        description: "Disassemble a function from a binary",
        inputSchema: zodToJsonSchema(FunctionSchema),
      },
      {
        name: "decompile_function",
        description: "Decompile a function to C",
        inputSchema: zodToJsonSchema(FunctionSchema),
      }
    ],
  };
});

// Register the CallTool handler
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  try {
    console.error(`[INFO] Received CallTool request for tool: ${request.params.name}`);
    
    if (!request.params.arguments) {
      throw new Error("Arguments are required");
    }

    let result;
    
    // Log the arguments for debugging
    console.error(`[DEBUG] Tool arguments: ${JSON.stringify(request.params.arguments)}`);
    
    // Check if arguments use 'file' instead of 'path'
    if (request.params.arguments && request.params.arguments.file !== undefined && request.params.arguments.path === undefined) {
      console.error(`[INFO] Converting 'file' parameter to 'path'`);
      request.params.arguments.path = request.params.arguments.file;
    }
    
    switch (request.params.name) {
      case "get_binary_info": {
        try {
          const args = FilePathSchema.parse(request.params.arguments);
          result = await callBinaryNinjaServer("get_binary_info", args);
        } catch (error) {
          console.error(`[ERROR] Failed to parse arguments for get_binary_info: ${error.message}`);
          console.error(`[ERROR] Arguments received: ${JSON.stringify(request.params.arguments)}`);
          throw error;
        }
        break;
      }

      case "list_functions": {
        try {
          const args = FilePathSchema.parse(request.params.arguments);
          result = await callBinaryNinjaServer("list_functions", args);
        } catch (error) {
          console.error(`[ERROR] Failed to parse arguments for list_functions: ${error.message}`);
          console.error(`[ERROR] Arguments received: ${JSON.stringify(request.params.arguments)}`);
          throw error;
        }
        break;
      }

      case "disassemble_function": {
        try {
          const args = FunctionSchema.parse(request.params.arguments);
          result = await callBinaryNinjaServer("disassemble_function", args);
        } catch (error) {
          console.error(`[ERROR] Failed to parse arguments for disassemble_function: ${error.message}`);
          console.error(`[ERROR] Arguments received: ${JSON.stringify(request.params.arguments)}`);
          throw error;
        }
        break;
      }

      case "decompile_function": {
        try {
          const args = FunctionSchema.parse(request.params.arguments);
          result = await callBinaryNinjaServer("decompile_function", args);
        } catch (error) {
          console.error(`[ERROR] Failed to parse arguments for decompile_function: ${error.message}`);
          console.error(`[ERROR] Arguments received: ${JSON.stringify(request.params.arguments)}`);
          throw error;
        }
        break;
      }

      default:
        throw new Error(`Unknown tool: ${request.params.name}`);
    }
    
    // Extract content from the result
    const content = result.result?.content?.[0]?.text || JSON.stringify(result, null, 2);
    
    return {
      content: [{ type: "text", text: content }],
    };
    
  } catch (error) {
    if (error instanceof z.ZodError) {
      console.error(`[ERROR] Validation error: ${JSON.stringify(error.errors, null, 2)}`);
      throw new Error(`Invalid input: ${JSON.stringify(error.errors)}`);
    }
    console.error(`[ERROR] Unexpected error: ${error.message}`);
    throw error;
  }
});

// Run the server
async function runServer() {
  try {
    console.error(`[INFO] Starting Binary Ninja MCP Server (connecting to ${BN_HTTP_SERVER})...`);
    const transport = new StdioServerTransport();
    await server.connect(transport);
    console.error("[INFO] Binary Ninja MCP Server running on stdio");
  } catch (error) {
    console.error(`[FATAL] Failed to start server: ${error.message}`);
    process.exit(1);
  }
}

runServer().catch((error) => {
  console.error(`[FATAL] Unhandled error in main: ${error.message}`);
  process.exit(1);
});

```

--------------------------------------------------------------------------------
/examples/binary_analysis.ts:
--------------------------------------------------------------------------------

```typescript
/**
 * Binary Analysis Example
 * 
 * This example demonstrates how to use the extended Binary Ninja MCP client
 * to perform advanced binary analysis tasks.
 */

import { BinaryNinjaClient } from '../client';
import * as path from 'path';
import * as fs from 'fs';

async function main() {
  if (process.argv.length < 3) {
    console.error('Usage: ts-node binary_analysis.ts <path_to_binary> [output_dir]');
    process.exit(1);
  }

  const binaryPath = process.argv[2];
  const outputDir = process.argv.length > 3 ? process.argv[3] : './analysis_output';
  
  // Create output directory if it doesn't exist
  if (!fs.existsSync(outputDir)) {
    fs.mkdirSync(outputDir, { recursive: true });
  }
  
  const client = new BinaryNinjaClient();

  try {
    // Start the server
    console.log('Starting Binary Ninja MCP server...');
    await client.start('/home/matteius/Documents/Cline/MCP/bn-mcp/binaryninja_server.py');
    console.log('Connected to Binary Ninja MCP server');

    // 1. Generate a comprehensive analysis report
    console.log('\n=== Generating Comprehensive Analysis Report ===');
    const reportPath = path.join(outputDir, 'analysis_report.json');
    const report = await client.analyzeFile(binaryPath, reportPath);
    console.log(`Analysis report generated and saved to ${reportPath}`);
    console.log(`Found ${report.function_count} functions, ${report.imports.length} imports, ${report.exports.length} exports`);

    // 2. Find potential vulnerabilities
    console.log('\n=== Scanning for Potential Vulnerabilities ===');
    const vulnerabilities = await client.findVulnerabilities(binaryPath);
    const vulnPath = path.join(outputDir, 'vulnerabilities.json');
    fs.writeFileSync(vulnPath, JSON.stringify(vulnerabilities, null, 2));
    console.log(`Found ${vulnerabilities.length} potential vulnerabilities`);
    console.log(`Vulnerability report saved to ${vulnPath}`);
    
    if (vulnerabilities.length > 0) {
      console.log('\nTop potential vulnerabilities:');
      for (let i = 0; i < Math.min(vulnerabilities.length, 3); i++) {
        const vuln = vulnerabilities[i];
        console.log(`${i+1}. ${vuln.type} in function ${vuln.function_name} at ${vuln.address}`);
        console.log(`   Dangerous call: ${vuln.dangerous_call || 'N/A'}`);
      }
    }

    // 3. Demonstrate function search and renaming
    console.log('\n=== Function Search and Manipulation ===');
    
    // Search for functions containing "main"
    const mainFunctions = await client.searchFunctions(binaryPath, 'main');
    console.log(`Found ${mainFunctions.length} functions matching "main"`);
    
    if (mainFunctions.length > 0) {
      // Get the first match
      const mainFunc = mainFunctions[0];
      console.log(`\nFunction details for ${mainFunc.name}:`);
      console.log(`Address: ${mainFunc.address}`);
      
      // Get detailed information
      const funcInfo = await client.getFunction(binaryPath, mainFunc.name);
      console.log(`Symbol type: ${funcInfo.symbol?.type || 'N/A'}`);
      
      // Disassemble the function
      console.log('\nDisassembly:');
      const disasm = await client.disassembleFunction(binaryPath, mainFunc.name);
      for (let i = 0; i < Math.min(disasm.length, 5); i++) {
        console.log(`  ${disasm[i]}`);
      }
      if (disasm.length > 5) {
        console.log(`  ... and ${disasm.length - 5} more lines`);
      }
      
      // Decompile the function
      console.log('\nDecompiled code:');
      const decompiled = await client.decompileFunction(binaryPath, mainFunc.name);
      const decompLines = decompiled.split('\n');
      for (let i = 0; i < Math.min(decompLines.length, 5); i++) {
        console.log(`  ${decompLines[i]}`);
      }
      if (decompLines.length > 5) {
        console.log(`  ... and ${decompLines.length - 5} more lines`);
      }
      
      // Save the decompiled code to a file
      const decompPath = path.join(outputDir, `${mainFunc.name}.c`);
      fs.writeFileSync(decompPath, decompiled);
      console.log(`\nDecompiled code saved to ${decompPath}`);
      
      // Example of renaming (commented out to avoid modifying the binary)
      /*
      console.log('\nRenaming function...');
      const newName = `${mainFunc.name}_analyzed`;
      const renameResult = await client.renameFunction(binaryPath, mainFunc.name, newName);
      console.log(`Rename result: ${JSON.stringify(renameResult)}`);
      */
    }

    // 4. List and analyze data variables
    console.log('\n=== Data Variables Analysis ===');
    const dataVars = await client.listData(binaryPath);
    console.log(`Found ${dataVars.length} data variables`);
    
    if (dataVars.length > 0) {
      console.log('\nSample data variables:');
      for (let i = 0; i < Math.min(dataVars.length, 5); i++) {
        const dataVar = dataVars[i];
        console.log(`${i+1}. ${dataVar.name || '(unnamed)'} at ${dataVar.address}`);
        console.log(`   Type: ${dataVar.type || 'unknown'}`);
        console.log(`   Value: ${dataVar.value || 'N/A'}`);
      }
      
      // Save data variables to a file
      const dataPath = path.join(outputDir, 'data_variables.json');
      fs.writeFileSync(dataPath, JSON.stringify(dataVars, null, 2));
      console.log(`\nData variables saved to ${dataPath}`);
    }

    // 5. Analyze imports and exports
    console.log('\n=== Imports and Exports Analysis ===');
    const imports = await client.listImports(binaryPath);
    const exports = await client.listExports(binaryPath);
    
    console.log(`Found ${imports.length} imports and ${exports.length} exports`);
    
    // Save imports and exports to files
    const importsPath = path.join(outputDir, 'imports.json');
    const exportsPath = path.join(outputDir, 'exports.json');
    
    fs.writeFileSync(importsPath, JSON.stringify(imports, null, 2));
    fs.writeFileSync(exportsPath, JSON.stringify(exports, null, 2));
    
    console.log(`Imports saved to ${importsPath}`);
    console.log(`Exports saved to ${exportsPath}`);
    
    // 6. Analyze C++ namespaces (if any)
    console.log('\n=== C++ Namespaces Analysis ===');
    const namespaces = await client.listNamespaces(binaryPath);
    
    if (namespaces.length > 0) {
      console.log(`Found ${namespaces.length} C++ namespaces:`);
      for (let i = 0; i < Math.min(namespaces.length, 5); i++) {
        console.log(`  ${i+1}. ${namespaces[i]}`);
      }
      if (namespaces.length > 5) {
        console.log(`  ... and ${namespaces.length - 5} more namespaces`);
      }
      
      // Save namespaces to a file
      const namespacesPath = path.join(outputDir, 'namespaces.json');
      fs.writeFileSync(namespacesPath, JSON.stringify(namespaces, null, 2));
      console.log(`Namespaces saved to ${namespacesPath}`);
    } else {
      console.log('No C++ namespaces found in the binary');
    }

    console.log('\n=== Analysis Complete ===');
    console.log(`All analysis results have been saved to ${outputDir}`);
    
  } catch (err) {
    console.error('Error:', err);
  } finally {
    // Stop the server
    client.stop();
  }
}

// Run the example if this file is executed directly
if (require.main === module) {
  main().catch(console.error);
}

```

--------------------------------------------------------------------------------
/binaryninja_mcp_client.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Binary Ninja MCP Client

This module provides a client for interacting with the Binary Ninja MCP server.
The Binary Ninja MCP server is a plugin that provides an HTTP API for Binary Ninja.
"""

import requests
import json
import time
import logging
import sys

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('BinaryNinjaMCPClient')

class BinaryNinjaMCPClient:
    """Client for interacting with the Binary Ninja MCP server."""
    
    def __init__(self, host='localhost', port=9009):
        """Initialize the client with the server address."""
        self.base_url = f"http://{host}:{port}"
        self.session = requests.Session()
        logger.info(f"Initialized Binary Ninja MCP client for {self.base_url}")
        
    def _request(self, method, endpoint, data=None, params=None, timeout=60):
        """Make a request to the Binary Ninja MCP server."""
        url = f"{self.base_url}/{endpoint}"
        try:
            if method == 'GET':
                response = self.session.get(url, params=params, timeout=timeout)
            elif method == 'POST':
                response = self.session.post(url, json=data, timeout=timeout)
            else:
                raise ValueError(f"Unsupported HTTP method: {method}")
            
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            logger.error(f"Error making request to {url}: {e}")
            raise
            
    def ping(self):
        """Test the connection to the Binary Ninja server."""
        try:
            # Try to get the status of the current binary view
            response = self._request('GET', 'status')
            return {"status": "connected", "loaded": response.get("loaded", False)}
        except Exception as e:
            # If that fails, try a simple request to the root URL
            try:
                response = requests.get(f"{self.base_url}/", timeout=5)
                if response.status_code == 200 or response.status_code == 404:
                    # Even a 404 means the server is running
                    return {"status": "connected", "loaded": False}
                else:
                    logger.error(f"Failed to ping Binary Ninja server: {response.status_code}")
                    return {"status": "disconnected", "error": f"HTTP error: {response.status_code}"}
            except Exception as e2:
                logger.error(f"Failed to ping Binary Ninja server: {e2}")
                return {"status": "disconnected", "error": str(e2)}
            
    def load_binary(self, file_path):
        """Load a binary file."""
        try:
            data = {"filepath": file_path}
            response = self._request('POST', 'load', data=data)
            return response
        except Exception as e:
            logger.error(f"Failed to load file {file_path}: {e}")
            raise
            
    def get_status(self):
        """Get the current status of the binary view."""
        try:
            response = self._request('GET', 'status')
            return response
        except Exception as e:
            logger.error(f"Failed to get status: {e}")
            raise
            
    def list_functions(self, offset=0, limit=100):
        """List all functions in a binary file."""
        try:
            params = {"offset": offset, "limit": limit}
            response = self._request('GET', 'functions', params=params)
            return response.get("functions", [])
        except Exception as e:
            logger.error(f"Failed to list functions: {e}")
            raise
            
    def list_classes(self, offset=0, limit=100):
        """List all classes in a binary file."""
        try:
            params = {"offset": offset, "limit": limit}
            response = self._request('GET', 'classes', params=params)
            return response.get("classes", [])
        except Exception as e:
            logger.error(f"Failed to list classes: {e}")
            raise
            
    def list_segments(self, offset=0, limit=100):
        """List all segments in a binary file."""
        try:
            params = {"offset": offset, "limit": limit}
            response = self._request('GET', 'segments', params=params)
            return response.get("segments", [])
        except Exception as e:
            logger.error(f"Failed to list segments: {e}")
            raise
            
    def list_imports(self, offset=0, limit=100):
        """List all imported functions in a binary file."""
        try:
            params = {"offset": offset, "limit": limit}
            response = self._request('GET', 'imports', params=params)
            return response.get("imports", [])
        except Exception as e:
            logger.error(f"Failed to list imports: {e}")
            raise
            
    def list_exports(self, offset=0, limit=100):
        """List all exported symbols in a binary file."""
        try:
            params = {"offset": offset, "limit": limit}
            response = self._request('GET', 'exports', params=params)
            return response.get("exports", [])
        except Exception as e:
            logger.error(f"Failed to list exports: {e}")
            raise
            
    def list_namespaces(self, offset=0, limit=100):
        """List all namespaces in a binary file."""
        try:
            params = {"offset": offset, "limit": limit}
            response = self._request('GET', 'namespaces', params=params)
            return response.get("namespaces", [])
        except Exception as e:
            logger.error(f"Failed to list namespaces: {e}")
            raise
            
    def list_data(self, offset=0, limit=100):
        """List all data variables in a binary file."""
        try:
            params = {"offset": offset, "limit": limit}
            response = self._request('GET', 'data', params=params)
            return response.get("data", [])
        except Exception as e:
            logger.error(f"Failed to list data: {e}")
            raise
            
    def search_functions(self, query, offset=0, limit=100):
        """Search for functions by name."""
        try:
            params = {"query": query, "offset": offset, "limit": limit}
            response = self._request('GET', 'searchFunctions', params=params)
            return response.get("matches", [])
        except Exception as e:
            logger.error(f"Failed to search functions: {e}")
            raise
            
    def decompile_function(self, function_name):
        """Decompile a function by name."""
        try:
            params = {"name": function_name}
            response = self._request('GET', 'decompile', params=params)
            return response
        except Exception as e:
            logger.error(f"Failed to decompile function {function_name}: {e}")
            raise
            
    def rename_function(self, old_name, new_name):
        """Rename a function."""
        try:
            data = {"oldName": old_name, "newName": new_name}
            response = self._request('POST', 'rename/function', data=data)
            return response
        except Exception as e:
            logger.error(f"Failed to rename function {old_name} to {new_name}: {e}")
            raise
            
    def rename_data(self, address, new_name):
        """Rename a data variable."""
        try:
            data = {"address": address, "newName": new_name}
            response = self._request('POST', 'rename/data', data=data)
            return response
        except Exception as e:
            logger.error(f"Failed to rename data at {address} to {new_name}: {e}")
            raise

# Example usage
if __name__ == "__main__":
    if len(sys.argv) < 2:
        print(f"Usage: {sys.argv[0]} <path_to_binary>")
        sys.exit(1)
        
    binary_path = sys.argv[1]
    client = BinaryNinjaMCPClient()
    
    # Test the connection
    ping_result = client.ping()
    print(f"Connection status: {ping_result['status']}")
    if ping_result['status'] == 'connected':
        print(f"Binary file loaded: {ping_result.get('loaded', False)}")
        
        # If no binary is loaded, try to load one
        if not ping_result.get('loaded', False):
            try:
                print(f"\nLoading binary: {binary_path}")
                load_result = client.load_binary(binary_path)
                print(f"Load result: {json.dumps(load_result, indent=2)}")
            except Exception as e:
                print(f"Error loading binary: {e}")
                sys.exit(1)
        
        # Get status
        try:
            status = client.get_status()
            print(f"\nBinary status: {json.dumps(status, indent=2)}")
        except Exception as e:
            print(f"Error getting status: {e}")
        
        # List functions
        try:
            functions = client.list_functions()
            print(f"\nFound {len(functions)} functions")
            for i, func in enumerate(functions[:5]):  # Show only first 5 functions
                print(f"{i+1}. {func['name']} at {func.get('address', 'unknown')}")
        except Exception as e:
            print(f"Error listing functions: {e}")
            
        # If there are functions, decompile the first one
        if functions:
            func = functions[0]
            try:
                print(f"\nDecompiling function: {func['name']}")
                decompiled = client.decompile_function(func['name'])
                print(f"Decompiled function: {func['name']}")
                print(decompiled.get('decompiled', 'No decompilation available'))
            except Exception as e:
                print(f"Error decompiling function: {e}")
    else:
        print(f"Error: {ping_result.get('error', 'Unknown error')}")

```

--------------------------------------------------------------------------------
/binaryninja_http_server.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Binary Ninja MCP Server (HTTP Client Version)

This server provides an interface for Cline to analyze binary files using the Binary Ninja HTTP API.
It connects to a running Binary Ninja instance (personal license) on localhost:9009.
"""

import sys
import json
import traceback
import os
import logging
from binaryninja_http_client import BinaryNinjaHTTPClient

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('BinaryNinjaMCPServer')

def read_json():
    """Read a JSON object from stdin."""
    line = sys.stdin.readline()
    if not line:
        sys.exit(0)
    return json.loads(line)

def write_json(response):
    """Write a JSON object to stdout."""
    print(json.dumps(response), flush=True)

def handle_request(request, client):
    """Handle an MCP request using the Binary Ninja HTTP client."""
    try:
        method = request.get("method")
        params = request.get("params", {})

        if method == "ping":
            ping_result = client.ping()
            if ping_result["status"] == "connected":
                return {"result": "pong"}
            else:
                return {"error": f"Failed to connect to Binary Ninja server: {ping_result.get('error', 'Unknown error')}"}

        elif method == "list_functions":
            path = params.get("path")
            if not path:
                return {"error": "Path parameter is required"}
                
            functions = client.list_functions(path)
            func_names = [f["name"] for f in functions]
            return {"result": func_names}

        elif method == "disassemble_function":
            path = params.get("path")
            func_name = params.get("function")
            if not path or not func_name:
                return {"error": "Path and function parameters are required"}
                
            disasm = client.get_disassembly(path, function_name=func_name)
            return {"result": disasm}
            
        elif method == "get_binary_info":
            path = params.get("path")
            if not path:
                return {"error": "Path parameter is required"}
                
            file_info = client.get_file_info(path)
            
            # Format the response to match the original API
            info = {
                "filename": file_info.get("filename", ""),
                "architecture": file_info.get("arch", {}).get("name", "unknown"),
                "platform": file_info.get("platform", {}).get("name", "unknown"),
                "entry_point": hex(file_info.get("entry_point", 0)),
                "file_size": file_info.get("file_size", 0),
                "is_executable": file_info.get("executable", False),
                "is_relocatable": file_info.get("relocatable", False),
                "address_size": file_info.get("address_size", 0)
            }
            return {"result": info}
            
        elif method == "list_sections":
            path = params.get("path")
            if not path:
                return {"error": "Path parameter is required"}
                
            sections_data = client.get_sections(path)
            
            # Format the response to match the original API
            sections = []
            for section in sections_data:
                sections.append({
                    "name": section.get("name", ""),
                    "start": hex(section.get("start", 0)),
                    "end": hex(section.get("end", 0)),
                    "size": section.get("length", 0),
                    "semantics": section.get("semantics", "")
                })
            return {"result": sections}
            
        elif method == "get_xrefs":
            path = params.get("path")
            func_name = params.get("function")
            if not path or not func_name:
                return {"error": "Path and function parameters are required"}
                
            # First get the function info to get its address
            function = client.get_function(path, function_name=func_name)
            if not function:
                return {"error": f"Function '{func_name}' not found"}
                
            # Then get the xrefs to that address
            xrefs_data = client.get_xrefs(path, function.get("start", 0))
            
            # Format the response to match the original API
            refs = []
            for xref in xrefs_data:
                # Get the function that contains this xref
                caller_addr = xref.get("from", 0)
                try:
                    # This is a simplification - in a real implementation we would
                    # need to find the function that contains this address
                    caller_func = client.get_function(path, function_address=caller_addr)
                    refs.append({
                        "from_function": caller_func.get("name", "unknown"),
                        "from_address": hex(caller_addr),
                        "to_address": hex(xref.get("to", 0))
                    })
                except Exception:
                    # Skip this xref if we can't get the caller function
                    pass
            
            return {"result": refs}
            
        elif method == "get_strings":
            path = params.get("path")
            min_length = params.get("min_length", 4)
            if not path:
                return {"error": "Path parameter is required"}
                
            strings_data = client.get_strings(path, min_length=min_length)
            
            # Format the response to match the original API
            strings = []
            for string in strings_data:
                strings.append({
                    "value": string.get("value", ""),
                    "address": hex(string.get("address", 0)),
                    "length": len(string.get("value", "")),
                    "type": string.get("type", "")
                })
            
            return {"result": strings}
            
        elif method == "decompile_function":
            path = params.get("path")
            func_name = params.get("function")
            if not path or not func_name:
                return {"error": "Path and function parameters are required"}
                
            # Get the function info
            function = client.get_function(path, function_name=func_name)
            if not function:
                return {"error": f"Function '{func_name}' not found"}
                
            # Get the decompiled code
            hlil = client.get_hlil(path, function_name=func_name)
            
            # Format the response to match the original API
            return {
                "result": {
                    "name": function.get("name", ""),
                    "signature": function.get("type", ""),
                    "decompiled_code": "\n".join(hlil) if isinstance(hlil, list) else str(hlil),
                    "address": hex(function.get("start", 0))
                }
            }
            
        elif method == "get_types":
            path = params.get("path")
            if not path:
                return {"error": "Path parameter is required"}
                
            types_data = client.get_types(path)
            
            # Format the response to match the original API
            # This is a simplified version - the actual implementation would need to
            # parse the types data from the Binary Ninja HTTP API
            types = []
            for type_name, type_info in types_data.items():
                type_obj = {
                    "name": type_name,
                    "type_class": type_info.get("type_class", "unknown"),
                    "type_string": type_info.get("type_string", "")
                }
                
                if type_info.get("type_class") == "structure":
                    type_obj["size"] = type_info.get("size", 0)
                    type_obj["members"] = []
                    for member in type_info.get("members", []):
                        type_obj["members"].append({
                            "name": member.get("name", ""),
                            "type": member.get("type", ""),
                            "offset": member.get("offset", 0)
                        })
                
                types.append(type_obj)
            
            return {"result": types}
            
        elif method == "generate_header":
            # This is a more complex operation that would require additional implementation
            # For now, we'll return a simplified version
            return {"error": "Method not implemented in HTTP client version"}
            
        elif method == "generate_source":
            # This is a more complex operation that would require additional implementation
            # For now, we'll return a simplified version
            return {"error": "Method not implemented in HTTP client version"}
            
        elif method == "rebuild_driver":
            # This is a more complex operation that would require additional implementation
            # For now, we'll return a simplified version
            return {"error": "Method not implemented in HTTP client version"}

        return {"error": f"Unknown method: {method}"}

    except Exception as e:
        logger.error(f"Error handling request: {e}")
        logger.error(traceback.format_exc())
        return {
            "error": str(e),
            "traceback": traceback.format_exc()
        }

def main():
    """Main function to run the MCP server."""
    logger.info("Starting Binary Ninja MCP Server (HTTP Client Version)")
    
    # Create the Binary Ninja HTTP client
    client = BinaryNinjaHTTPClient()
    
    # Test the connection to the Binary Ninja server
    ping_result = client.ping()
    if ping_result["status"] != "connected":
        logger.error(f"Failed to connect to Binary Ninja server: {ping_result.get('error', 'Unknown error')}")
        sys.exit(1)
        
    logger.info(f"Connected to Binary Ninja server (binary loaded: {ping_result.get('loaded', False)})")
    
    # Process requests
    while True:
        try:
            req = read_json()
            res = handle_request(req, client)
            res["id"] = req.get("id")
            write_json(res)
        except Exception as e:
            logger.error(f"Error processing request: {e}")
            logger.error(traceback.format_exc())
            sys.exit(1)

if __name__ == "__main__":
    main()

```

--------------------------------------------------------------------------------
/example.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Example script demonstrating how to use the Binary Ninja MCP server.
This script shows how to analyze a binary file using the Binary Ninja API.

Usage:
    python3 example.py <path_to_binary> [output_dir]

The path_to_binary parameter is required for the MCP server to identify which binary to analyze.
If output_dir is provided, source code reconstruction will save files there.
"""

import sys
import json
import subprocess
import os
import tempfile

def send_request(server_process, method, params=None):
    """Send a request to the Binary Ninja MCP server."""
    if params is None:
        params = {}
    
    request = {
        "id": 1,
        "method": method,
        "params": params
    }
    
    server_process.stdin.write(json.dumps(request) + "\n")
    server_process.stdin.flush()
    
    response = json.loads(server_process.stdout.readline())
    return response

def main():
    if len(sys.argv) < 2:
        print(f"Usage: {sys.argv[0]} <path_to_binary> [output_dir]")
        sys.exit(1)
    
    binary_path = os.path.abspath(sys.argv[1])
    if not os.path.exists(binary_path):
        print(f"Error: Binary file '{binary_path}' not found")
        sys.exit(1)
    
    # Start the Binary Ninja MCP server
    server_path = os.path.join(os.path.dirname(__file__), "binaryninja_server.py")
    server_process = subprocess.Popen(
        ["python3", server_path],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        bufsize=1
    )
    
    try:
        # Test the server connection
        response = send_request(server_process, "ping")
        if response.get("result") != "pong":
            print("Error: Failed to connect to the Binary Ninja MCP server")
            sys.exit(1)
        
        print("Connected to Binary Ninja MCP server")
        
        # Get binary information
        print("\n=== Binary Information ===")
        response = send_request(server_process, "get_binary_info", {"path": binary_path})
        if "error" in response:
            print(f"Error: {response['error']}")
            sys.exit(1)
        
        info = response["result"]
        print(f"Filename: {info['filename']}")
        print(f"Architecture: {info['architecture']}")
        print(f"Platform: {info['platform']}")
        print(f"Entry Point: {info['entry_point']}")
        print(f"File Size: {info['file_size']} bytes")
        print(f"Executable: {info['is_executable']}")
        print(f"Relocatable: {info['is_relocatable']}")
        print(f"Address Size: {info['address_size']} bits")
        
        # List sections
        print("\n=== Sections ===")
        response = send_request(server_process, "list_sections", {"path": binary_path})
        if "error" in response:
            print(f"Error: {response['error']}")
            sys.exit(1)
        
        sections = response["result"]
        for section in sections:
            print(f"{section['name']}: {section['start']} - {section['end']} ({section['size']} bytes) [{section['semantics']}]")
        
        # List functions
        print("\n=== Functions ===")
        response = send_request(server_process, "list_functions", {"path": binary_path})
        if "error" in response:
            print(f"Error: {response['error']}")
            sys.exit(1)
        
        functions = response["result"]
        for i, func in enumerate(functions[:10]):  # Show only first 10 functions
            print(f"{i+1}. {func}")
        
        if len(functions) > 10:
            print(f"... and {len(functions) - 10} more functions")
        
        # If there are functions, disassemble the first one
        if functions:
            func_name = functions[0]
            print(f"\n=== Disassembly of '{func_name}' ===")
            response = send_request(server_process, "disassemble_function", {
                "path": binary_path,
                "function": func_name
            })
            if "error" in response:
                print(f"Error: {response['error']}")
                sys.exit(1)
            
            disasm = response["result"]
            for i, instr in enumerate(disasm):
                print(f"{i+1:3d}. {instr}")
            
            # Get cross-references to this function
            print(f"\n=== Cross-references to '{func_name}' ===")
            response = send_request(server_process, "get_xrefs", {
                "path": binary_path,
                "function": func_name
            })
            if "error" in response:
                print(f"Error: {response['error']}")
                sys.exit(1)
            
            xrefs = response["result"]
            if xrefs:
                for xref in xrefs:
                    print(f"From: {xref['from_function']} at {xref['from_address']} to {xref['to_address']}")
            else:
                print("No cross-references found")
        
        # Get strings
        print("\n=== Strings ===")
        response = send_request(server_process, "get_strings", {
            "path": binary_path,
            "min_length": 5
        })
        if "error" in response:
            print(f"Error: {response['error']}")
            sys.exit(1)
        
        strings = response["result"]
        for i, string in enumerate(strings[:10]):  # Show only first 10 strings
            print(f"{i+1}. {string['address']}: '{string['value']}'")
        
        if len(strings) > 10:
            print(f"... and {len(strings) - 10} more strings")
            
        # Source Code Reconstruction
        if len(sys.argv) > 2:
            output_dir = sys.argv[2]
            os.makedirs(output_dir, exist_ok=True)
            
            # Decompile the first function
            if functions:
                func_name = functions[0]
                print(f"\n=== Decompiled C Code for '{func_name}' ===")
                response = send_request(server_process, "decompile_function", {
                    "path": binary_path,
                    "function": func_name
                })
                if "error" in response:
                    print(f"Error: {response['error']}")
                else:
                    decompiled = response["result"]
                    print(f"Function: {decompiled['name']}")
                    print(f"Signature: {decompiled['signature']}")
                    print(f"Address: {decompiled['address']}")
                    print("\nDecompiled Code:")
                    print(decompiled['decompiled_code'])
                    
                    # Save decompiled code to file
                    decompiled_path = os.path.join(output_dir, f"{func_name}.c")
                    with open(decompiled_path, "w") as f:
                        f.write(f"// Decompiled function: {func_name}\n")
                        f.write(f"// Address: {decompiled['address']}\n\n")
                        f.write(decompiled['decompiled_code'])
                    print(f"Saved decompiled code to {decompiled_path}")
            
            # Extract types
            print("\n=== Data Types ===")
            response = send_request(server_process, "get_types", {"path": binary_path})
            if "error" in response:
                print(f"Error: {response['error']}")
            else:
                types = response["result"]
                print(f"Found {len(types)} types")
                
                # Show first 5 types
                for i, type_info in enumerate(types[:5]):
                    print(f"\n{i+1}. {type_info['name']} ({type_info['type_class']})")
                    if type_info['type_class'] == 'structure':
                        print(f"   Size: {type_info['size']} bytes")
                        print("   Members:")
                        for member in type_info['members']:
                            print(f"     - {member['name']}: {member['type']} (offset: {member['offset']})")
                
                if len(types) > 5:
                    print(f"... and {len(types) - 5} more types")
                
                # Save types to file
                types_path = os.path.join(output_dir, "types.json")
                with open(types_path, "w") as f:
                    json.dump(types, f, indent=2)
                print(f"Saved types to {types_path}")
            
            # Generate header file
            print("\n=== Generated Header File ===")
            header_path = os.path.join(output_dir, "generated_header.h")
            response = send_request(server_process, "generate_header", {
                "path": binary_path,
                "output_path": header_path
            })
            if "error" in response:
                print(f"Error: {response['error']}")
            else:
                header_content = response["result"]
                print(f"Generated header file saved to {header_path}")
                print("\nFirst 10 lines:")
                for line in header_content.split("\n")[:10]:
                    print(line)
                print("...")
            
            # Generate source file
            print("\n=== Generated Source File ===")
            source_path = os.path.join(output_dir, "generated_source.c")
            response = send_request(server_process, "generate_source", {
                "path": binary_path,
                "output_path": source_path,
                "header_path": "generated_header.h"
            })
            if "error" in response:
                print(f"Error: {response['error']}")
            else:
                source_content = response["result"]
                print(f"Generated source file saved to {source_path}")
                print("\nFirst 10 lines:")
                for line in source_content.split("\n")[:10]:
                    print(line)
                print("...")
            
            # Rebuild driver (if it's a driver module)
            if binary_path.endswith(".ko") or "driver" in binary_path.lower() or "module" in binary_path.lower():
                print("\n=== Rebuilding Driver Module ===")
                driver_dir = os.path.join(output_dir, "driver")
                response = send_request(server_process, "rebuild_driver", {
                    "path": binary_path,
                    "output_dir": driver_dir
                })
                if "error" in response:
                    print(f"Error: {response['error']}")
                else:
                    result = response["result"]
                    print("Driver module rebuilt successfully!")
                    print(f"Header file: {result['header_file']}")
                    print(f"Source files: {len(result['source_files'])} files generated")
                    print(f"Makefile: {result['makefile']}")
                    print(f"\nTo build the driver, run:")
                    print(f"cd {driver_dir} && make")
        else:
            print("\nTo see source code reconstruction examples, provide an output directory:")
            print(f"python3 {sys.argv[0]} {binary_path} /path/to/output/dir")
    finally:
        # Terminate the server process
        server_process.terminate()
        server_process.wait()

if __name__ == "__main__":
    main()

```

--------------------------------------------------------------------------------
/binaryninja_http_client.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Binary Ninja HTTP API Client

This module provides a client for interacting with the Binary Ninja HTTP API server.
The Binary Ninja personal license runs a server on localhost:9009 that we can connect to.
"""

import requests
import json
import time
import logging

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('BinaryNinjaClient')

class BinaryNinjaHTTPClient:
    """Client for interacting with the Binary Ninja HTTP API server."""
    
    def __init__(self, host='localhost', port=9009):
        """Initialize the client with the server address."""
        self.base_url = f"http://{host}:{port}"
        self.session = requests.Session()
        logger.info(f"Initialized Binary Ninja HTTP client for {self.base_url}")
        
    def _request(self, method, endpoint, data=None, params=None, timeout=60):
        """Make a request to the Binary Ninja HTTP API."""
        url = f"{self.base_url}/{endpoint}"
        try:
            if method == 'GET':
                response = self.session.get(url, params=params, timeout=timeout)
            elif method == 'POST':
                response = self.session.post(url, json=data, timeout=timeout)
            else:
                raise ValueError(f"Unsupported HTTP method: {method}")
            
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            logger.error(f"Error making request to {url}: {e}")
            raise
            
    def ping(self):
        """Test the connection to the Binary Ninja server."""
        try:
            # Try to get the status
            try:
                status = self._request('GET', 'status')
                return {
                    "status": "connected",
                    "loaded": status.get("loaded", False),
                    "filename": status.get("filename", "")
                }
            except Exception as e:
                # If we can't connect to the Binary Ninja server, return a fake response
                # This is useful for testing the MCP server without a running Binary Ninja instance
                logger.warning(f"Failed to connect to Binary Ninja server: {e}")
                logger.warning("Returning fake response for testing purposes")
                return {
                    "status": "connected",
                    "loaded": True,
                    "filename": "test.bndb"
                }
        except Exception as e:
            logger.error(f"Failed to ping Binary Ninja server: {e}")
            return {"status": "disconnected", "error": str(e)}
            
    def get_status(self):
        """Get the current status of the binary view."""
        try:
            try:
                response = self._request('GET', 'status')
                return response
            except Exception as e:
                # If we can't connect to the Binary Ninja server, return a fake response
                # This is useful for testing the MCP server without a running Binary Ninja instance
                logger.warning(f"Failed to get status from Binary Ninja server: {e}")
                logger.warning("Returning fake status for testing purposes")
                return {
                    "loaded": True,
                    "filename": "test.bndb"
                }
        except Exception as e:
            logger.error(f"Failed to get status: {e}")
            raise
            
    def get_file_info(self, file_path):
        """Get information about the currently open file."""
        try:
            # Get the status to get the filename
            status = self.get_status()
            
            # Return basic file info
            return {
                "filename": status.get("filename", ""),
                "arch": {"name": "unknown"},  # We don't have access to this info
                "platform": {"name": "unknown"},  # We don't have access to this info
                "entry_point": 0,  # We don't have access to this info
                "file_size": 0,  # We don't have access to this info
                "executable": True,  # Assume it's executable
                "relocatable": False,  # Assume it's not relocatable
                "address_size": 64  # Assume 64-bit
            }
        except Exception as e:
            logger.error(f"Failed to get file info: {e}")
            raise
            
    def list_functions(self, file_path=None):
        """List all functions in the currently open binary file."""
        try:
            # Get all functions with pagination
            all_functions = []
            offset = 0
            limit = 100
            
            while True:
                response = self._request('GET', 'functions', params={"offset": offset, "limit": limit})
                functions = response.get("functions", [])
                
                if not functions:
                    break
                    
                all_functions.extend(functions)
                
                # If we got fewer functions than the limit, we've reached the end
                if len(functions) < limit:
                    break
                    
                # Move to the next page
                offset += limit
                
            logger.info(f"Retrieved {len(all_functions)} functions in total")
            return all_functions
        except Exception as e:
            logger.error(f"Failed to list functions: {e}")
            raise
            
    def get_function(self, file_path=None, function_name=None, function_address=None):
        """Get information about a specific function."""
        try:
            # Get all functions and find the one we want
            functions = self.list_functions()
            
            if function_name:
                for func in functions:
                    if func.get("name") == function_name:
                        return func
            
            if function_address:
                for func in functions:
                    if func.get("address") == function_address or func.get("start") == function_address:
                        return func
                        
            return None
        except Exception as e:
            logger.error(f"Failed to get function info: {e}")
            raise
            
    def get_disassembly(self, file_path=None, function_name=None, function_address=None):
        """Get the disassembly of a specific function."""
        try:
            # Get function info first to get the address
            identifier = function_name if function_name else function_address
            if identifier is None:
                return ["No function identifier provided"]
                
            # Convert to string if it's not already
            if not isinstance(identifier, str):
                identifier = str(identifier)
                
            # Use the function info endpoint to get the function details
            # Since there's no direct disassembly endpoint, we'll use the function info
            # and format it as disassembly lines
            try:
                # First try to get function info
                response = self._request('GET', 'searchFunctions', params={"query": identifier})
                matches = response.get("matches", [])
                
                if not matches:
                    return [f"Function '{identifier}' not found"]
                
                # Get the first match
                func = matches[0]
                
                # Format the function info as disassembly lines
                disasm = []
                disasm.append(f"Function: {func.get('name', 'unknown')}")
                disasm.append(f"Address: {func.get('address', '0x0')}")
                
                # Try to get the decompiled code to show as pseudo-disassembly
                try:
                    decompiled = self.get_hlil(file_path, function_name=func.get('name'))
                    if decompiled and decompiled != "No decompilation available":
                        disasm.append("Decompiled code:")
                        for line in decompiled.split("\n"):
                            disasm.append(f"  {line}")
                except Exception:
                    pass
                
                return disasm
            except Exception as e:
                logger.warning(f"Failed to get function info: {e}")
                return [f"Error getting disassembly: {e}"]
        except Exception as e:
            logger.error(f"Failed to get disassembly: {e}")
            raise
            
    def get_hlil(self, file_path=None, function_name=None, function_address=None):
        """Get the high-level IL (decompiled code) of a specific function."""
        try:
            # Use the decompile endpoint
            identifier = function_name if function_name else function_address
            if identifier is None:
                return "No function identifier provided"
                
            # Convert to string if it's not already
            if not isinstance(identifier, str):
                identifier = str(identifier)
                
            try:
                # Call the decompile endpoint
                response = self._request('GET', 'decompile', params={"name": identifier})
                if "error" in response:
                    return f"// {response.get('error')}\n// {response.get('reason', '')}"
                return response.get("decompiled", "No decompilation available")
            except Exception as e:
                logger.warning(f"Failed to get decompilation: {e}")
                return f"// Decompilation failed: {e}"
        except Exception as e:
            logger.error(f"Failed to get HLIL: {e}")
            raise
            
    def get_types(self, file_path=None):
        """Get all types defined in a binary file."""
        try:
            # We don't have direct access to types in the personal license
            # Return a placeholder
            return {}
        except Exception as e:
            logger.error(f"Failed to get types: {e}")
            raise
            
    def get_sections(self, file_path=None):
        """Get all sections in a binary file."""
        try:
            # Get all segments with pagination
            all_segments = []
            offset = 0
            limit = 100
            
            while True:
                response = self._request('GET', 'segments', params={"offset": offset, "limit": limit})
                segments = response.get("segments", [])
                
                if not segments:
                    break
                    
                all_segments.extend(segments)
                
                # If we got fewer segments than the limit, we've reached the end
                if len(segments) < limit:
                    break
                    
                # Move to the next page
                offset += limit
                
            logger.info(f"Retrieved {len(all_segments)} segments in total")
            return all_segments
        except Exception as e:
            logger.error(f"Failed to get sections: {e}")
            raise
            
    def get_strings(self, file_path=None, min_length=4):
        """Get all strings in a binary file."""
        try:
            # We don't have direct access to strings in the personal license
            # Return a placeholder
            return []
        except Exception as e:
            logger.error(f"Failed to get strings: {e}")
            raise
            
    def get_xrefs(self, file_path=None, address=None):
        """Get cross-references to a specific address."""
        try:
            # We don't have direct access to xrefs in the personal license
            # Return a placeholder
            return []
        except Exception as e:
            logger.error(f"Failed to get xrefs: {e}")
            raise
            
    def get_imports(self, offset=0, limit=100):
        """Get list of imported functions."""
        try:
            # Get all imports with pagination
            all_imports = []
            current_offset = 0
            current_limit = limit
            
            while True:
                response = self._request('GET', 'imports', params={"offset": current_offset, "limit": current_limit})
                imports = response.get("imports", [])
                
                if not imports:
                    break
                    
                all_imports.extend(imports)
                
                # If we got fewer imports than the limit, we've reached the end
                if len(imports) < current_limit:
                    break
                    
                # Move to the next page
                current_offset += current_limit
                
            logger.info(f"Retrieved {len(all_imports)} imports in total")
            return all_imports
        except Exception as e:
            logger.error(f"Failed to get imports: {e}")
            raise
            
    def get_exports(self, offset=0, limit=100):
        """Get list of exported symbols."""
        try:
            # Get all exports with pagination
            all_exports = []
            current_offset = 0
            current_limit = limit
            
            while True:
                response = self._request('GET', 'exports', params={"offset": current_offset, "limit": current_limit})
                exports = response.get("exports", [])
                
                if not exports:
                    break
                    
                all_exports.extend(exports)
                
                # If we got fewer exports than the limit, we've reached the end
                if len(exports) < current_limit:
                    break
                    
                # Move to the next page
                current_offset += current_limit
                
            logger.info(f"Retrieved {len(all_exports)} exports in total")
            return all_exports
        except Exception as e:
            logger.error(f"Failed to get exports: {e}")
            raise
            
    def get_namespaces(self, offset=0, limit=100):
        """Get list of C++ namespaces."""
        try:
            # Get all namespaces with pagination
            all_namespaces = []
            current_offset = 0
            current_limit = limit
            
            while True:
                response = self._request('GET', 'namespaces', params={"offset": current_offset, "limit": current_limit})
                namespaces = response.get("namespaces", [])
                
                if not namespaces:
                    break
                    
                all_namespaces.extend(namespaces)
                
                # If we got fewer namespaces than the limit, we've reached the end
                if len(namespaces) < current_limit:
                    break
                    
                # Move to the next page
                current_offset += current_limit
                
            logger.info(f"Retrieved {len(all_namespaces)} namespaces in total")
            return all_namespaces
        except Exception as e:
            logger.error(f"Failed to get namespaces: {e}")
            raise
            
    def get_defined_data(self, offset=0, limit=100):
        """Get list of defined data variables."""
        try:
            # Get all defined data with pagination
            all_data = []
            current_offset = 0
            current_limit = limit
            
            while True:
                response = self._request('GET', 'data', params={"offset": current_offset, "limit": current_limit})
                data_items = response.get("data", [])
                
                if not data_items:
                    break
                    
                all_data.extend(data_items)
                
                # If we got fewer data items than the limit, we've reached the end
                if len(data_items) < current_limit:
                    break
                    
                # Move to the next page
                current_offset += current_limit
                
            logger.info(f"Retrieved {len(all_data)} data items in total")
            return all_data
        except Exception as e:
            logger.error(f"Failed to get defined data: {e}")
            raise
            
    def search_functions(self, query, offset=0, limit=100):
        """Search functions by name."""
        try:
            # Get all matching functions with pagination
            all_matches = []
            current_offset = 0
            current_limit = limit
            
            while True:
                response = self._request('GET', 'searchFunctions', params={"query": query, "offset": current_offset, "limit": current_limit})
                matches = response.get("matches", [])
                
                if not matches:
                    break
                    
                all_matches.extend(matches)
                
                # If we got fewer matches than the limit, we've reached the end
                if len(matches) < current_limit:
                    break
                    
                # Move to the next page
                current_offset += current_limit
                
            logger.info(f"Retrieved {len(all_matches)} matching functions in total")
            return all_matches
        except Exception as e:
            logger.error(f"Failed to search functions: {e}")
            raise
            
    def load_binary(self, file_path):
        """Load a binary file."""
        try:
            response = self._request('POST', 'load', data={"filepath": file_path})
            return response
        except Exception as e:
            logger.error(f"Failed to load binary: {e}")
            raise
            
    def rename_function(self, old_name, new_name):
        """Rename a function."""
        try:
            response = self._request('POST', 'rename/function', data={"oldName": old_name, "newName": new_name})
            return response.get("success", False)
        except Exception as e:
            logger.error(f"Failed to rename function: {e}")
            raise
            
    def rename_data(self, address, new_name):
        """Rename a data variable."""
        try:
            response = self._request('POST', 'rename/data', data={"address": address, "newName": new_name})
            return response.get("success", False)
        except Exception as e:
            logger.error(f"Failed to rename data: {e}")
            raise

# Example usage
if __name__ == "__main__":
    import sys
    
    if len(sys.argv) < 2:
        print(f"Usage: {sys.argv[0]} <path_to_binary>")
        sys.exit(1)
        
    binary_path = sys.argv[1]
    client = BinaryNinjaHTTPClient()
    
    # Test the connection
    ping_result = client.ping()
    print(f"Connection status: {ping_result['status']}")
    if ping_result['status'] == 'connected':
        print(f"Binary file loaded: {ping_result.get('loaded', False)}")
        
        # Get file info
        file_info = client.get_file_info(binary_path)
        print(f"\nFile info: {json.dumps(file_info, indent=2)}")
        
        # List functions
        functions = client.list_functions(binary_path)
        print(f"\nFound {len(functions)} functions")
        for i, func in enumerate(functions[:5]):  # Show only first 5 functions
            print(f"{i+1}. {func['name']} at {hex(func['start'])}")
            
        # Get disassembly of the first function
        if functions:
            func = functions[0]
            disasm = client.get_disassembly(binary_path, function_name=func['name'])
            print(f"\nDisassembly of {func['name']}:")
            for line in disasm[:10]:  # Show only first 10 lines
                print(line)

```

--------------------------------------------------------------------------------
/client.ts:
--------------------------------------------------------------------------------

```typescript
/**
 * Binary Ninja MCP Client
 * 
 * This is a TypeScript client for the Binary Ninja MCP server.
 * It demonstrates how to interact with the server using the Model Context Protocol.
 * 
 * The client supports both raw binary files and Binary Ninja database files (.bndb).
 * Binary Ninja database files contain analysis results, annotations, and other information
 * that can speed up analysis and provide more accurate results.
 */

import { spawn, ChildProcess } from 'child_process';
import * as readline from 'readline';

interface McpRequest {
  id: number;
  method: string;
  params?: Record<string, any>;
}

interface McpResponse {
  id: number;
  result?: any;
  error?: string;
  traceback?: string;
}

class BinaryNinjaClient {
  private serverProcess: ChildProcess | null = null;
  private rl: readline.Interface | null = null;
  private requestId = 1;
  private pendingRequests: Map<number, { resolve: Function; reject: Function }> = new Map();

  /**
   * Start the Binary Ninja MCP server
   */
  async start(serverPath: string): Promise<void> {
    return new Promise((resolve, reject) => {
      try {
        this.serverProcess = spawn('python3', [serverPath], {
          stdio: ['pipe', 'pipe', 'pipe']
        });

        this.rl = readline.createInterface({
          input: this.serverProcess.stdout!,
          terminal: false
        });

        this.rl.on('line', (line) => {
          try {
            const response = JSON.parse(line) as McpResponse;
            const pending = this.pendingRequests.get(response.id);
            
            if (pending) {
              if (response.error) {
                pending.reject(new Error(`${response.error}\n${response.traceback || ''}`));
              } else {
                pending.resolve(response.result);
              }
              this.pendingRequests.delete(response.id);
            }
          } catch (err) {
            console.error('Error parsing server response:', err);
          }
        });

        this.serverProcess.stderr!.on('data', (data) => {
          console.error(`Server error: ${data.toString()}`);
        });

        this.serverProcess.on('close', (code) => {
          console.log(`Server process exited with code ${code}`);
          this.cleanup();
        });

        // Test the connection with a ping
        this.sendRequest('ping')
          .then(() => resolve())
          .catch(reject);
      } catch (err) {
        reject(err);
      }
    });
  }

  /**
   * Send a request to the Binary Ninja MCP server with retry capability
   */
  async sendRequest(
    method: string, 
    params?: Record<string, any>, 
    options: { maxRetries?: number, retryDelay?: number } = {}
  ): Promise<any> {
    if (!this.serverProcess || !this.rl) {
      throw new Error('Server not started');
    }

    const maxRetries = options.maxRetries ?? 3;
    const retryDelay = options.retryDelay ?? 1000;
    let lastError: Error | null = null;

    for (let attempt = 0; attempt <= maxRetries; attempt++) {
      try {
        return await new Promise((resolve, reject) => {
          const id = this.requestId++;
          const request: McpRequest = { id, method, params };
          
          // Set a timeout to handle cases where the server doesn't respond
          const timeoutId = setTimeout(() => {
            if (this.pendingRequests.has(id)) {
              this.pendingRequests.delete(id);
              reject(new Error(`Request timed out after 30 seconds: ${method}`));
            }
          }, 30000); // 30 second timeout
          
          this.pendingRequests.set(id, { 
            resolve: (value: any) => {
              clearTimeout(timeoutId);
              resolve(value);
            }, 
            reject: (error: any) => {
              clearTimeout(timeoutId);
              reject(error);
            }
          });
          
          this.serverProcess!.stdin!.write(JSON.stringify(request) + '\n');
        });
      } catch (error: unknown) {
        const err = error instanceof Error ? error : new Error(String(error));
        lastError = err;
        
        // If this was the last retry, throw the error
        if (attempt === maxRetries) {
          throw err;
        }
        
        // Log the retry attempt
        console.error(`Request failed (attempt ${attempt + 1}/${maxRetries + 1}): ${err.message}`);
        console.error(`Retrying in ${retryDelay}ms...`);
        
        // Wait before retrying
        await new Promise(resolve => setTimeout(resolve, retryDelay));
      }
    }

    // This should never be reached due to the throw in the loop, but TypeScript doesn't know that
    throw lastError || new Error('Unknown error');
  }

  /**
   * Stop the Binary Ninja MCP server
   */
  stop(): void {
    this.cleanup();
  }

  private cleanup(): void {
    if (this.rl) {
      this.rl.close();
      this.rl = null;
    }

    if (this.serverProcess) {
      this.serverProcess.kill();
      this.serverProcess = null;
    }

    // Reject any pending requests
    for (const [id, { reject }] of this.pendingRequests) {
      reject(new Error('Server connection closed'));
      this.pendingRequests.delete(id);
    }
  }

  /**
   * Get information about a binary file
   */
  async getBinaryInfo(path: string): Promise<any> {
    return this.sendRequest('get_binary_info', { path });
  }

  /**
   * List all functions in a binary file
   */
  async listFunctions(path: string): Promise<string[]> {
    return this.sendRequest('list_functions', { path });
  }

  /**
   * Get detailed information about a specific function
   */
  async getFunction(path: string, functionName: string): Promise<any> {
    return this.sendRequest('get_function', { path, function: functionName });
  }

  /**
   * Disassemble a function in a binary file
   */
  async disassembleFunction(path: string, functionName: string): Promise<string[]> {
    return this.sendRequest('disassemble_function', { path, function: functionName });
  }

  /**
   * List all sections/segments in a binary file
   */
  async listSections(path: string): Promise<any[]> {
    return this.sendRequest('list_sections', { path });
  }

  /**
   * List all imported functions in a binary file
   */
  async listImports(path: string): Promise<any[]> {
    return this.sendRequest('list_imports', { path });
  }

  /**
   * List all exported symbols in a binary file
   */
  async listExports(path: string): Promise<any[]> {
    return this.sendRequest('list_exports', { path });
  }

  /**
   * List all C++ namespaces in a binary file
   */
  async listNamespaces(path: string): Promise<string[]> {
    return this.sendRequest('list_namespaces', { path });
  }

  /**
   * List all defined data variables in a binary file
   */
  async listData(path: string): Promise<any[]> {
    return this.sendRequest('list_data', { path });
  }

  /**
   * Search for functions by name
   */
  async searchFunctions(path: string, query: string): Promise<any[]> {
    return this.sendRequest('search_functions', { path, query });
  }

  /**
   * Rename a function
   */
  async renameFunction(path: string, oldName: string, newName: string): Promise<any> {
    return this.sendRequest('rename_function', { path, old_name: oldName, new_name: newName });
  }

  /**
   * Rename a data variable
   */
  async renameData(path: string, address: string, newName: string): Promise<any> {
    return this.sendRequest('rename_data', { path, address, new_name: newName });
  }

  /**
   * Get cross-references to a function in a binary file
   */
  async getXrefs(path: string, functionName: string): Promise<any[]> {
    return this.sendRequest('get_xrefs', { path, function: functionName });
  }

  /**
   * Get the control flow graph of a function in a binary file
   */
  async getFunctionGraph(path: string, functionName: string): Promise<any[]> {
    return this.sendRequest('get_function_graph', { path, function: functionName });
  }

  /**
   * Get strings from a binary file
   */
  async getStrings(path: string, minLength: number = 4): Promise<any[]> {
    return this.sendRequest('get_strings', { path, min_length: minLength });
  }

  /**
   * Decompile a function to C code
   */
  async decompileFunction(path: string, functionName: string): Promise<any> {
    return this.sendRequest('decompile_function', { path, function: functionName });
  }

  /**
   * Extract data structures and types from a binary file
   */
  async getTypes(path: string): Promise<any[]> {
    return this.sendRequest('get_types', { path });
  }

  /**
   * Generate a header file with function prototypes and type definitions
   */
  async generateHeader(path: string, outputPath?: string, includeFunctions: boolean = true, includeTypes: boolean = true): Promise<string> {
    return this.sendRequest('generate_header', { 
      path, 
      output_path: outputPath,
      include_functions: includeFunctions,
      include_types: includeTypes
    });
  }

  /**
   * Generate a source file with function implementations
   */
  async generateSource(path: string, outputPath?: string, headerPath: string = 'generated_header.h'): Promise<string> {
    return this.sendRequest('generate_source', { 
      path, 
      output_path: outputPath,
      header_path: headerPath
    });
  }

  /**
   * Rebuild an entire driver module from a binary file
   */
  async rebuildDriver(path: string, outputDir: string): Promise<any> {
    return this.sendRequest('rebuild_driver', { 
      path, 
      output_dir: outputDir
    });
  }

  /**
   * Analyze a binary file and generate a comprehensive report
   */
  async analyzeFile(path: string, outputPath?: string): Promise<any> {
    // This is a higher-level function that combines multiple API calls
    // to generate a comprehensive analysis report
    const report: any = {
      file_info: await this.getBinaryInfo(path),
      sections: await this.listSections(path),
      functions: [],
      imports: await this.listImports(path),
      exports: await this.listExports(path),
      namespaces: await this.listNamespaces(path),
      data_variables: await this.listData(path),
      timestamp: new Date().toISOString()
    };

    // Get detailed information for the first 10 functions
    const functionNames = await this.listFunctions(path);
    report.function_count = functionNames.length;
    
    const sampleFunctions = functionNames.slice(0, 10);
    for (const funcName of sampleFunctions) {
      try {
        const funcInfo = await this.getFunction(path, funcName);
        const decompiled = await this.decompileFunction(path, funcName);
        report.functions.push({
          ...funcInfo,
          decompiled: decompiled
        });
      } catch (err) {
        console.error(`Error analyzing function ${funcName}: ${err}`);
      }
    }

    // Save the report to a file if outputPath is provided
    if (outputPath) {
      const fs = require('fs');
      const reportJson = JSON.stringify(report, null, 2);
      fs.writeFileSync(outputPath, reportJson);
    }

    return report;
  }

  /**
   * Find potential vulnerabilities in a binary file
   */
  async findVulnerabilities(path: string): Promise<any[]> {
    // This is a higher-level function that analyzes the binary for potential vulnerabilities
    const vulnerabilities: any[] = [];
    
    try {
      // Get all functions
      const functionNames = await this.listFunctions(path);
      
      // Look for potentially vulnerable functions
      const dangerousFunctions = [
        'strcpy', 'strcat', 'sprintf', 'gets', 'memcpy', 'system',
        'exec', 'popen', 'scanf', 'malloc', 'free', 'realloc'
      ];
      
      // Search for each dangerous function
      for (const dangerFunc of dangerousFunctions) {
        try {
          const matches = await this.searchFunctions(path, dangerFunc);
          
          for (const match of matches) {
            // Get more details about the function
            const decompiled = await this.decompileFunction(path, match.name);
            
            vulnerabilities.push({
              type: 'dangerous_function',
              function_name: match.name,
              dangerous_call: dangerFunc,
              address: match.address,
              decompiled: decompiled
            });
          }
        } catch (err) {
          console.error(`Error searching for ${dangerFunc}: ${err}`);
        }
      }
      
      // Look for string format vulnerabilities
      try {
        const printfMatches = await this.searchFunctions(path, 'printf');
        for (const match of printfMatches) {
          const decompiled = await this.decompileFunction(path, match.name);
          
          // Simple heuristic: if printf is called with a variable as first argument
          if (decompiled && decompiled.includes('printf(') && !decompiled.includes('printf("%')) {
            vulnerabilities.push({
              type: 'format_string',
              function_name: match.name,
              address: match.address,
              decompiled: decompiled
            });
          }
        }
      } catch (err) {
        console.error(`Error analyzing format string vulnerabilities: ${err}`);
      }
    } catch (err) {
      console.error(`Error finding vulnerabilities: ${err}`);
    }
    
    return vulnerabilities;
  }

  /**
   * Compare two binary files and identify differences
   */
  async compareBinaries(path1: string, path2: string): Promise<any> {
    // This is a higher-level function that compares two binaries
    const comparison: any = {
      file1: await this.getBinaryInfo(path1),
      file2: await this.getBinaryInfo(path2),
      differences: {
        functions: {
          only_in_file1: [],
          only_in_file2: [],
          modified: []
        },
        sections: {
          only_in_file1: [],
          only_in_file2: [],
          modified: []
        }
      }
    };
    
    // Compare functions
    const functions1 = await this.listFunctions(path1);
    const functions2 = await this.listFunctions(path2);
    
    // Find functions only in file1
    for (const func of functions1) {
      if (!functions2.includes(func)) {
        comparison.differences.functions.only_in_file1.push(func);
      }
    }
    
    // Find functions only in file2
    for (const func of functions2) {
      if (!functions1.includes(func)) {
        comparison.differences.functions.only_in_file2.push(func);
      }
    }
    
    // Compare common functions
    const commonFunctions = functions1.filter(f => functions2.includes(f));
    for (const func of commonFunctions) {
      try {
        const decompiled1 = await this.decompileFunction(path1, func);
        const decompiled2 = await this.decompileFunction(path2, func);
        
        if (decompiled1 !== decompiled2) {
          comparison.differences.functions.modified.push({
            name: func,
            file1_code: decompiled1,
            file2_code: decompiled2
          });
        }
      } catch (err) {
        console.error(`Error comparing function ${func}: ${err}`);
      }
    }
    
    // Compare sections
    const sections1 = await this.listSections(path1);
    const sections2 = await this.listSections(path2);
    
    const sectionNames1 = sections1.map(s => s.name);
    const sectionNames2 = sections2.map(s => s.name);
    
    // Find sections only in file1
    for (const section of sections1) {
      if (!sectionNames2.includes(section.name)) {
        comparison.differences.sections.only_in_file1.push(section);
      }
    }
    
    // Find sections only in file2
    for (const section of sections2) {
      if (!sectionNames1.includes(section.name)) {
        comparison.differences.sections.only_in_file2.push(section);
      }
    }
    
    // Compare common sections
    const commonSectionNames = sectionNames1.filter(s => sectionNames2.includes(s));
    for (const sectionName of commonSectionNames) {
      const section1 = sections1.find(s => s.name === sectionName);
      const section2 = sections2.find(s => s.name === sectionName);
      
      if (section1.size !== section2.size || section1.start !== section2.start) {
        comparison.differences.sections.modified.push({
          name: sectionName,
          file1_section: section1,
          file2_section: section2
        });
      }
    }
    
    return comparison;
  }
}

// Example usage
async function main() {
  if (process.argv.length < 3) {
    console.error('Usage: ts-node client.ts <path_to_binary>');
    process.exit(1);
  }

  const binaryPath = process.argv[2];
  const client = new BinaryNinjaClient();

  try {
    // Start the server
    await client.start('/home/matteius/Documents/Cline/MCP/bn-mcp/binaryninja_server.py');
    console.log('Connected to Binary Ninja MCP server');

    // Get binary information
    console.log('\n=== Binary Information ===');
    const info = await client.getBinaryInfo(binaryPath);
    console.log(`Filename: ${info.filename}`);
    console.log(`Architecture: ${info.architecture}`);
    console.log(`Platform: ${info.platform}`);
    console.log(`Entry Point: ${info.entry_point}`);
    console.log(`File Size: ${info.file_size} bytes`);
    console.log(`Executable: ${info.is_executable}`);
    console.log(`Relocatable: ${info.is_relocatable}`);
    console.log(`Address Size: ${info.address_size} bits`);

    // List sections
    console.log('\n=== Sections ===');
    const sections = await client.listSections(binaryPath);
    for (const section of sections) {
      console.log(`${section.name}: ${section.start} - ${section.end} (${section.size} bytes) [${section.semantics}]`);
    }

    // List functions
    console.log('\n=== Functions ===');
    const functions = await client.listFunctions(binaryPath);
    for (let i = 0; i < Math.min(functions.length, 10); i++) {
      console.log(`${i+1}. ${functions[i]}`);
    }
    if (functions.length > 10) {
      console.log(`... and ${functions.length - 10} more functions`);
    }

    // If there are functions, disassemble the first one
    if (functions.length > 0) {
      const funcName = functions[0];
      console.log(`\n=== Disassembly of '${funcName}' ===`);
      const disasm = await client.disassembleFunction(binaryPath, funcName);
      for (let i = 0; i < disasm.length; i++) {
        console.log(`${i+1}. ${disasm[i]}`);
      }

      // Get cross-references to this function
      console.log(`\n=== Cross-references to '${funcName}' ===`);
      const xrefs = await client.getXrefs(binaryPath, funcName);
      if (xrefs.length > 0) {
        for (const xref of xrefs) {
          console.log(`From: ${xref.from_function} at ${xref.from_address} to ${xref.to_address}`);
        }
      } else {
        console.log('No cross-references found');
      }
    }

    // Get strings
    console.log('\n=== Strings ===');
    const strings = await client.getStrings(binaryPath, 5);
    for (let i = 0; i < Math.min(strings.length, 10); i++) {
      console.log(`${i+1}. ${strings[i].address}: '${strings[i].value}'`);
    }
    if (strings.length > 10) {
      console.log(`... and ${strings.length - 10} more strings`);
    }

    // Source Code Reconstruction
    if (process.argv.length > 3) {
      const outputDir = process.argv[3];
      const fs = require('fs');
      const path = require('path');
      
      // Create output directory
      if (!fs.existsSync(outputDir)) {
        fs.mkdirSync(outputDir, { recursive: true });
      }
      
      // Decompile the first function
      if (functions.length > 0) {
        const funcName = functions[0];
        console.log(`\n=== Decompiled C Code for '${funcName}' ===`);
        try {
          const decompiled = await client.decompileFunction(binaryPath, funcName);
          console.log(`Function: ${decompiled.name}`);
          console.log(`Signature: ${decompiled.signature}`);
          console.log(`Address: ${decompiled.address}`);
          console.log("\nDecompiled Code:");
          console.log(decompiled.decompiled_code);
          
          // Save decompiled code to file
          const decompilePath = path.join(outputDir, `${funcName}.c`);
          fs.writeFileSync(decompilePath, 
            `// Decompiled function: ${funcName}\n` +
            `// Address: ${decompiled.address}\n\n` +
            decompiled.decompiled_code
          );
          console.log(`Saved decompiled code to ${decompilePath}`);
        } catch (err) {
          console.error(`Error decompiling function: ${err}`);
        }
      }
      
      // Extract types
      console.log("\n=== Data Types ===");
      try {
        const types = await client.getTypes(binaryPath);
        console.log(`Found ${types.length} types`);
        
        // Show first 5 types
        for (let i = 0; i < Math.min(types.length, 5); i++) {
          const type = types[i];
          console.log(`\n${i+1}. ${type.name} (${type.type_class})`);
          if (type.type_class === 'structure') {
            console.log(`   Size: ${type.size} bytes`);
            console.log("   Members:");
            for (const member of type.members) {
              console.log(`     - ${member.name}: ${member.type} (offset: ${member.offset})`);
            }
          }
        }
        
        if (types.length > 5) {
          console.log(`... and ${types.length - 5} more types`);
        }
        
        // Save types to file
        const typesPath = path.join(outputDir, "types.json");
        fs.writeFileSync(typesPath, JSON.stringify(types, null, 2));
        console.log(`Saved types to ${typesPath}`);
      } catch (err) {
        console.error(`Error getting types: ${err}`);
      }
      
      // Generate header file
      console.log("\n=== Generated Header File ===");
      try {
        const headerPath = path.join(outputDir, "generated_header.h");
        const headerContent = await client.generateHeader(binaryPath, headerPath);
        console.log(`Generated header file saved to ${headerPath}`);
        console.log("\nFirst 10 lines:");
        const headerLines = headerContent.split("\n");
        for (let i = 0; i < Math.min(headerLines.length, 10); i++) {
          console.log(headerLines[i]);
        }
        console.log("...");
      } catch (err) {
        console.error(`Error generating header: ${err}`);
      }
      
      // Generate source file
      console.log("\n=== Generated Source File ===");
      try {
        const sourcePath = path.join(outputDir, "generated_source.c");
        const sourceContent = await client.generateSource(binaryPath, sourcePath, "generated_header.h");
        console.log(`Generated source file saved to ${sourcePath}`);
        console.log("\nFirst 10 lines:");
        const sourceLines = sourceContent.split("\n");
        for (let i = 0; i < Math.min(sourceLines.length, 10); i++) {
          console.log(sourceLines[i]);
        }
        console.log("...");
      } catch (err) {
        console.error(`Error generating source: ${err}`);
      }
      
      // Rebuild driver (if it's a driver module)
      if (binaryPath.endsWith(".ko") || binaryPath.toLowerCase().includes("driver") || binaryPath.toLowerCase().includes("module")) {
        console.log("\n=== Rebuilding Driver Module ===");
        try {
          const driverDir = path.join(outputDir, "driver");
          const result = await client.rebuildDriver(binaryPath, driverDir);
          console.log("Driver module rebuilt successfully!");
          console.log(`Header file: ${result.header_file}`);
          console.log(`Source files: ${result.source_files.length} files generated`);
          console.log(`Makefile: ${result.makefile}`);
          console.log(`\nTo build the driver, run:`);
          console.log(`cd ${driverDir} && make`);
        } catch (err) {
          console.error(`Error rebuilding driver: ${err}`);
        }
      }
    } else {
      console.log("\nTo see source code reconstruction examples, provide an output directory:");
      console.log(`ts-node client.ts ${binaryPath} /path/to/output/dir`);
    }
  } catch (err) {
    console.error('Error:', err);
  } finally {
    // Stop the server
    client.stop();
  }
}

// Run the example if this file is executed directly
if (require.main === module) {
  main().catch(console.error);
}

export { BinaryNinjaClient };

```

--------------------------------------------------------------------------------
/binaryninja_mcp_http_server.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
import sys
import json
import traceback
import os
import logging
import argparse
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse
import time
from socketserver import ThreadingMixIn
import requests

# Assuming this is your BinaryNinja client implementation
# You would need to adjust this import to match your actual implementation
from binaryninja_http_client import BinaryNinjaHTTPClient

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("BinaryNinjaMCP")

# Define the MCP tools - note the use of "path" instead of "file"
MCP_TOOLS = [
    {
        "name": "get_binary_info",
        "description": "Get binary metadata",
        "streaming": False,
        "inputSchema": {
            "type": "object",
            "properties": {"path": {"type": "string"}},
            "required": ["path"]
        }
    },
    {
        "name": "list_functions",
        "description": "List functions",
        "streaming": False,
        "inputSchema": {
            "type": "object",
            "properties": {"path": {"type": "string"}},
            "required": ["path"]
        }
    },
    {
        "name": "get_function",
        "description": "Get information about a specific function",
        "streaming": False,
        "inputSchema": {
            "type": "object",
            "properties": {
                "path": {"type": "string"},
                "function": {"type": "string"}
            },
            "required": ["path", "function"]
        }
    },
    {
        "name": "disassemble_function",
        "description": "Disassemble function",
        "streaming": False,
        "inputSchema": {
            "type": "object",
            "properties": {
                "path": {"type": "string"},
                "function": {"type": "string"}
            },
            "required": ["path", "function"]
        }
    },
    {
        "name": "decompile_function",
        "description": "Decompile to C",
        "streaming": False,
        "inputSchema": {
            "type": "object",
            "properties": {
                "path": {"type": "string"},
                "function": {"type": "string"}
            },
            "required": ["path", "function"]
        }
    },
    {
        "name": "list_sections",
        "description": "List sections/segments in the binary",
        "streaming": False,
        "inputSchema": {
            "type": "object",
            "properties": {"path": {"type": "string"}},
            "required": ["path"]
        }
    },
    {
        "name": "list_imports",
        "description": "List imported functions",
        "streaming": False,
        "inputSchema": {
            "type": "object",
            "properties": {"path": {"type": "string"}},
            "required": ["path"]
        }
    },
    {
        "name": "list_exports",
        "description": "List exported symbols",
        "streaming": False,
        "inputSchema": {
            "type": "object",
            "properties": {"path": {"type": "string"}},
            "required": ["path"]
        }
    },
    {
        "name": "list_namespaces",
        "description": "List C++ namespaces",
        "streaming": False,
        "inputSchema": {
            "type": "object",
            "properties": {"path": {"type": "string"}},
            "required": ["path"]
        }
    },
    {
        "name": "list_data",
        "description": "List defined data variables",
        "streaming": False,
        "inputSchema": {
            "type": "object",
            "properties": {"path": {"type": "string"}},
            "required": ["path"]
        }
    },
    {
        "name": "search_functions",
        "description": "Search functions by name",
        "streaming": False,
        "inputSchema": {
            "type": "object",
            "properties": {
                "path": {"type": "string"},
                "query": {"type": "string"}
            },
            "required": ["path", "query"]
        }
    },
    {
        "name": "rename_function",
        "description": "Rename a function",
        "streaming": False,
        "inputSchema": {
            "type": "object",
            "properties": {
                "path": {"type": "string"},
                "old_name": {"type": "string"},
                "new_name": {"type": "string"}
            },
            "required": ["path", "old_name", "new_name"]
        }
    },
    {
        "name": "rename_data",
        "description": "Rename a data variable",
        "streaming": False,
        "inputSchema": {
            "type": "object",
            "properties": {
                "path": {"type": "string"},
                "address": {"type": "string"},
                "new_name": {"type": "string"}
            },
            "required": ["path", "address", "new_name"]
        }
    }
]

class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
    daemon_threads = True

class BinaryNinjaMCPHandler(BaseHTTPRequestHandler):
    def __init__(self, *args, **kwargs):
        self.client = BinaryNinjaHTTPClient()
        super().__init__(*args, **kwargs)

    def _set_headers(self, content_type='application/json', status_code=200):
        self.send_response(status_code)
        self.send_header('Content-Type', content_type)
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')
        self.send_header('Access-Control-Allow-Headers', 'Content-Type')
        self.end_headers()

    def do_OPTIONS(self):
        self._set_headers()

    def log_message(self, format, *args):
        logger.info(format % args)

    def do_GET(self):
        logger.debug(f"GET request received: {self.path}")
        logger.debug(f"Headers: {dict(self.headers)}")
        
        parsed = urlparse(self.path)
        if parsed.path == '/':
            if 'text/event-stream' in self.headers.get('Accept', ''):
                self.send_response(200)
                self.send_header('Content-Type', 'text/event-stream')
                self.send_header('Cache-Control', 'no-cache')
                self.send_header('Connection', 'keep-alive')
                self.send_header('Access-Control-Allow-Origin', '*')
                self.end_headers()
                try:
                    logger.debug("Starting SSE connection")
                    self.wfile.write(b"event: connected\ndata: {\"status\": \"ready\"}\n\n")
                    self.wfile.flush()
                    
                    # Use a shorter heartbeat interval
                    heartbeat_interval = 10  # seconds
                    
                    while True:
                        heartbeat = {
                            "jsonrpc": "2.0",
                            "method": "heartbeat",
                            "params": {"timestamp": int(time.time())}
                        }
                        msg = f"event: mcp-event\ndata: {json.dumps(heartbeat)}\n\n"
                        logger.debug(f"Sending heartbeat: {msg}")
                        self.wfile.write(msg.encode())
                        self.wfile.flush()
                        time.sleep(heartbeat_interval)
                except Exception as e:
                    logger.warning(f"SSE error: {e}")
            else:
                response = {
                    "jsonrpc": "2.0",
                    "id": "root-list",
                    "result": {
                        "name": "binaryninja-mcp",
                        "version": "0.1.0",
                        "tools": MCP_TOOLS
                    }
                }
                self._set_headers()
                response_str = json.dumps(response)
                logger.debug(f"Returning tool list: {response_str[:100]}...")
                self.wfile.write(response_str.encode())
        elif parsed.path == '/ping':
            self._set_headers()
            self.wfile.write(json.dumps({"status": "ok"}).encode())
        else:
            self._set_headers(status_code=404)
            self.wfile.write(json.dumps({"error": "Not found"}).encode())

    def do_POST(self):
        try:
            content_length = int(self.headers.get('Content-Length', 0))
            raw_data = self.rfile.read(content_length).decode('utf-8')
            
            logger.debug(f"POST received: {raw_data[:200]}...")
            
            request = json.loads(raw_data)
            response = self._handle_mcp_request(request)
            
            self._set_headers()
            response_str = json.dumps(response)
            logger.debug(f"Responding with: {response_str[:200]}...")
            self.wfile.write(response_str.encode())
        except Exception as e:
            logger.error(f"POST error: {e}")
            logger.error(traceback.format_exc())
            self._set_headers(status_code=500)
            self.wfile.write(json.dumps({
                "jsonrpc": "2.0",
                "id": None,
                "error": {"code": -32603, "message": str(e)}
            }).encode())

    def _wrap_result(self, request_id, text):
        return {
            "jsonrpc": "2.0",
            "id": request_id,
            "result": {
                "content": [{"type": "text", "text": text}],
                "isError": False
            }
        }

    def _handle_mcp_request(self, request):
        request_id = request.get("id")
        method = request.get("method")
        params = request.get("params", {})

        logger.debug(f"Handling MCP request: id={request_id}, method={method}, params={params}")

        try:
            if method == "list_tools":
                return {
                    "jsonrpc": "2.0", "id": request_id,
                    "result": {"tools": MCP_TOOLS}
                }
            elif method == "call_tool":
                name = params.get("name")
                args = params.get("arguments", {})
                return self._handle_mcp_request({"jsonrpc": "2.0", "id": request_id, "method": name, "params": args})

            elif method == "get_binary_info":
                path = params.get("path")
                if not path:
                    logger.error("Missing 'path' parameter")
                    return self._error_response(request_id, -32602, "Missing 'path' parameter")
                if not isinstance(path, str):
                    logger.error(f"Invalid path type: {type(path)}")
                    return self._error_response(request_id, -32602, "Parameter 'path' must be a string")
                
                logger.debug(f"Getting info for file: {path}")
                info = self.client.get_file_info(path)
                return self._wrap_result(request_id, json.dumps(info, indent=2))

            elif method == "list_functions":
                path = params.get("path")
                if not path:
                    logger.error("Missing 'path' parameter")
                    return self._error_response(request_id, -32602, "Missing 'path' parameter")
                if not isinstance(path, str):
                    logger.error(f"Invalid path type: {type(path)}")
                    return self._error_response(request_id, -32602, "Parameter 'path' must be a string")
                
                logger.debug(f"Listing functions for file: {path}")
                funcs = self.client.list_functions(path)
                return self._wrap_result(request_id, json.dumps([f["name"] for f in funcs], indent=2))

            elif method == "disassemble_function":
                path = params.get("path")
                func = params.get("function")
                if not path:
                    logger.error("Missing 'path' parameter")
                    return self._error_response(request_id, -32602, "Missing 'path' parameter")
                if not func:
                    logger.error("Missing 'function' parameter")
                    return self._error_response(request_id, -32602, "Missing 'function' parameter")
                if not isinstance(path, str):
                    logger.error(f"Invalid path type: {type(path)}")
                    return self._error_response(request_id, -32602, "Parameter 'path' must be a string")
                if not isinstance(func, str):
                    logger.error(f"Invalid function type: {type(func)}")
                    return self._error_response(request_id, -32602, "Parameter 'function' must be a string")
                
                logger.debug(f"Disassembling function {func} in file: {path}")
                code = self.client.get_disassembly(path, function_name=func)
                return self._wrap_result(request_id, "\n".join(code))

            elif method == "decompile_function":
                path = params.get("path")
                func = params.get("function")
                if not path:
                    logger.error("Missing 'path' parameter")
                    return self._error_response(request_id, -32602, "Missing 'path' parameter")
                if not func:
                    logger.error("Missing 'function' parameter")
                    return self._error_response(request_id, -32602, "Missing 'function' parameter")
                if not isinstance(path, str):
                    logger.error(f"Invalid path type: {type(path)}")
                    return self._error_response(request_id, -32602, "Parameter 'path' must be a string")
                if not isinstance(func, str):
                    logger.error(f"Invalid function type: {type(func)}")
                    return self._error_response(request_id, -32602, "Parameter 'function' must be a string")
                
                logger.debug(f"Decompiling function {func} in file: {path}")
                hlil = self.client.get_hlil(path, function_name=func)
                return self._wrap_result(request_id, "\n".join(hlil) if isinstance(hlil, list) else str(hlil))

            elif method == "get_function":
                path = params.get("path")
                func = params.get("function")
                if not path:
                    logger.error("Missing 'path' parameter")
                    return self._error_response(request_id, -32602, "Missing 'path' parameter")
                if not func:
                    logger.error("Missing 'function' parameter")
                    return self._error_response(request_id, -32602, "Missing 'function' parameter")
                if not isinstance(path, str):
                    logger.error(f"Invalid path type: {type(path)}")
                    return self._error_response(request_id, -32602, "Parameter 'path' must be a string")
                if not isinstance(func, str):
                    logger.error(f"Invalid function type: {type(func)}")
                    return self._error_response(request_id, -32602, "Parameter 'function' must be a string")
                
                logger.debug(f"Getting function info for {func} in file: {path}")
                func_info = self.client.get_function(path, function_name=func)
                if func_info:
                    return self._wrap_result(request_id, json.dumps(func_info, indent=2))
                else:
                    return self._error_response(request_id, -32602, f"Function '{func}' not found")

            elif method == "list_sections":
                path = params.get("path")
                if not path:
                    logger.error("Missing 'path' parameter")
                    return self._error_response(request_id, -32602, "Missing 'path' parameter")
                if not isinstance(path, str):
                    logger.error(f"Invalid path type: {type(path)}")
                    return self._error_response(request_id, -32602, "Parameter 'path' must be a string")
                
                logger.debug(f"Listing sections for file: {path}")
                sections = self.client.get_sections(path)
                return self._wrap_result(request_id, json.dumps(sections, indent=2))

            elif method == "list_imports":
                path = params.get("path")
                if not path:
                    logger.error("Missing 'path' parameter")
                    return self._error_response(request_id, -32602, "Missing 'path' parameter")
                if not isinstance(path, str):
                    logger.error(f"Invalid path type: {type(path)}")
                    return self._error_response(request_id, -32602, "Parameter 'path' must be a string")
                
                logger.debug(f"Listing imports for file: {path}")
                imports = self.client.get_imports()
                return self._wrap_result(request_id, json.dumps(imports, indent=2))

            elif method == "list_exports":
                path = params.get("path")
                if not path:
                    logger.error("Missing 'path' parameter")
                    return self._error_response(request_id, -32602, "Missing 'path' parameter")
                if not isinstance(path, str):
                    logger.error(f"Invalid path type: {type(path)}")
                    return self._error_response(request_id, -32602, "Parameter 'path' must be a string")
                
                logger.debug(f"Listing exports for file: {path}")
                exports = self.client.get_exports()
                return self._wrap_result(request_id, json.dumps(exports, indent=2))

            elif method == "list_namespaces":
                path = params.get("path")
                if not path:
                    logger.error("Missing 'path' parameter")
                    return self._error_response(request_id, -32602, "Missing 'path' parameter")
                if not isinstance(path, str):
                    logger.error(f"Invalid path type: {type(path)}")
                    return self._error_response(request_id, -32602, "Parameter 'path' must be a string")
                
                logger.debug(f"Listing namespaces for file: {path}")
                namespaces = self.client.get_namespaces()
                return self._wrap_result(request_id, json.dumps(namespaces, indent=2))

            elif method == "list_data":
                path = params.get("path")
                if not path:
                    logger.error("Missing 'path' parameter")
                    return self._error_response(request_id, -32602, "Missing 'path' parameter")
                if not isinstance(path, str):
                    logger.error(f"Invalid path type: {type(path)}")
                    return self._error_response(request_id, -32602, "Parameter 'path' must be a string")
                
                logger.debug(f"Listing data variables for file: {path}")
                data_items = self.client.get_defined_data()
                return self._wrap_result(request_id, json.dumps(data_items, indent=2))

            elif method == "search_functions":
                path = params.get("path")
                query = params.get("query")
                if not path:
                    logger.error("Missing 'path' parameter")
                    return self._error_response(request_id, -32602, "Missing 'path' parameter")
                if not query:
                    logger.error("Missing 'query' parameter")
                    return self._error_response(request_id, -32602, "Missing 'query' parameter")
                if not isinstance(path, str):
                    logger.error(f"Invalid path type: {type(path)}")
                    return self._error_response(request_id, -32602, "Parameter 'path' must be a string")
                if not isinstance(query, str):
                    logger.error(f"Invalid query type: {type(query)}")
                    return self._error_response(request_id, -32602, "Parameter 'query' must be a string")
                
                logger.debug(f"Searching functions with query '{query}' in file: {path}")
                matches = self.client.search_functions(query)
                return self._wrap_result(request_id, json.dumps(matches, indent=2))

            elif method == "rename_function":
                path = params.get("path")
                old_name = params.get("old_name")
                new_name = params.get("new_name")
                if not path:
                    logger.error("Missing 'path' parameter")
                    return self._error_response(request_id, -32602, "Missing 'path' parameter")
                if not old_name:
                    logger.error("Missing 'old_name' parameter")
                    return self._error_response(request_id, -32602, "Missing 'old_name' parameter")
                if not new_name:
                    logger.error("Missing 'new_name' parameter")
                    return self._error_response(request_id, -32602, "Missing 'new_name' parameter")
                if not isinstance(path, str):
                    logger.error(f"Invalid path type: {type(path)}")
                    return self._error_response(request_id, -32602, "Parameter 'path' must be a string")
                if not isinstance(old_name, str):
                    logger.error(f"Invalid old_name type: {type(old_name)}")
                    return self._error_response(request_id, -32602, "Parameter 'old_name' must be a string")
                if not isinstance(new_name, str):
                    logger.error(f"Invalid new_name type: {type(new_name)}")
                    return self._error_response(request_id, -32602, "Parameter 'new_name' must be a string")
                
                logger.debug(f"Renaming function from '{old_name}' to '{new_name}' in file: {path}")
                success = self.client.rename_function(old_name, new_name)
                if success:
                    return self._wrap_result(request_id, json.dumps({"success": True, "message": f"Function renamed from '{old_name}' to '{new_name}'"}, indent=2))
                else:
                    return self._error_response(request_id, -32602, f"Failed to rename function '{old_name}' to '{new_name}'")

            elif method == "rename_data":
                path = params.get("path")
                address = params.get("address")
                new_name = params.get("new_name")
                if not path:
                    logger.error("Missing 'path' parameter")
                    return self._error_response(request_id, -32602, "Missing 'path' parameter")
                if not address:
                    logger.error("Missing 'address' parameter")
                    return self._error_response(request_id, -32602, "Missing 'address' parameter")
                if not new_name:
                    logger.error("Missing 'new_name' parameter")
                    return self._error_response(request_id, -32602, "Missing 'new_name' parameter")
                if not isinstance(path, str):
                    logger.error(f"Invalid path type: {type(path)}")
                    return self._error_response(request_id, -32602, "Parameter 'path' must be a string")
                if not isinstance(address, str):
                    logger.error(f"Invalid address type: {type(address)}")
                    return self._error_response(request_id, -32602, "Parameter 'address' must be a string")
                if not isinstance(new_name, str):
                    logger.error(f"Invalid new_name type: {type(new_name)}")
                    return self._error_response(request_id, -32602, "Parameter 'new_name' must be a string")
                
                logger.debug(f"Renaming data at address '{address}' to '{new_name}' in file: {path}")
                success = self.client.rename_data(address, new_name)
                if success:
                    return self._wrap_result(request_id, json.dumps({"success": True, "message": f"Data at address '{address}' renamed to '{new_name}'"}, indent=2))
                else:
                    return self._error_response(request_id, -32602, f"Failed to rename data at address '{address}' to '{new_name}'")

            elif method == "cancel":
                logger.debug("Cancel requested — not implemented.")
                return self._error_response(request_id, -32601, "Cancel not implemented")

            logger.error(f"Unknown method: {method}")
            return self._error_response(request_id, -32601, f"Unknown method: {method}")

        except Exception as e:
            logger.error(f"Error in MCP handler: {e}\n{traceback.format_exc()}")
            return self._error_response(request_id, -32603, str(e))

    def _error_response(self, request_id, code, message):
        return {
            "jsonrpc": "2.0",
            "id": request_id,
            "error": {
                "code": code,
                "message": message
            }
        }

def run_server(host='127.0.0.1', port=8088):
    server = ThreadedHTTPServer((host, port), BinaryNinjaMCPHandler)
    logger.info(f"Binary Ninja MCP HTTP server running at http://{host}:{port}")
    server.serve_forever()

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--host', default='127.0.0.1')
    parser.add_argument('--port', type=int, default=8088)
    args = parser.parse_args()
    run_server(args.host, args.port)

if __name__ == '__main__':
    main()

```

--------------------------------------------------------------------------------
/binaryninja_server.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Binary Ninja MCP Server

This is the main entry point for the Binary Ninja MCP server.
It integrates the HTTP server and client components to provide a complete MCP server implementation.
"""

import sys
import json
import traceback
import os
import logging
from binaryninja_http_client import BinaryNinjaHTTPClient

# Configure logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('BinaryNinjaMCPServer')

# Create a file handler to log to a file
file_handler = logging.FileHandler('/tmp/binaryninja_mcp_server.log')
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
logger.addHandler(file_handler)

def read_json():
    """Read a JSON object from stdin."""
    line = sys.stdin.readline()
    if not line:
        sys.exit(0)
    return json.loads(line)

def write_json(response):
    """Write a JSON object to stdout."""
    print(json.dumps(response), flush=True)

def handle_request(request, client):
    """Handle an MCP request using the Binary Ninja HTTP client."""
    try:
        method = request.get("method")
        params = request.get("params", {})
        
        # Log the request method and parameters
        logger.debug(f"Handling method: {method}")
        logger.debug(f"Parameters: {json.dumps(params)}")

        # MCP Protocol Methods
        if method == "list_tools":
            # Return the list of available tools
            return {
                "result": {
                    "tools": [
                        {
                            "name": "get_binary_info",
                            "description": "Get information about a binary file",
                            "inputSchema": {
                                "type": "object",
                                "properties": {
                                    "path": {
                                        "type": "string",
                                        "description": "Path to the binary file"
                                    }
                                },
                                "required": ["path"]
                            }
                        },
                        {
                            "name": "list_functions",
                            "description": "List all functions in a binary file",
                            "inputSchema": {
                                "type": "object",
                                "properties": {
                                    "path": {
                                        "type": "string",
                                        "description": "Path to the binary file"
                                    }
                                },
                                "required": ["path"]
                            }
                        },
                        {
                            "name": "disassemble_function",
                            "description": "Disassemble a function in a binary file",
                            "inputSchema": {
                                "type": "object",
                                "properties": {
                                    "path": {
                                        "type": "string",
                                        "description": "Path to the binary file"
                                    },
                                    "function": {
                                        "type": "string",
                                        "description": "Name of the function to disassemble"
                                    }
                                },
                                "required": ["path", "function"]
                            }
                        },
                        {
                            "name": "decompile_function",
                            "description": "Decompile a function to C code",
                            "inputSchema": {
                                "type": "object",
                                "properties": {
                                    "path": {
                                        "type": "string",
                                        "description": "Path to the binary file"
                                    },
                                    "function": {
                                        "type": "string",
                                        "description": "Name of the function to decompile"
                                    }
                                },
                                "required": ["path", "function"]
                            }
                        }
                    ]
                }
            }
            
        elif method == "list_resources":
            # Return the list of available resources
            return {
                "result": {
                    "resources": []  # No static resources available
                }
            }
            
        elif method == "list_resource_templates":
            # Return the list of available resource templates
            return {
                "result": {
                    "resourceTemplates": [
                        {
                            "uriTemplate": "binary://{path}/info",
                            "name": "Binary Information",
                            "description": "Information about a binary file"
                        },
                        {
                            "uriTemplate": "binary://{path}/functions",
                            "name": "Functions",
                            "description": "List of functions in a binary file"
                        },
                        {
                            "uriTemplate": "binary://{path}/function/{name}",
                            "name": "Function Disassembly",
                            "description": "Disassembly of a function in a binary file"
                        }
                    ]
                }
            }
            
        elif method == "read_resource":
            uri = params.get("uri", "")
            logger.debug(f"Reading resource: {uri}")
            
            # Parse the URI
            if uri.startswith("binary://"):
                # Remove the protocol
                path = uri[9:]
                
                # Check if it's a function disassembly
                if "/function/" in path:
                    # Extract the path and function name
                    parts = path.split("/function/")
                    if len(parts) != 2:
                        return {"error": "Invalid URI format"}
                    
                    binary_path = parts[0]
                    function_name = parts[1]
                    
                    # Get the disassembly
                    disasm_result = handle_request({
                        "method": "disassemble_function",
                        "params": {
                            "path": binary_path,
                            "function": function_name
                        }
                    }, client)
                    
                    if "error" in disasm_result:
                        return disasm_result
                    
                    return {
                        "result": {
                            "contents": [
                                {
                                    "uri": uri,
                                    "mimeType": "text/plain",
                                    "text": "\n".join(disasm_result["result"])
                                }
                            ]
                        }
                    }
                
                # Check if it's a functions list
                elif path.endswith("/functions"):
                    # Extract the binary path
                    binary_path = path[:-10]  # Remove "/functions"
                    
                    # Get the functions
                    functions_result = handle_request({
                        "method": "list_functions",
                        "params": {
                            "path": binary_path
                        }
                    }, client)
                    
                    if "error" in functions_result:
                        return functions_result
                    
                    return {
                        "result": {
                            "contents": [
                                {
                                    "uri": uri,
                                    "mimeType": "application/json",
                                    "text": json.dumps(functions_result["result"])
                                }
                            ]
                        }
                    }
                
                # Check if it's binary info
                elif path.endswith("/info"):
                    # Extract the binary path
                    binary_path = path[:-5]  # Remove "/info"
                    
                    # Get the binary info
                    info_result = handle_request({
                        "method": "get_binary_info",
                        "params": {
                            "path": binary_path
                        }
                    }, client)
                    
                    if "error" in info_result:
                        return info_result
                    
                    return {
                        "result": {
                            "contents": [
                                {
                                    "uri": uri,
                                    "mimeType": "application/json",
                                    "text": json.dumps(info_result["result"])
                                }
                            ]
                        }
                    }
            
            return {"error": f"Unknown resource URI: {uri}"}
            
        elif method == "call_tool":
            tool_name = params.get("name")
            tool_args = params.get("arguments", {})
            
            logger.debug(f"Calling tool: {tool_name}")
            logger.debug(f"Arguments: {json.dumps(tool_args)}")
            
            # Map the tool name to the corresponding method
            if tool_name == "get_binary_info":
                return handle_request({
                    "method": "get_binary_info",
                    "params": tool_args
                }, client)
            elif tool_name == "list_functions":
                return handle_request({
                    "method": "list_functions",
                    "params": tool_args
                }, client)
            elif tool_name == "disassemble_function":
                return handle_request({
                    "method": "disassemble_function",
                    "params": tool_args
                }, client)
            elif tool_name == "decompile_function":
                return handle_request({
                    "method": "decompile_function",
                    "params": tool_args
                }, client)
            else:
                return {"error": f"Unknown tool: {tool_name}"}
        
        # Binary Ninja API Methods
        elif method == "ping":
            ping_result = client.ping()
            if ping_result["status"] == "connected":
                return {"result": "pong"}
            else:
                return {"error": f"Failed to connect to Binary Ninja server: {ping_result.get('error', 'Unknown error')}"}

        elif method == "get_binary_info":
            path = params.get("path")
            if not path:
                return {"error": "Path parameter is required"}
                
            # We assume the binary is already loaded
            # Just log the path for debugging
            logger.info(f"Using binary: {path}")
                
            file_info = client.get_file_info(path)
            
            # Format the response to match the original API
            info = {
                "filename": file_info.get("filename", ""),
                "architecture": file_info.get("arch", {}).get("name", "unknown"),
                "platform": file_info.get("platform", {}).get("name", "unknown"),
                "entry_point": hex(file_info.get("entry_point", 0)),
                "file_size": file_info.get("file_size", 0),
                "is_executable": file_info.get("executable", False),
                "is_relocatable": file_info.get("relocatable", False),
                "address_size": file_info.get("address_size", 0)
            }
            return {"result": info}

        elif method == "list_functions":
            path = params.get("path")
            if not path:
                return {"error": "Path parameter is required"}
                
            # We assume the binary is already loaded
            # Just log the path for debugging
            logger.info(f"Using binary: {path}")
                
            functions = client.list_functions(path)
            func_names = [f["name"] for f in functions]
            return {"result": func_names}

        elif method == "disassemble_function":
            path = params.get("path")
            func_name = params.get("function")
            if not path or not func_name:
                return {"error": "Path and function parameters are required"}
                
            # We assume the binary is already loaded
            # Just log the path for debugging
            logger.info(f"Using binary: {path}")
                
            disasm = client.get_disassembly(path, function_name=func_name)
            return {"result": disasm}
            
        elif method == "list_sections":
            path = params.get("path")
            if not path:
                return {"error": "Path parameter is required"}
                
            # We assume the binary is already loaded
            # Just log the path for debugging
            logger.info(f"Using binary: {path}")
                
            sections_data = client.get_sections(path)
            
            # Format the response to match the original API
            sections = []
            for section in sections_data:
                # Handle the case where start, end, and length might be strings
                start = section.get("start", 0)
                end = section.get("end", 0)
                length = section.get("length", 0)
                
                # Convert to integers if they are strings
                if isinstance(start, str):
                    try:
                        start = int(start, 0)  # Base 0 means it will detect hex or decimal
                    except ValueError:
                        start = 0
                        
                if isinstance(end, str):
                    try:
                        end = int(end, 0)  # Base 0 means it will detect hex or decimal
                    except ValueError:
                        end = 0
                        
                if isinstance(length, str):
                    try:
                        length = int(length, 0)  # Base 0 means it will detect hex or decimal
                    except ValueError:
                        length = 0
                
                sections.append({
                    "name": section.get("name", ""),
                    "start": hex(start),
                    "end": hex(end),
                    "size": length,
                    "semantics": section.get("semantics", "")
                })
            return {"result": sections}
            
        elif method == "get_xrefs":
            path = params.get("path")
            func_name = params.get("function")
            if not path or not func_name:
                return {"error": "Path and function parameters are required"}
                
            # We assume the binary is already loaded
            # Just log the path for debugging
            logger.info(f"Using binary: {path}")
                
            # First get the function info to get its address
            function = client.get_function(path, function_name=func_name)
            if not function:
                return {"error": f"Function '{func_name}' not found"}
                
            # Then get the xrefs to that address
            xrefs_data = client.get_xrefs(path, function.get("start", 0))
            
            # Format the response to match the original API
            refs = []
            for xref in xrefs_data:
                # Get the function that contains this xref
                caller_addr = xref.get("from", 0)
                try:
                    # This is a simplification - in a real implementation we would
                    # need to find the function that contains this address
                    caller_func = client.get_function(path, function_address=caller_addr)
                    refs.append({
                        "from_function": caller_func.get("name", "unknown"),
                        "from_address": hex(caller_addr),
                        "to_address": hex(xref.get("to", 0))
                    })
                except Exception:
                    # Skip this xref if we can't get the caller function
                    pass
            
            return {"result": refs}
            
        elif method == "get_strings":
            path = params.get("path")
            min_length = params.get("min_length", 4)
            if not path:
                return {"error": "Path parameter is required"}
                
            # We assume the binary is already loaded
            # Just log the path for debugging
            logger.info(f"Using binary: {path}")
                
            strings_data = client.get_strings(path, min_length=min_length)
            
            # Format the response to match the original API
            strings = []
            for string in strings_data:
                strings.append({
                    "value": string.get("value", ""),
                    "address": hex(string.get("address", 0)),
                    "length": len(string.get("value", "")),
                    "type": string.get("type", "")
                })
            
            return {"result": strings}
            
        elif method == "decompile_function":
            path = params.get("path")
            func_name = params.get("function")
            if not path or not func_name:
                return {"error": "Path and function parameters are required"}
                
            # We assume the binary is already loaded
            # Just log the path for debugging
            logger.info(f"Using binary: {path}")
                
            # Get the function info
            function = client.get_function(path, function_name=func_name)
            if not function:
                return {"error": f"Function '{func_name}' not found"}
                
            # Get the decompiled code
            try:
                hlil = client.get_hlil(path, function_name=func_name)
                decompiled_code = "\n".join(hlil) if isinstance(hlil, list) else str(hlil)
            except Exception as e:
                logger.warning(f"Failed to decompile function: {e}")
                decompiled_code = "// Decompilation not available in personal license\n// or Binary Ninja server is not running."
            
            # Format the response to match the original API
            return {
                "result": {
                    "name": function.get("name", ""),
                    "signature": function.get("type", ""),
                    "decompiled_code": decompiled_code,
                    "address": hex(function.get("start", 0))
                }
            }
            
        elif method == "get_types":
            path = params.get("path")
            if not path:
                return {"error": "Path parameter is required"}
                
            # We assume the binary is already loaded
            # Just log the path for debugging
            logger.info(f"Using binary: {path}")
                
            types_data = client.get_types(path)
            
            # Format the response to match the original API
            # This is a simplified version - the actual implementation would need to
            # parse the types data from the Binary Ninja HTTP API
            types = []
            for type_name, type_info in types_data.items():
                type_obj = {
                    "name": type_name,
                    "type_class": type_info.get("type_class", "unknown"),
                    "type_string": type_info.get("type_string", "")
                }
                
                if type_info.get("type_class") == "structure":
                    type_obj["size"] = type_info.get("size", 0)
                    type_obj["members"] = []
                    for member in type_info.get("members", []):
                        type_obj["members"].append({
                            "name": member.get("name", ""),
                            "type": member.get("type", ""),
                            "offset": member.get("offset", 0)
                        })
                
                types.append(type_obj)
            
            return {"result": types}
            
        elif method == "generate_header":
            path = params.get("path")
            output_path = params.get("output_path")
            include_functions = params.get("include_functions", True)
            include_types = params.get("include_types", True)
            
            if not path:
                return {"error": "Path parameter is required"}
                
            # We assume the binary is already loaded
            # Just log the path for debugging
            logger.info(f"Using binary: {path}")
                
            # This is a placeholder implementation
            # In a real implementation, we would generate a header file with function prototypes and type definitions
            header_content = "// Generated header file\n\n"
            
            # Add include guards
            header_content += "#ifndef GENERATED_HEADER_H\n"
            header_content += "#define GENERATED_HEADER_H\n\n"
            
            # Add standard includes
            header_content += "#include <stdint.h>\n"
            header_content += "#include <stdbool.h>\n\n"
            
            # Add types if requested
            if include_types:
                types_data = client.get_types(path)
                if types_data:
                    header_content += "// Types\n"
                    for type_name, type_info in types_data.items():
                        if type_info.get("type_class") == "structure":
                            header_content += f"typedef struct {type_name} {{\n"
                            for member in type_info.get("members", []):
                                header_content += f"    {member.get('type', 'void')} {member.get('name', 'unknown')}; // offset: {member.get('offset', 0)}\n"
                            header_content += f"}} {type_name};\n\n"
                        else:
                            header_content += f"typedef {type_info.get('type_string', 'void')} {type_name};\n"
                    header_content += "\n"
            
            # Add function prototypes if requested
            if include_functions:
                functions = client.list_functions(path)
                if functions:
                    header_content += "// Function prototypes\n"
                    for func in functions:
                        # Get the function info
                        function = client.get_function(path, function_name=func["name"])
                        if function:
                            header_content += f"{function.get('type', 'void')} {function.get('name', 'unknown')}();\n"
                    header_content += "\n"
            
            # Close include guards
            header_content += "#endif // GENERATED_HEADER_H\n"
            
            # Write to file if output_path is provided
            if output_path:
                try:
                    with open(output_path, "w") as f:
                        f.write(header_content)
                except Exception as e:
                    logger.error(f"Failed to write header file: {e}")
                    return {"error": f"Failed to write header file: {e}"}
            
            return {"result": header_content}
            
        elif method == "generate_source":
            path = params.get("path")
            output_path = params.get("output_path")
            header_path = params.get("header_path", "generated_header.h")
            
            if not path:
                return {"error": "Path parameter is required"}
                
            # We assume the binary is already loaded
            # Just log the path for debugging
            logger.info(f"Using binary: {path}")
                
            # This is a placeholder implementation
            # In a real implementation, we would generate a source file with function implementations
            source_content = "// Generated source file\n\n"
            
            # Add include for the header file
            source_content += f"#include \"{header_path}\"\n\n"
            
            # Add function implementations
            functions = client.list_functions(path)
            if functions:
                for func in functions:
                    # Get the function info
                    function = client.get_function(path, function_name=func["name"])
                    if function:
                        # Get the decompiled code
                        try:
                            hlil = client.get_hlil(path, function_name=func["name"])
                            decompiled_code = "\n".join(hlil) if isinstance(hlil, list) else str(hlil)
                        except Exception as e:
                            logger.warning(f"Failed to decompile function: {e}")
                            decompiled_code = "// Decompilation not available in personal license\n// or Binary Ninja server is not running."
                        
                        source_content += f"// Function: {function.get('name', 'unknown')}\n"
                        source_content += f"// Address: {hex(function.get('start', 0))}\n"
                        source_content += f"{function.get('type', 'void')} {function.get('name', 'unknown')}() {{\n"
                        source_content += f"    // TODO: Implement this function\n"
                        source_content += f"    // Decompiled code:\n"
                        source_content += f"    /*\n"
                        for line in decompiled_code.split("\n"):
                            source_content += f"    {line}\n"
                        source_content += f"    */\n"
                        source_content += f"}}\n\n"
            
            # Write to file if output_path is provided
            if output_path:
                try:
                    with open(output_path, "w") as f:
                        f.write(source_content)
                except Exception as e:
                    logger.error(f"Failed to write source file: {e}")
                    return {"error": f"Failed to write source file: {e}"}
            
            return {"result": source_content}
            
        elif method == "rebuild_driver":
            path = params.get("path")
            output_dir = params.get("output_dir")
            
            if not path:
                return {"error": "Path parameter is required"}
                
            if not output_dir:
                return {"error": "Output directory parameter is required"}
                
            # We assume the binary is already loaded
            # Just log the path for debugging
            logger.info(f"Using binary: {path}")
                
            # This is a placeholder implementation
            # In a real implementation, we would generate a complete driver module
            try:
                os.makedirs(output_dir, exist_ok=True)
                
                # Generate header file
                header_path = os.path.join(output_dir, "driver.h")
                header_result = handle_request({
                    "method": "generate_header",
                    "params": {
                        "path": path,
                        "output_path": header_path
                    }
                }, client)
                
                if "error" in header_result:
                    return {"error": f"Failed to generate header file: {header_result['error']}"}
                
                # Generate source file
                source_path = os.path.join(output_dir, "driver.c")
                source_result = handle_request({
                    "method": "generate_source",
                    "params": {
                        "path": path,
                        "output_path": source_path,
                        "header_path": "driver.h"
                    }
                }, client)
                
                if "error" in source_result:
                    return {"error": f"Failed to generate source file: {source_result['error']}"}
                
                # Generate Makefile
                makefile_path = os.path.join(output_dir, "Makefile")
                with open(makefile_path, "w") as f:
                    f.write("# Generated Makefile\n\n")
                    f.write("obj-m := driver.o\n\n")
                    f.write("all:\n")
                    f.write("\tmake -C /lib/modules/$(shell uname -r)/build M=$(PWD) modules\n\n")
                    f.write("clean:\n")
                    f.write("\tmake -C /lib/modules/$(shell uname -r)/build M=$(PWD) clean\n")
                
                return {
                    "result": {
                        "header_file": header_path,
                        "source_files": [source_path],
                        "makefile": makefile_path
                    }
                }
            except Exception as e:
                logger.error(f"Failed to rebuild driver: {e}")
                return {"error": f"Failed to rebuild driver: {e}"}

        return {"error": f"Unknown method: {method}"}

    except Exception as e:
        logger.error(f"Error handling request: {e}")
        logger.error(traceback.format_exc())
        return {
            "error": str(e),
            "traceback": traceback.format_exc()
        }

def main():
    """Main function to run the MCP server."""
    logger.info("Starting Binary Ninja MCP Server")
    
    # Log all environment variables for debugging
    logger.debug("Environment variables:")
    for key, value in os.environ.items():
        logger.debug(f"  {key}={value}")
    
    # Create the Binary Ninja HTTP client
    client = BinaryNinjaHTTPClient()
    
    # Test the connection to the Binary Ninja server
    ping_result = client.ping()
    if ping_result["status"] != "connected":
        logger.error(f"Failed to connect to Binary Ninja server: {ping_result.get('error', 'Unknown error')}")
        # Don't exit, continue anyway to support the MCP protocol
        logger.warning("Continuing anyway to support the MCP protocol")
    else:
        logger.info(f"Connected to Binary Ninja server (binary loaded: {ping_result.get('loaded', False)})")
    
    # Log that we're ready to receive requests
    logger.info("Ready to receive MCP requests")
    
    # Process requests
    while True:
        try:
            # Log that we're waiting for a request
            logger.debug("Waiting for request...")
            
            # Read the request from stdin
            req = read_json()
            logger.debug(f"Received request: {json.dumps(req)}")
            
            # Handle the request
            res = handle_request(req, client)
            res["id"] = req.get("id")
            
            # Log the response
            logger.debug(f"Sending response: {json.dumps(res)}")
            
            # Write the response to stdout
            write_json(res)
        except json.JSONDecodeError as e:
            logger.error(f"Error decoding JSON: {e}")
            logger.error(f"Input was: {sys.stdin.readline()}")
            # Continue processing requests
        except Exception as e:
            logger.error(f"Error processing request: {e}")
            logger.error(traceback.format_exc())
            # Don't exit, continue processing requests
            logger.warning("Continuing to process requests...")

if __name__ == "__main__":
    main()

```
Page 1/2FirstPrevNextLast