#
tokens: 11362/50000 12/12 files
lines: on (toggle) GitHub
raw markdown copy reset
# Directory Structure

```
├── .env_example
├── .gitignore
├── images
│   ├── pic.png
│   └── pic2.png
├── pyproject.toml
├── README.md
├── src
│   └── headless_ida_mcp_server
│       ├── __init__.py
│       ├── __main__.py
│       ├── helper.py
│       ├── logger.py
│       └── server.py
└── test
    ├── heap
    │   ├── main
    │   └── main.c
    ├── stack
    │   ├── main
    │   └── main.c
    └── test_ida_agent.py
```

# Files

--------------------------------------------------------------------------------
/.env_example:
--------------------------------------------------------------------------------

```
1 | IDA_PATH=/home/ubuntu/idapro-9.0/idat
2 | PORT=8888
3 | HOST=0.0.0.0
4 | TRANSPORT=sse
5 | # TRANSPORT=stdio
```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
 1 | # Python-generated files
 2 | __pycache__/
 3 | *.py[oc]
 4 | build/
 5 | dist/
 6 | wheels/
 7 | *.egg-info
 8 | 
 9 | # Virtual environments
10 | .venv/
11 | .vscode/
12 | .python-version
13 | .cursor/
14 | .env
15 | exploit.py
```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
 1 | # Acknowledgments
 2 | 
 3 | This project builds upon the work of:
 4 | - Tools code adapted from [ida-pro-mcp](https://github.com/mrexodia/ida-pro-mcp) by mrexodia
 5 | - Utilizes the [headless-ida](https://github.com/DennyDai/headless-ida) library by DennyDai
 6 | 
 7 | # Headless IDA MCP Server
 8 | 
 9 | If you want to run the server directly as a cli app, rather than an IDA plugin interactively,you can chose it.
10 | 
11 | ## Project Description
12 | 
13 | This project uses IDA Pro's headless mode to analyze binary files and provides a suite of tools via MCP to manage and manipulate functions, variables, and more.
14 | 
15 | ## Prerequisites
16 | 
17 | - Python 3.12 or higher
18 | - IDA Pro with headless support (idat) https://github.com/DennyDai/headless-ida
19 | 
20 | ## Installation
21 | 
22 | 1. Clone the project locally:
23 | 
24 |    ```bash
25 |    git clone https://github.com/cnitlrt/headless-ida-mcp-server.git 
26 |    cd headless-ida-mcp-server
27 |    ```
28 | 
29 | 2. Install dependencies:
30 | 
31 |    ```bash
32 |    uv python install 3.12
33 |    uv venv --python 3.12
34 |    uv pip install -e .
35 |    ```
36 | 
37 | ## Configuration
38 | 
39 | 1. Copy the example environment file:
40 |    ```bash
41 |    cp .env_example .env
42 |    ```
43 | 
44 | 2. Configure the following environment variables in `.env`:
45 | 
46 |    - `IDA_PATH`: Path to IDA Pro's headless executable (idat), e.g., `/home/ubuntu/idapro/idat`
47 |    - `PORT`: Port number for the MCP server, e.g., `8888`
48 |    - `HOST`: Host address for the MCP server, e.g., `127.0.0.1`
49 |    - `TRANSPORT`: MCP transport mode (`sse` or `stdio`)
50 | 
51 | ## Usage
52 | 
53 | 1. Start the server:
54 |    ```bash
55 |    uv run headless_ida_mcp_server
56 |    ```
57 | 
58 | 2. Connect to the server using an MCP client:
59 | 
60 |    Debug it: 
61 |    ```bash
62 |    npx -y @modelcontextprotocol/inspector
63 |    ```
64 |    or
65 |    ```json
66 |    {
67 |    "mcpServers": {
68 |       "ida": {
69 |          "command": "/path/to/uv",
70 |          "args": ["--directory","path/to/headless-ida-mcp-server","run","headless_ida_mcp_server"]
71 |       }
72 |    }
73 |    }
74 |    ```
75 | ![](./images/pic.png)
76 | 
77 | ![](./images/pic2.png)
```

--------------------------------------------------------------------------------
/src/headless_ida_mcp_server/__main__.py:
--------------------------------------------------------------------------------

```python
1 | from .server import main
2 | 
3 | if __name__ == "__main__":
4 |     main() 
```

--------------------------------------------------------------------------------
/test/stack/main.c:
--------------------------------------------------------------------------------

```cpp
 1 | #include <stdio.h>
 2 | #include <unistd.h>
 3 | #include <stdlib.h>
 4 | #include <string.h>
 5 | // 栈溢出漏洞,english
 6 | void vuln() {
 7 |     char buf[100];
 8 |     printf("Please enter a string: ");
 9 |     read(0,buf,0x200);
10 | }
11 | 
12 | int main() {
13 |     setvbuf(stdout, 0,2,0);
14 |     setvbuf(stdin, 0,2,0);
15 |     setvbuf(stderr, 0,2,0);
16 |     vuln();
17 |     return 0;
18 | }
19 | 
```

--------------------------------------------------------------------------------
/src/headless_ida_mcp_server/logger.py:
--------------------------------------------------------------------------------

```python
 1 | 
 2 | import logging
 3 | #### LOGGING ####
 4 | logging.basicConfig(
 5 |     level=logging.INFO,
 6 |     format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
 7 |     datefmt='%Y-%m-%d %H:%M:%S'
 8 | )
 9 | logger = logging.getLogger('headless_ida_mcp_server')
10 | logger.setLevel(logging.DEBUG)
11 | logging.getLogger("mcp.server.lowlevel.server").setLevel(logging.ERROR)
12 | 
```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
 1 | [project]
 2 | name = "headless-ida-mcp-server"
 3 | version = "0.1.0"
 4 | description = "Add your description here"
 5 | readme = "README.md"
 6 | requires-python = ">=3.12"
 7 | dependencies = [
 8 |     "dotenv>=0.9.9",
 9 |     "fastmcp>=0.4.1",
10 |     "headless-ida>=0.6.1",
11 |     "mcp>=1.6.0",
12 |     "pytest>=8.3.5",
13 |     "pytest-asyncio>=0.26.0",
14 | ]
15 | [project.scripts]
16 | headless_ida_mcp_server = "headless_ida_mcp_server.server:main"
17 | 
```

--------------------------------------------------------------------------------
/src/headless_ida_mcp_server/__init__.py:
--------------------------------------------------------------------------------

```python
 1 | """
 2 | headless-ida-mcp-server
 3 | """
 4 | from dotenv import load_dotenv,find_dotenv,dotenv_values
 5 | import os
 6 | from .logger import*
 7 | 
 8 | __all__ = ['PORT', 'HOST', 'TRANSPORT', 'BINARY_PATH', 'IDA_PATH']
 9 | 
10 | load_dotenv(find_dotenv(),override=True)
11 | PORT = os.environ.get("PORT", 8888)
12 | HOST = os.environ.get("HOST", "0.0.0.0")
13 | TRANSPORT = os.environ.get("TRANSPORT", "sse")
14 | IDA_PATH = os.environ.get("IDA_PATH", "")
15 | if IDA_PATH == "":
16 |     raise ValueError("IDA_PATH is not set")
17 | 
18 | 
```

--------------------------------------------------------------------------------
/test/test_ida_agent.py:
--------------------------------------------------------------------------------

```python
 1 | import pytest
 2 | from langchain_mcp_adapters.client import MultiServerMCPClient
 3 | from langgraph.prebuilt import create_react_agent
 4 | from langchain_openai import ChatOpenAI
 5 | from langchain_core.messages import HumanMessage
 6 | 
 7 | async def test_ida_function_query():
 8 |     model = ChatOpenAI(model="gpt-4")
 9 |     
10 |     async with MultiServerMCPClient(
11 |         {
12 |             "ida": {
13 |                 "url": "http://localhost:8888/sse",
14 |                 "transport": "sse",
15 |             }
16 |         }
17 |     ) as client:
18 |         agent = create_react_agent(model, client.get_tools())
19 |         response = await agent.ainvoke(
20 |             {"messages": HumanMessage(content="get function in 4905")}
21 |         )
22 |         
23 |         assert response is not None
24 |         assert "messages" in response
25 |         assert len(response["messages"]) > 0
26 |         
27 |         content = response["messages"][-1].content
28 |         assert isinstance(content, str)
29 |         assert len(content) > 0
30 | 
31 | async def test_ida_server_connection():
32 |     async with MultiServerMCPClient(
33 |         {
34 |             "ida": {
35 |                 "url": "http://localhost:8888/sse",
36 |                 "transport": "sse",
37 |             }
38 |         }
39 |     ) as client:
40 |         assert client is not None
41 |         tools = client.get_tools()
42 |         assert len(tools) > 0
43 | 
```

--------------------------------------------------------------------------------
/test/heap/main.c:
--------------------------------------------------------------------------------

```cpp
  1 | #include <stdio.h>
  2 | #include <stdlib.h>
  3 | #include <string.h>
  4 | #include <time.h>
  5 | 
  6 | #define MAX_NOTES 100
  7 | #define MAX_TITLE_LEN 50
  8 | #define MAX_CONTENT_LEN 500
  9 | #define FILENAME "notes.dat"
 10 | 
 11 | // Note structure
 12 | typedef struct {
 13 |     int id;
 14 |     char title[MAX_TITLE_LEN];
 15 |     char content[MAX_CONTENT_LEN];
 16 |     char date[20];
 17 |     int isDeleted;  // Flag to mark if note is deleted
 18 | } Note;
 19 | 
 20 | // Global variables
 21 | Note notes[MAX_NOTES];
 22 | int noteCount = 0;
 23 | int lastId = 0;
 24 | 
 25 | // Function declarations
 26 | void loadNotes();
 27 | void saveNotes();
 28 | void addNote();
 29 | void deleteNote();
 30 | void updateNote();
 31 | void searchNote();
 32 | void listAllNotes();
 33 | void getCurrentDate(char *dateStr);
 34 | void clearInputBuffer();
 35 | int findNoteById(int id);
 36 | void printNote(Note note);
 37 | void printMenu();
 38 | 
 39 | int main() {
 40 |     loadNotes();  // Load existing notes
 41 |     
 42 |     int choice;
 43 |     do {
 44 |         printMenu();
 45 |         scanf("%d", &choice);
 46 |         clearInputBuffer();
 47 |         
 48 |         switch(choice) {
 49 |             case 1:
 50 |                 addNote();
 51 |                 break;
 52 |             case 2:
 53 |                 deleteNote();
 54 |                 break;
 55 |             case 3:
 56 |                 updateNote();
 57 |                 break;
 58 |             case 4:
 59 |                 searchNote();
 60 |                 break;
 61 |             case 5:
 62 |                 listAllNotes();
 63 |                 break;
 64 |             case 0:
 65 |                 saveNotes();
 66 |                 printf("Thank you for using Note Manager! Goodbye!\n");
 67 |                 break;
 68 |             default:
 69 |                 printf("Invalid choice, please try again.\n");
 70 |         }
 71 |     } while(choice != 0);
 72 |     
 73 |     return 0;
 74 | }
 75 | 
 76 | // Print menu
 77 | void printMenu() {
 78 |     printf("\n===== Note Manager =====\n");
 79 |     printf("1. Add Note\n");
 80 |     printf("2. Delete Note\n");
 81 |     printf("3. Update Note\n");
 82 |     printf("4. Search Note\n");
 83 |     printf("5. List All Notes\n");
 84 |     printf("0. Exit\n");
 85 |     printf("Please select an operation: ");
 86 | }
 87 | 
 88 | // Clear input buffer
 89 | void clearInputBuffer() {
 90 |     int c;
 91 |     while ((c = getchar()) != '\n' && c != EOF);
 92 | }
 93 | 
 94 | // Get current date
 95 | void getCurrentDate(char *dateStr) {
 96 |     time_t now = time(NULL);
 97 |     struct tm *t = localtime(&now);
 98 |     sprintf(dateStr, "%04d-%02d-%02d", 
 99 |             t->tm_year + 1900, t->tm_mon + 1, t->tm_mday);
100 | }
101 | 
102 | // Load notes data
103 | void loadNotes() {
104 |     FILE *file = fopen(FILENAME, "rb");
105 |     if (file == NULL) {
106 |         printf("Note file not found, creating new file.\n");
107 |         return;
108 |     }
109 |     
110 |     noteCount = 0;
111 |     lastId = 0;
112 |     
113 |     while (fread(&notes[noteCount], sizeof(Note), 1, file) == 1 && noteCount < MAX_NOTES) {
114 |         if (notes[noteCount].id > lastId) {
115 |             lastId = notes[noteCount].id;
116 |         }
117 |         noteCount++;
118 |     }
119 |     
120 |     fclose(file);
121 |     printf("Successfully loaded %d notes.\n", noteCount);
122 | }
123 | 
124 | // Save notes data
125 | void saveNotes() {
126 |     FILE *file = fopen(FILENAME, "wb");
127 |     if (file == NULL) {
128 |         printf("Cannot open file to save notes.\n");
129 |         return;
130 |     }
131 |     
132 |     for (int i = 0; i < noteCount; i++) {
133 |         if (!notes[i].isDeleted) {
134 |             fwrite(&notes[i], sizeof(Note), 1, file);
135 |         }
136 |     }
137 |     
138 |     fclose(file);
139 |     printf("Notes have been saved.\n");
140 | }
141 | 
142 | // Add note
143 | void addNote() {
144 |     if (noteCount >= MAX_NOTES) {
145 |         printf("Maximum number of notes reached, cannot add more.\n");
146 |         return;
147 |     }
148 |     
149 |     Note newNote;
150 |     newNote.id = ++lastId;
151 |     newNote.isDeleted = 0;
152 |     
153 |     printf("Enter note title: ");
154 |     fgets(newNote.title, MAX_TITLE_LEN, stdin);
155 |     newNote.title[strcspn(newNote.title, "\n")] = 0;  // Remove newline
156 |     
157 |     printf("Enter note content: ");
158 |     fgets(newNote.content, MAX_CONTENT_LEN, stdin);
159 |     newNote.content[strcspn(newNote.content, "\n")] = 0;  // Remove newline
160 |     
161 |     getCurrentDate(newNote.date);
162 |     
163 |     notes[noteCount++] = newNote;
164 |     printf("Note added, ID: %d\n", newNote.id);
165 |     saveNotes();
166 | }
167 | 
168 | // Find note
169 | int findNoteById(int id) {
170 |     for (int i = 0; i < noteCount; i++) {
171 |         if (notes[i].id == id && !notes[i].isDeleted) {
172 |             return i;
173 |         }
174 |     }
175 |     return -1;
176 | }
177 | 
178 | // Print note
179 | void printNote(Note note) {
180 |     printf("ID: %d\n", note.id);
181 |     printf("Title: %s\n", note.title);
182 |     printf("Content: %s\n", note.content);
183 |     printf("Date: %s\n", note.date);
184 |     printf("-----------------------\n");
185 | }
186 | 
187 | // Delete note
188 | void deleteNote() {
189 |     int id;
190 |     printf("Enter the ID of note to delete: ");
191 |     scanf("%d", &id);
192 |     clearInputBuffer();
193 |     
194 |     int index = findNoteById(id);
195 |     if (index == -1) {
196 |         printf("Note does not exist.\n");
197 |         return;
198 |     }
199 |     
200 |     notes[index].isDeleted = 1;
201 |     printf("Note has been deleted.\n");
202 |     saveNotes();
203 | }
204 | 
205 | // Update note
206 | void updateNote() {
207 |     int id;
208 |     printf("Enter the ID of note to update: ");
209 |     scanf("%d", &id);
210 |     clearInputBuffer();
211 |     
212 |     int index = findNoteById(id);
213 |     if (index == -1) {
214 |         printf("Note does not exist.\n");
215 |         return;
216 |     }
217 |     
218 |     printf("Current note content:\n");
219 |     printNote(notes[index]);
220 |     
221 |     printf("Enter new title (leave empty to keep unchanged): ");
222 |     char newTitle[MAX_TITLE_LEN];
223 |     fgets(newTitle, MAX_TITLE_LEN, stdin);
224 |     newTitle[strcspn(newTitle, "\n")] = 0;
225 |     
226 |     printf("Enter new content (leave empty to keep unchanged): ");
227 |     char newContent[MAX_CONTENT_LEN];
228 |     fgets(newContent, MAX_CONTENT_LEN, stdin);
229 |     newContent[strcspn(newContent, "\n")] = 0;
230 |     
231 |     if (strlen(newTitle) > 0) {
232 |         strcpy(notes[index].title, newTitle);
233 |     }
234 |     
235 |     if (strlen(newContent) > 0) {
236 |         strcpy(notes[index].content, newContent);
237 |     }
238 |     
239 |     printf("Note has been updated.\n");
240 |     saveNotes();
241 | }
242 | 
243 | // Search note
244 | void searchNote() {
245 |     printf("1. Search by ID\n");
246 |     printf("2. Search by title keyword\n");
247 |     printf("Choose search method: ");
248 |     
249 |     int choice;
250 |     scanf("%d", &choice);
251 |     clearInputBuffer();
252 |     
253 |     if (choice == 1) {
254 |         int id;
255 |         printf("Enter note ID: ");
256 |         scanf("%d", &id);
257 |         clearInputBuffer();
258 |         
259 |         int index = findNoteById(id);
260 |         if (index == -1) {
261 |             printf("Note does not exist.\n");
262 |             return;
263 |         }
264 |         
265 |         printf("\nMatching note found:\n");
266 |         printNote(notes[index]);
267 |     } 
268 |     else if (choice == 2) {
269 |         char keyword[MAX_TITLE_LEN];
270 |         printf("Enter title keyword: ");
271 |         fgets(keyword, MAX_TITLE_LEN, stdin);
272 |         keyword[strcspn(keyword, "\n")] = 0;
273 |         
274 |         int found = 0;
275 |         printf("\nFound notes:\n");
276 |         
277 |         for (int i = 0; i < noteCount; i++) {
278 |             if (!notes[i].isDeleted && strstr(notes[i].title, keyword) != NULL) {
279 |                 printNote(notes[i]);
280 |                 found = 1;
281 |             }
282 |         }
283 |         
284 |         if (!found) {
285 |             printf("No matching notes found.\n");
286 |         }
287 |     } 
288 |     else {
289 |         printf("Invalid choice.\n");
290 |     }
291 | }
292 | 
293 | // List all notes
294 | void listAllNotes() {
295 |     int count = 0;
296 |     printf("\nAll notes:\n");
297 |     
298 |     for (int i = 0; i < noteCount; i++) {
299 |         if (!notes[i].isDeleted) {
300 |             printNote(notes[i]);
301 |             count++;
302 |         }
303 |     }
304 |     
305 |     if (count == 0) {
306 |         printf("No notes available.\n");
307 |     } else {
308 |         printf("Total %d notes.\n", count);
309 |     }
310 | }
```

--------------------------------------------------------------------------------
/src/headless_ida_mcp_server/server.py:
--------------------------------------------------------------------------------

```python
  1 | # -*- coding: utf-8 -*-
  2 | from mcp.server import FastMCP
  3 | from mcp.server.fastmcp.prompts import base
  4 | from functools import wraps
  5 | from typing import Any, Callable, get_type_hints, TypedDict, Optional, Annotated
  6 | import struct
  7 | from headless_ida_mcp_server.helper import IDA
  8 | from headless_ida_mcp_server.logger import logger
  9 | from headless_ida_mcp_server import PORT,TRANSPORT
 10 | 
 11 | mcp = FastMCP("IDA MCP Server", port=PORT)
 12 | ida = None
 13 | 
 14 | @mcp.tool()
 15 | def set_binary_path(path: Annotated[str, "Path to the binary file"]):
 16 |     """Set the path to the binary file"""
 17 |     global ida
 18 |     ida = IDA(path)
 19 |     return "Binary path set"
 20 | 
 21 | @mcp.tool()
 22 | def get_function(address: Annotated[int, "Address of the function"]):
 23 |     """Get a function by address"""
 24 |     if ida is None:
 25 |         raise ValueError("Binary path not set")
 26 |     return ida.get_function(address)
 27 |     
 28 | @mcp.tool()
 29 | def get_function_by_name(name: Annotated[str, "Name of the function"]):
 30 |     """Get a function by name"""
 31 |     if ida is None:
 32 |         raise ValueError("Binary path not set")
 33 |     return ida.get_function_by_name(name)
 34 | 
 35 | @mcp.tool()
 36 | def get_function_by_address(address: Annotated[int, "Address of the function"]):
 37 |     """Get a function by address"""
 38 |     if ida is None:
 39 |         raise ValueError("Binary path not set")
 40 |     return ida.get_function_by_address(address)
 41 | 
 42 | @mcp.tool()
 43 | def get_current_address():
 44 |     """Get the current address"""
 45 |     if ida is None:
 46 |         raise ValueError("Binary path not set")
 47 |     return ida.get_current_address()
 48 | 
 49 | @mcp.tool()
 50 | def get_current_function():
 51 |     """Get the current function"""
 52 |     if ida is None:
 53 |         raise ValueError("Binary path not set")
 54 |     return ida.get_current_function()
 55 | 
 56 | @mcp.tool()
 57 | def convert_number(text: Annotated[str, "Textual representation of the number to convert"],size: Annotated[Optional[int], "Size of the variable in bytes"]):
 58 |     """Convert a number to a different representation"""
 59 |     if ida is None:
 60 |         raise ValueError("Binary path not set")
 61 |     return ida.convert_number(text, size)
 62 | 
 63 | @mcp.tool()
 64 | def list_functions():
 65 |     """List all functions"""
 66 |     if ida is None:
 67 |         raise ValueError("Binary path not set")
 68 |     return ida.list_functions()
 69 | 
 70 | @mcp.tool()
 71 | def decompile_checked(address: Annotated[int, "Address of the function to decompile"]):
 72 |     """Decompile a function at the given address"""
 73 |     if ida is None:
 74 |         raise ValueError("Binary path not set")
 75 |     return ida.decompile_checked(address)
 76 | 
 77 | @mcp.tool()
 78 | def decompile_function(address: Annotated[int, "Address of the function to decompile"]):
 79 |     """Decompile a function at the given address"""
 80 |     if ida is None:
 81 |         raise ValueError("Binary path not set")
 82 |     return ida.decompile_function(address)
 83 | 
 84 | @mcp.tool()
 85 | def disassemble_function(address: Annotated[int, "Address of the function to disassemble"]):
 86 |     """Disassemble a function at the given address"""
 87 |     if ida is None:
 88 |         raise ValueError("Binary path not set")
 89 |     return ida.disassemble_function(address)
 90 | 
 91 | @mcp.tool()
 92 | def get_xrefs_to(address: Annotated[int, "Address to get cross references to"]):
 93 |     """Get cross references to a given address"""
 94 |     if ida is None:
 95 |         raise ValueError("Binary path not set")
 96 |     return ida.get_xrefs_to(address)
 97 | 
 98 | @mcp.tool()
 99 | def get_entry_points():
100 |     """Get all entry points of the binary"""
101 |     if ida is None:
102 |         raise ValueError("Binary path not set")
103 |     return ida.get_entry_points()
104 | 
105 | @mcp.tool()
106 | def set_decompiler_comment(address: Annotated[int, "Address in the function to set the comment for"],comment: Annotated[str, "Comment text (not shown in the disassembly)"]):
107 |     """Set a comment for a given address in the function pseudocode"""
108 |     if ida is None:
109 |         raise ValueError("Binary path not set")
110 |     return ida.set_decompiler_comment(address, comment)
111 | 
112 | @mcp.tool()
113 | def set_disassembly_comment(address: Annotated[int, "Address in the function to set the comment for"],comment: Annotated[str, "Comment text (not shown in the pseudocode)"]):
114 |     """Set a comment for a given address in the function disassembly"""
115 |     if ida is None:
116 |         raise ValueError("Binary path not set")
117 |     return ida.set_disassembly_comment(address, comment)
118 | 
119 | @mcp.tool()
120 | def refresh_decompiler_widget():
121 |     """Refresh the decompiler widget"""
122 |     if ida is None:
123 |         raise ValueError("Binary path not set")
124 |     return ida.refresh_decompiler_widget()
125 | 
126 | @mcp.tool()
127 | def refresh_decompiler_ctext(function_address: Annotated[int, "Address of the function to refresh the decompiler ctext for"]):
128 |     """Refresh the decompiler ctext for a given function"""
129 |     if ida is None:
130 |         raise ValueError("Binary path not set")
131 |     return ida.refresh_decompiler_ctext(function_address)
132 | 
133 | @mcp.tool()
134 | def rename_local_variable(function_address: Annotated[int, "Address of the function containing the variable"],old_name: Annotated[str, "Current name of the variable"],new_name: Annotated[str, "New name for the variable"]):
135 |     """Rename a local variable in a function"""
136 |     if ida is None:
137 |         raise ValueError("Binary path not set")
138 |     return ida.rename_local_variable(function_address, old_name, new_name)
139 | 
140 | @mcp.tool()
141 | def rename_function(function_address: Annotated[int, "Address of the function to rename"],new_name: Annotated[str, "New name for the function"]):
142 |     """Rename a function"""
143 |     if ida is None:
144 |         raise ValueError("Binary path not set")
145 |     return ida.rename_function(function_address, new_name)
146 | 
147 | @mcp.tool()
148 | def set_function_prototype(function_address: Annotated[int, "Address of the function"],prototype: Annotated[str, "New function prototype"]):
149 |     """Set a function's prototype"""
150 |     if ida is None:
151 |         raise ValueError("Binary path not set")
152 |     return ida.set_function_prototype(function_address, prototype)
153 | 
154 | @mcp.tool()
155 | def save_idb_file(save_path: Annotated[str, "Path to save the IDB file"]):
156 |     """Save the IDB file"""
157 |     if ida is None:
158 |         raise ValueError("Binary path not set")
159 |     return ida.save_idb_file(save_path)
160 | 
161 | @mcp.prompt()
162 | def exploit_prompt():
163 |     """Exploit prompt"""
164 |     
165 |     messages = [
166 |         base.UserMessage("You are a helpful assistant that can help me with my exploit."),
167 |         base.UserMessage("""
168 |         You need to follow these steps to complete the exploit:
169 |         1. Reverse analyze the binary file to locate vulnerabilities and analyze vulnerability types
170 |             - Locate vulnerabilities: Use IDA Pro's analysis features to find vulnerability locations
171 |             - Need to first gather binary file information, such as using checksec tool to check binary protection mechanisms
172 |                 - If you find NX protection is disabled, you can use ret2shellcode method to get a shell
173 |         2. Choose appropriate exploitation method based on vulnerability type
174 |             - For stack overflow vulnerabilities, analyze the overflow pattern and check if there are backdoor functions in the binary. If there are backdoor functions, modify the return address to point to the backdoor function address
175 |             - If there are no backdoor functions, need to use ret2libc method. Don't assume the binary contains the gadgets you want - you need to use ROPgadget to find gadgets and combine them to construct the pop chain.
176 |         """),
177 |     ]
178 |     return messages
179 | 
180 | def main():
181 |     mcp.run(transport = TRANSPORT)
182 | 
183 | if __name__ == "__main__":
184 |     main()
```

--------------------------------------------------------------------------------
/src/headless_ida_mcp_server/helper.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Helper module for IDA Python integration.
  3 | Provides configuration and IDA interface initialization.
  4 | """
  5 | 
  6 | from headless_ida import HeadlessIda
  7 | import logging
  8 | from headless_ida_mcp_server import IDA_PATH
  9 | import struct
 10 | from typing import Optional, TypedDict, Annotated
 11 | 
 12 | class Function(TypedDict):
 13 |     start_address: int
 14 |     end_address: int
 15 |     name: str
 16 |     prototype: Optional[str]
 17 | 
 18 | class IDAError(Exception):
 19 |     def __init__(self, message: str):
 20 |         super().__init__(message)
 21 | 
 22 |     @property
 23 |     def message(self) -> str:
 24 |         return self.args[0]
 25 | 
 26 | class Xref(TypedDict):
 27 |     address: int
 28 |     type: str
 29 |     function: Optional[Function]
 30 | 
 31 | class ConvertedNumber(TypedDict):
 32 |     decimal: str
 33 |     hexadecimal: str
 34 |     bytes: str
 35 |     ascii: Optional[str]
 36 |     binary: str
 37 | 
 38 | class IDA():
 39 |     def __init__(self, binary_path: Annotated[str, "Path to the binary file"]):
 40 |         self.headlessida = HeadlessIda(IDA_PATH,binary_path)
 41 |         self.idaapi = self.headlessida.import_module("idaapi")
 42 |         self.idautils = self.headlessida.import_module("idautils")
 43 |         self.ida_entry = self.headlessida.import_module("ida_entry")
 44 |         self.ida_funcs = self.headlessida.import_module("ida_funcs")
 45 |         self.ida_hexrays = self.headlessida.import_module("ida_hexrays")
 46 |         self.ida_kernwin = self.headlessida.import_module("ida_kernwin")
 47 |         self.ida_lines = self.headlessida.import_module("ida_lines")
 48 |         self.ida_nalt = self.headlessida.import_module("ida_nalt")
 49 |         self.ida_name = self.headlessida.import_module("ida_name")
 50 |         self.ida_typeinf = self.headlessida.import_module("ida_typeinf")
 51 |         self.ida_xref = self.headlessida.import_module("ida_xref")
 52 |         self.idc = self.headlessida.import_module("idc")
 53 |         self.ida_loader = self.headlessida.import_module("ida_loader")
 54 |     ######## COPY from https://github.com/mrexodia/ida-pro-mcp ########
 55 |     def get_image_size(self):
 56 |         try:
 57 |             # https://www.hex-rays.com/products/ida/support/sdkdoc/structidainfo.html
 58 |             info = self.idaapi.get_inf_structure()
 59 |             omin_ea = info.omin_ea
 60 |             omax_ea = info.omax_ea
 61 |         except AttributeError:
 62 |             import ida_ida
 63 |             omin_ea = ida_ida.inf_get_omin_ea()
 64 |             omax_ea = ida_ida.inf_get_omax_ea()
 65 |         # Bad heuristic for image size (bad if the relocations are the last section)
 66 |         image_size = omax_ea - omin_ea
 67 |         # Try to extract it from the PE header
 68 |         header = self.idautils.peutils_t().header()
 69 |         if header and header[:4] == b"PE\0\0":
 70 |             image_size = struct.unpack("<I", header[0x50:0x54])[0]
 71 |         return image_size
 72 | 
 73 |     def get_prototype(self,fn) -> Optional[str]:
 74 |         try:
 75 |             if isinstance(fn, self.ida_funcs.func_t):
 76 |                 func = fn
 77 |             else:
 78 |                 func = self.idaapi.get_func(fn)
 79 |             if func is None:
 80 |                 return None
 81 |             tif = self.ida_typeinf.tinfo_t()
 82 |             if self.ida_nalt.get_tinfo(tif, func.start_ea):
 83 |                 return str(tif)
 84 |             return self.idc.get_type(func.start_ea)
 85 |         except Exception as e:
 86 |             print(f"Error getting function prototype: {e}")
 87 |             return None
 88 |     
 89 |     def get_function(self,address: int, *, raise_error=True) -> Optional[Function]:
 90 |         fn = self.idaapi.get_func(address)
 91 |         if fn is None:
 92 |             if raise_error:
 93 |                 raise IDAError(f"No function found at address {address}")
 94 |             return None
 95 | 
 96 |         try:
 97 |             name = fn.get_name()
 98 |         except AttributeError:
 99 |             name = self.ida_funcs.get_func_name(fn.start_ea)
100 |         return {
101 |             "address": fn.start_ea,
102 |             "end_address": fn.end_ea,
103 |             "name": name,
104 |             "prototype": self.get_prototype(fn),
105 |         }
106 | 
107 |     def get_function_by_name(self,name: Annotated[str, "Name of the function to get"]) -> Function:
108 |         """Get a function by its name"""
109 |         function_address = self.idaapi.get_name_ea(self.idaapi.BADADDR, name)
110 |         if function_address == self.idaapi.BADADDR:
111 |             raise IDAError(f"No function found with name {name}")
112 |         return self.get_function(function_address)
113 | 
114 |     def get_function_by_address(self,address: Annotated[int, "Address of the function to get"]) -> Function:
115 |         """Get a function by its address"""
116 |         return self.get_function(address)
117 |     
118 |     def get_current_address(self) -> int:
119 |         """Get the address currently selected by the user"""
120 |         return self.idaapi.get_screen_ea()
121 | 
122 |     def get_current_function(self) -> Optional[Function]:
123 |         """Get the function currently selected by the user"""
124 |         return self.get_function(self.idaapi.get_screen_ea())
125 |     
126 |     def convert_number(self,text: Annotated[str, "Textual representation of the number to convert"],size: Annotated[Optional[int], "Size of the variable in bytes"]) -> ConvertedNumber:
127 |         """Convert a number (decimal, hexadecimal) to different representations"""
128 |         try:
129 |             value = int(text, 0)
130 |         except ValueError:
131 |             raise IDAError(f"Invalid number: {text}")
132 | 
133 |         # Estimate the size of the number
134 |         if not size:
135 |             size = 0
136 |             n = abs(value)
137 |             while n:
138 |                 size += 1
139 |                 n >>= 1
140 |             size += 7
141 |             size //= 8
142 | 
143 |         # Convert the number to bytes
144 |         try:
145 |             bytes = value.to_bytes(size, "little", signed=True)
146 |         except OverflowError:
147 |             raise IDAError(f"Number {text} is too big for {size} bytes")
148 | 
149 |         # Convert the bytes to ASCII
150 |         ascii = ""
151 |         for byte in bytes.rstrip(b"\x00"):
152 |             if byte >= 32 and byte <= 126:
153 |                 ascii += chr(byte)
154 |             else:
155 |                 ascii = None
156 |                 break
157 | 
158 |         return {
159 |             "decimal": str(value),
160 |             "hexadecimal": hex(value),
161 |             "bytes": bytes.hex(" "),
162 |             "ascii": ascii,
163 |             "binary": bin(value)
164 |         }
165 | 
166 |     def list_functions(self) -> list[Function]:
167 |         """List all functions in the database"""
168 |         return [self.get_function(address) for address in self.idautils.Functions()]
169 |     
170 |     def decompile_checked(self,address: int):
171 |         if not self.ida_hexrays.init_hexrays_plugin():
172 |             raise IDAError("Hex-Rays decompiler is not available")
173 |         error = self.ida_hexrays.hexrays_failure_t()
174 |         cfunc: self.ida_hexrays.cfunc_t = self.ida_hexrays.decompile_func(address, error, self.ida_hexrays.DECOMP_WARNINGS)
175 |         if not cfunc:
176 |             message = f"Decompilation failed at {address}"
177 |             if error.str:
178 |                 message += f": {error.str}"
179 |             if error.errea != self.idaapi.BADADDR:
180 |                 message += f" (address: {error.errea})"
181 |             raise IDAError(message)
182 |         return cfunc
183 | 
184 |     def decompile_function(self,address: Annotated[int, "Address of the function to decompile"]) -> str:
185 |         """Decompile a function at the given address"""
186 |         cfunc = self.decompile_checked(address)
187 |         sv = cfunc.get_pseudocode()
188 |         pseudocode = ""
189 |         for i, sl in enumerate(sv):
190 |             sl: self.ida_kernwin.simpleline_t
191 |             item = self.ida_hexrays.ctree_item_t()
192 |             addr = None if i > 0 else cfunc.entry_ea
193 |             if cfunc.get_line_item(sl.line, 0, False, None, item, None):
194 |                 ds = item.dstr().split(": ")
195 |                 if len(ds) == 2:
196 |                     try:
197 |                         addr = int(ds[0], 16)
198 |                     except ValueError:
199 |                         pass
200 |             line = self.ida_lines.tag_remove(sl.line)
201 |             if len(pseudocode) > 0:
202 |                 pseudocode += "\n"
203 |             if addr is None:
204 |                 pseudocode += f"/* line: {i} */ {line}"
205 |             else:
206 |                 pseudocode += f"/* line: {i}, address: {addr} */ {line}"
207 | 
208 |         return pseudocode
209 | 
210 |     def disassemble_function(self,address: Annotated[int, "Address of the function to disassemble"]) -> str:
211 |         """Get assembly code (address: instruction; comment) for a function"""
212 |         func = self.idaapi.get_func(address)
213 |         if not func:
214 |             raise IDAError(f"No function found at address {address}")
215 | 
216 |         # TODO: add labels
217 |         disassembly = ""
218 |         for address in self.ida_funcs.func_item_iterator_t(func):
219 |             if len(disassembly) > 0:
220 |                 disassembly += "\n"
221 |             disassembly += f"{address}: "
222 |             disassembly += self.idaapi.generate_disasm_line(address, self.idaapi.GENDSM_REMOVE_TAGS)
223 |             comment = self.idaapi.get_cmt(address, False)
224 |             if not comment:
225 |                 comment = self.idaapi.get_cmt(address, True)
226 |             if comment:
227 |                 disassembly += f"; {comment}"
228 |         return disassembly
229 | 
230 |     def get_xrefs_to(self,address: Annotated[int, "Address to get cross references to"]) -> list[Xref]:
231 |         """Get all cross references to the given address"""
232 |         xrefs = []
233 |         xref: self.ida_xref.xrefblk_t
234 |         for xref in self.idautils.XrefsTo(address):
235 |             xrefs.append({
236 |                 "address": xref.frm,
237 |                 "type": "code" if xref.iscode else "data",
238 |                 "function": self.get_function(xref.frm, raise_error=False),
239 |             })
240 |         return xrefs
241 | 
242 |     def get_entry_points(self) -> list[Function]:
243 |         """Get all entry points in the database"""
244 |         result = []
245 |         for i in range(self.ida_entry.get_entry_qty()):
246 |             ordinal = self.ida_entry.get_entry_ordinal(i)
247 |             address = self.ida_entry.get_entry(ordinal)
248 |             func = self.get_function(address, raise_error=False)
249 |             if func is not None:
250 |                 result.append(func)
251 |         return result
252 | 
253 |     def set_decompiler_comment(self,address: Annotated[int, "Address in the function to set the comment for"],comment: Annotated[str, "Comment text (not shown in the disassembly)"]):
254 |         """Set a comment for a given address in the function pseudocode"""
255 | 
256 |         # Reference: https://cyber.wtf/2019/03/22/using-ida-python-to-analyze-trickbot/
257 |         # Check if the address corresponds to a line
258 |         cfunc = self.decompile_checked(address)
259 | 
260 |         # Special case for function entry comments
261 |         if address == cfunc.entry_ea:
262 |             self.idc.set_func_cmt(address, comment, True)
263 |             cfunc.refresh_func_ctext()
264 |             return
265 | 
266 |         eamap = cfunc.get_eamap()
267 |         if address not in eamap:
268 |             raise IDAError(f"Failed to set comment at {address}")
269 |         nearest_ea = eamap[address][0].ea
270 | 
271 |         # Remove existing orphan comments
272 |         if cfunc.has_orphan_cmts():
273 |             cfunc.del_orphan_cmts()
274 |             cfunc.save_user_cmts()
275 | 
276 |         # Set the comment by trying all possible item types
277 |         tl = self.idaapi.treeloc_t()
278 |         tl.ea = nearest_ea
279 |         for itp in range(self.idaapi.ITP_SEMI, self.idaapi.ITP_COLON):
280 |             tl.itp = itp
281 |             cfunc.set_user_cmt(tl, comment)
282 |             cfunc.save_user_cmts()
283 |             cfunc.refresh_func_ctext()
284 |             if not cfunc.has_orphan_cmts():
285 |                 return
286 |             cfunc.del_orphan_cmts()
287 |             cfunc.save_user_cmts()
288 |         raise IDAError(f"Failed to set comment at {address}")
289 |     
290 | 
291 |     def set_disassembly_comment(self,address: Annotated[int, "Address in the function to set the comment for"],comment: Annotated[str, "Comment text (not shown in the pseudocode)"]):
292 |         """Set a comment for a given address in the function disassembly"""
293 |         if not self.idaapi.set_cmt(address, comment, False):
294 |             raise IDAError(f"Failed to set comment at {address}")
295 | 
296 |     def refresh_decompiler_widget(self):
297 |         widget = self.ida_kernwin.get_current_widget()
298 |         if widget is not None:
299 |             vu = self.ida_hexrays.get_widget_vdui(widget)
300 |             if vu is not None:
301 |                 vu.refresh_ctext()
302 | 
303 |     def refresh_decompiler_ctext(self,function_address: int):
304 |         error = self.ida_hexrays.hexrays_failure_t()
305 |         cfunc: self.ida_hexrays.cfunc_t = self.ida_hexrays.decompile_func(function_address, error, self.ida_hexrays.DECOMP_WARNINGS)
306 |         if cfunc:
307 |             cfunc.refresh_func_ctext()
308 |             
309 |     def rename_local_variable(self,function_address: Annotated[int, "Address of the function containing the variable"],old_name: Annotated[str, "Current name of the variable"],new_name: Annotated[str, "New name for the variable"]):
310 |         """Rename a local variable in a function"""
311 |         func = self.idaapi.get_func(function_address)
312 |         if not func:
313 |             raise IDAError(f"No function found at address {function_address}")
314 |         if not self.ida_hexrays.rename_lvar(func.start_ea, old_name, new_name):
315 |             raise IDAError(f"Failed to rename local variable {old_name} in function {func.start_ea}")
316 |         self.refresh_decompiler_ctext(func.start_ea)
317 | 
318 |     def rename_function(self,function_address: Annotated[int, "Address of the function to rename"],new_name: Annotated[str, "New name for the function"]):
319 |         """Rename a function"""
320 |         fn = self.idaapi.get_func(function_address)
321 |         if not fn:
322 |             raise IDAError(f"No function found at address {function_address}")
323 |         if not self.idaapi.set_name(fn.start_ea, new_name):
324 |             raise IDAError(f"Failed to rename function {fn.start_ea} to {new_name}")
325 |         self.refresh_decompiler_ctext(fn.start_ea)
326 |     
327 |     def set_function_prototype(self,function_address: Annotated[int, "Address of the function"],prototype: Annotated[str, "New function prototype"]) -> str:
328 |         """Set a function's prototype"""
329 |         fn = self.idaapi.get_func(function_address)
330 |         if not fn:
331 |             raise IDAError(f"No function found at address {function_address}")
332 |         try:
333 |             tif = self.ida_typeinf.tinfo_t(prototype, None, self.ida_typeinf.PT_SIL)
334 |             if not tif.is_func():
335 |                 raise IDAError(f"Parsed declaration is not a function type")
336 |             if not self.ida_typeinf.apply_tinfo(fn.start_ea, tif, self.ida_typeinf.PT_SIL):
337 |                 raise IDAError(f"Failed to apply type")
338 |             self.refresh_decompiler_ctext(fn.start_ea)
339 |         except Exception as e:
340 |             raise IDAError(f"Failed to parse prototype string: {prototype}")
341 |     def save_idb_file(self,save_path: Annotated[str, "Path to save the IDB file"]):
342 |         self.ida_loader.save_database(save_path, 0)
```