# Directory Structure
```
├── .github
│ ├── FUNDING.yml
│ └── logo.png
├── .gitignore
├── .python-version
├── ida-plugin.json
├── LICENSE
├── pyproject.toml
├── README.md
├── src
│ └── ida_pro_mcp
│ ├── __init__.py
│ ├── __main__.py
│ ├── idalib_server.py
│ ├── mcp-plugin.py
│ └── server.py
├── uv-package.sh
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------
```
1 | 3.11
2 |
```
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
1 | # Python-generated files
2 | __pycache__/
3 | *.py[oc]
4 | build/
5 | dist/
6 | wheels/
7 | *.egg-info
8 |
9 | # Virtual environments
10 | .venv*/
11 | venv*/
12 |
13 | # Logs
14 | *.log
15 |
16 | # Test files
17 | *.idb
18 | *.i64
19 | *.nam
20 | *.id0
21 | *.id1
22 | *.id2
23 | *.til
24 | *.elf
25 |
26 | # Generated server files
27 | server_generated.py
28 |
```
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
1 | # IDA Pro MCP
2 |
3 | Simple [MCP Server](https://modelcontextprotocol.io/introduction) to allow vibe reversing in IDA Pro.
4 |
5 | https://github.com/user-attachments/assets/6ebeaa92-a9db-43fa-b756-eececce2aca0
6 |
7 | The binaries and prompt for the video are available in the [mcp-reversing-dataset](https://github.com/mrexodia/mcp-reversing-dataset) repository.
8 |
9 | Available functionality:
10 |
11 | - `check_connection()`: Check if the IDA plugin is running.
12 | - `get_metadata()`: Get metadata about the current IDB.
13 | - `get_function_by_name(name)`: Get a function by its name.
14 | - `get_function_by_address(address)`: Get a function by its address.
15 | - `get_current_address()`: Get the address currently selected by the user.
16 | - `get_current_function()`: Get the function currently selected by the user.
17 | - `convert_number(text, size)`: Convert a number (decimal, hexadecimal) to different representations.
18 | - `list_functions_filter(offset, count, filter)`: List matching functions in the database (paginated).
19 | - `list_functions(offset, count)`: List all functions in the database (paginated).
20 | - `list_globals_filter(offset, count, filter)`: List matching globals in the database (paginated, filtered).
21 | - `list_globals(offset, count)`: List all globals in the database (paginated).
22 | - `list_imports(offset, count)`: List all imported symbols with their name and module (paginated).
23 | - `list_strings_filter(offset, count, filter)`: List matching strings in the database (paginated, filtered).
24 | - `list_strings(offset, count)`: List all strings in the database (paginated).
25 | - `list_local_types()`: List all Local types in the database.
26 | - `decompile_function(address)`: Decompile a function at the given address.
27 | - `disassemble_function(start_address)`: Get assembly code for a function (API-compatible with older IDA builds).
28 | - `get_xrefs_to(address)`: Get all cross references to the given address.
29 | - `get_xrefs_to_field(struct_name, field_name)`: Get all cross references to a named struct field (member).
30 | - `get_callees(function_address)`: Get all the functions called (callees) by the function at function_address.
31 | - `get_callers(function_address)`: Get all callers of the given address.
32 | - `get_entry_points()`: Get all entry points in the database.
33 | - `set_comment(address, comment)`: Set a comment for a given address in the function disassembly and pseudocode.
34 | - `rename_local_variable(function_address, old_name, new_name)`: Rename a local variable in a function.
35 | - `rename_global_variable(old_name, new_name)`: Rename a global variable.
36 | - `set_global_variable_type(variable_name, new_type)`: Set a global variable's type.
37 | - `patch_address_assembles(address, instructions)`: <no description>.
38 | - `get_global_variable_value_by_name(variable_name)`: Read a global variable's value (if known at compile-time).
39 | - `get_global_variable_value_at_address(address)`: Read a global variable's value by its address (if known at compile-time).
40 | - `rename_function(function_address, new_name)`: Rename a function.
41 | - `set_function_prototype(function_address, prototype)`: Set a function's prototype.
42 | - `declare_c_type(c_declaration)`: Create or update a local type from a C declaration.
43 | - `set_local_variable_type(function_address, variable_name, new_type)`: Set a local variable's type.
44 | - `get_stack_frame_variables(function_address)`: Retrieve the stack frame variables for a given function.
45 | - `get_defined_structures()`: Returns a list of all defined structures.
46 | - `analyze_struct_detailed(name)`: Detailed analysis of a structure with all fields.
47 | - `get_struct_at_address(address, struct_name)`: Get structure field values at a specific address.
48 | - `get_struct_info_simple(name)`: Simple function to get basic structure information.
49 | - `search_structures(filter)`: Search for structures by name pattern.
50 | - `rename_stack_frame_variable(function_address, old_name, new_name)`: Change the name of a stack variable for an IDA
51 | function.
52 | - `create_stack_frame_variable(function_address, offset, variable_name, type_name)`: For a given function, create a stack variable at an offset and with a specific type.
53 | - `set_stack_frame_variable_type(function_address, variable_name, type_name)`: For a given disassembled function, set
54 | the type of a stack variable.
55 | - `delete_stack_frame_variable(function_address, variable_name)`: Delete the named stack variable for a given function.
56 | - `read_memory_bytes(memory_address, size)`: Read bytes at a given address.
57 | - `data_read_byte(address)`: Read the 1 byte value at the specified address.
58 | - `data_read_word(address)`: Read the 2 byte value at the specified address as a WORD.
59 | - `data_read_dword(address)`: Read the 4 byte value at the specified address as a DWORD.
60 | - `data_read_qword(address)`: Read the 8 byte value at the specified address as a QWORD.
61 | - `data_read_string(address)`: Read the string at the specified address.
62 |
63 | Unsafe functions (`--unsafe` flag required):
64 |
65 | - `dbg_get_registers()`: Get all registers and their values. This function is only available when debugging.
66 | - `dbg_get_call_stack()`: Get the current call stack.
67 | - `dbg_list_breakpoints()`: List all breakpoints in the program.
68 | - `dbg_start_process()`: Start the debugger, returns the current instruction pointer.
69 | - `dbg_exit_process()`: Exit the debugger.
70 | - `dbg_continue_process()`: Continue the debugger, returns the current instruction pointer.
71 | - `dbg_run_to(address)`: Run the debugger to the specified address.
72 | - `dbg_set_breakpoint(address)`: Set a breakpoint at the specified address.
73 | - `dbg_step_into()`: Step into the current instruction.
74 | - `dbg_step_over()`: Step over the current instruction.
75 | - `dbg_delete_breakpoint(address)`: Delete a breakpoint at the specified address.
76 | - `dbg_enable_breakpoint(address, enable)`: Enable or disable a breakpoint at the specified address.
77 |
78 | ## Prerequisites
79 |
80 | - [Python](https://www.python.org/downloads/) (**3.11 or higher**)
81 | - Use `idapyswitch` to switch to the newest Python version
82 | - [IDA Pro](https://hex-rays.com/ida-pro) (8.3 or higher, 9 recommended), **IDA Free is not supported**
83 | - Supported MCP Client (pick one you like)
84 | - [Cline](https://cline.bot)
85 | - [Roo Code](https://roocode.com)
86 | - [Claude](https://claude.ai/download)
87 | - [Cursor](https://cursor.com)
88 | - [VSCode Agent Mode](https://github.blog/news-insights/product-news/github-copilot-agent-mode-activated/)
89 | - [Windsurf](https://windsurf.com)
90 | - [Other MCP Clients](https://modelcontextprotocol.io/clients#example-clients): Run `ida-pro-mcp --config` to get the JSON config for your client.
91 |
92 | ## Installation
93 |
94 | Install the latest version of the IDA Pro MCP package:
95 |
96 | ```sh
97 | pip uninstall ida-pro-mcp
98 | pip install https://github.com/mrexodia/ida-pro-mcp/archive/refs/heads/main.zip
99 | ```
100 |
101 | Configure the MCP servers and install the IDA Plugin:
102 |
103 | ```
104 | ida-pro-mcp --install
105 | ```
106 |
107 | **Important**: Make sure you completely restart IDA/Visual Studio Code/Claude for the installation to take effect. Claude runs in the background and you need to quit it from the tray icon.
108 |
109 | https://github.com/user-attachments/assets/65ed3373-a187-4dd5-a807-425dca1d8ee9
110 |
111 | _Note_: You need to load a binary in IDA before the plugin menu will show up.
112 |
113 | ## Prompt Engineering
114 |
115 | LLMs are prone to hallucinations and you need to be specific with your prompting. For reverse engineering, conversions between integers and bytes are especially problematic. Below is a minimal example prompt; feel free to start a discussion or open an issue if you have good results with a different prompt:
116 |
117 | > Your task is to analyze a crackme in IDA Pro. You can use the MCP tools to retrieve information. In general use the following strategy:
118 | > - Inspect the decompilation and add comments with your findings
119 | > - Rename variables to more sensible names
120 | > - Change the variable and argument types if necessary (especially pointer and array types)
121 | > - Change function names to be more descriptive
122 | > - If more details are necessary, disassemble the function and add comments with your findings
123 | > - NEVER convert number bases yourself. Use the convert_number MCP tool if needed!
124 | > - Do not attempt brute forcing, derive any solutions purely from the disassembly and simple python scripts
125 | > - Create a report.md with your findings and steps taken at the end
126 | > - When you find a solution, prompt the user for feedback with the password you found
127 |
128 | This prompt was just the first experiment, please share if you found ways to improve the output!
129 |
130 | Live stream discussing prompting and showing some real-world malware analysis:
131 |
132 | [](https://www.youtube.com/watch?v=iFxNuk3kxhk)
133 |
134 | ## Tips for Enhancing LLM Accuracy
135 |
136 | Large Language Models (LLMs) are powerful tools, but they can sometimes struggle with complex mathematical calculations or exhibit "hallucinations" (making up facts). Make sure to tell the LLM to use the `convert_number` MCP tool and you might also need [math-mcp](https://github.com/EthanHenrickson/math-mcp) for certain operations.
137 |
138 | Another thing to keep in mind is that LLMs will not perform well on obfuscated code. Before trying to use an LLM to solve the problem, take a look around the binary and spend some time (automatically) removing the following things:
139 |
140 | - String encryption
141 | - Import hashing
142 | - Control flow flattening
143 | - Code encryption
144 | - Anti-decompilation tricks
145 |
146 | You should also use a tool like Lumina or FLIRT to try and resolve all the open source library code and the C++ STL, this will further improve the accuracy.
147 |
148 | ## SSE Transport & Headless MCP
149 |
150 | You can run an SSE server to connect to the user interface like this:
151 |
152 | ```sh
153 | uv run ida-pro-mcp --transport http://127.0.0.1:8744/sse
154 | ```
155 |
156 | After installing [`idalib`](https://docs.hex-rays.com/user-guide/idalib) you can also run a headless SSE server:
157 |
158 | ```sh
159 | uv run idalib-mcp --host 127.0.0.1 --port 8745 path/to/executable
160 | ```
161 |
162 | _Note_: The `idalib` feature was contributed by [Willi Ballenthin](https://github.com/williballenthin).
163 |
164 | ## Manual Installation
165 |
166 | _Note_: This section is for LLMs and power users who need detailed installation instructions.
167 |
168 | <details>
169 |
170 | ## Manual MCP Server Installation (Cline/Roo Code)
171 |
172 | To install the MCP server yourself, follow these steps:
173 |
174 | 1. Install [uv](https://github.com/astral-sh/uv) globally:
175 | - Windows: `pip install uv`
176 | - Linux/Mac: `curl -LsSf https://astral.sh/uv/install.sh | sh`
177 | 2. Clone this repository, for this example `C:\MCP\ida-pro-mcp`.
178 | 3. Navigate to the Cline/Roo Code _MCP Servers_ configuration (see screenshot).
179 | 4. Click on the _Installed_ tab.
180 | 5. Click on _Configure MCP Servers_, which will open `cline_mcp_settings.json`.
181 | 6. Add the `ida-pro-mcp` server:
182 |
183 | ```json
184 | {
185 | "mcpServers": {
186 | "github.com/mrexodia/ida-pro-mcp": {
187 | "command": "uv",
188 | "args": [
189 | "--directory",
190 | "c:\\MCP\\ida-pro-mcp",
191 | "run",
192 | "server.py",
193 | "--install-plugin"
194 | ],
195 | "timeout": 1800,
196 | "disabled": false
197 | }
198 | }
199 | }
200 | ```
201 |
202 | To check if the connection works you can perform the following tool call:
203 |
204 | ```
205 | <use_mcp_tool>
206 | <server_name>github.com/mrexodia/ida-pro-mcp</server_name>
207 | <tool_name>check_connection</tool_name>
208 | <arguments></arguments>
209 | </use_mcp_tool>
210 | ```
211 |
212 | ## IDA Plugin installation
213 |
214 | The IDA Pro plugin will be installed automatically when the MCP server starts. If you disabled the `--install-plugin` option, use the following steps:
215 |
216 | 1. Copy (**not move**) `src/ida_pro_mcp/mcp-plugin.py` in your plugins folder (`%appdata%\Hex-Rays\IDA Pro\plugins` on Windows).
217 | 2. Open an IDB and click `Edit -> Plugins -> MCP` to start the server.
218 |
219 | </details>
220 |
221 | ## Comparison with other MCP servers
222 |
223 | There are a few IDA Pro MCP servers floating around, but I created my own for a few reasons:
224 |
225 | 1. Installation should be fully automated.
226 | 2. The architecture of other plugins makes it difficult to add new functionality quickly (too much boilerplate or unnecessary dependencies).
227 | 3. Learning new technologies is fun!
228 |
229 | If you want to check them out, here is a list (in the order I discovered them):
230 |
231 | - https://github.com/taida957789/ida-mcp-server-plugin (SSE protocol only, requires installing dependencies in IDAPython).
232 | - https://github.com/fdrechsler/mcp-server-idapro (MCP Server in TypeScript, excessive boilerplate required to add new functionality).
233 | - https://github.com/MxIris-Reverse-Engineering/ida-mcp-server (custom socket protocol, boilerplate).
234 |
235 | Feel free to open a PR to add your IDA Pro MCP server here.
236 |
237 | ## Development
238 |
239 | Adding new features is a super easy and streamlined process. All you have to do is add a new `@jsonrpc` function to [`mcp-plugin.py`](https://github.com/mrexodia/ida-pro-mcp/blob/164df8cf4ae251cc9cc0f464591fa6df8e0d9df4/src/ida_pro_mcp/mcp-plugin.py#L406-L419) and your function will be available in the MCP server without any additional boilerplate! Below is a video where I add the `get_metadata` function in less than 2 minutes (including testing):
240 |
241 | https://github.com/user-attachments/assets/951de823-88ea-4235-adcb-9257e316ae64
242 |
243 | To test the MCP server itself:
244 |
245 | ```sh
246 | uv run mcp dev src/ida_pro_mcp/server.py
247 | ```
248 |
249 | This will open a web interface at http://localhost:5173 and allow you to interact with the MCP tools for testing.
250 |
251 | For testing I create a symbolic link to the IDA plugin and then POST a JSON-RPC request directly to `http://localhost:13337/mcp`. After [enabling symbolic links](https://learn.microsoft.com/en-us/windows/apps/get-started/enable-your-device-for-development) you can run the following command:
252 |
253 | ```sh
254 | uv run ida-pro-mcp --install
255 | ```
256 |
257 | Generate the changelog of direct commits to `main`:
258 |
259 | ```sh
260 | git log --first-parent --no-merges 1.2.0..main "--pretty=- %s"
261 | ```
262 |
263 |
```
--------------------------------------------------------------------------------
/src/ida_pro_mcp/__init__.py:
--------------------------------------------------------------------------------
```python
1 |
```
--------------------------------------------------------------------------------
/uv-package.sh:
--------------------------------------------------------------------------------
```bash
#!/bin/bash
# Release helper: regenerate the docs, rebuild the distribution artifacts from
# scratch and publish them.
#
# Abort on the first failing command (and on unset variables / failed pipeline
# stages) so that a broken docs or build step can never be followed by a publish.
set -euo pipefail

uv run ida-pro-mcp --generate-docs
# Remove stale artifacts so only the freshly built files get published.
rm -rf dist
uv build
uv publish
```
--------------------------------------------------------------------------------
/src/ida_pro_mcp/__main__.py:
--------------------------------------------------------------------------------
```python
1 | import sys
2 | from ida_pro_mcp.server import main
3 |
if __name__ == "__main__":
    # Replace the program name before handing over to the server entry point.
    # NOTE(review): presumably this keeps usage/help output readable when the
    # package is started via `python -m ida_pro_mcp` - confirm against main().
    sys.argv[0] = "ida_pro_mcp"
    main()
7 |
```
--------------------------------------------------------------------------------
/ida-plugin.json:
--------------------------------------------------------------------------------
```json
1 | {
2 | "IDAMetadataDescriptorVersion": 1,
3 | "plugin": {
4 | "name": "IDA Pro MCP",
5 | "entryPoint": "src/ida_pro_mcp/mcp-plugin.py",
6 | "categories": ["api-scripting-and-automation", "debugging-and-tracing", "malware-analysis"],
7 | "logoPath": ".github/logo.png",
8 | "description": "AI-powered reverse engineering assistant that bridges IDA Pro with language models through MCP.",
9 | "idaVersions": ">=8.3",
10 | "version": "1.6.0"
11 | }
12 | }
```
--------------------------------------------------------------------------------
/.github/FUNDING.yml:
--------------------------------------------------------------------------------
```yaml
1 | # These are supported funding model platforms
2 |
3 | github: [mrexodia]
4 | #patreon: # Replace with a single Patreon username
5 | #open_collective: # Replace with a single Open Collective username
6 | #ko_fi: # Replace with a single Ko-fi username
7 | #tidelift: # Replace with a single Tidelift platform-name/package-name e.g., npm/babel
8 | #community_bridge: # Replace with a single Community Bridge project-name e.g., cloud-foundry
9 | #liberapay: # Replace with a single Liberapay username
10 | #issuehunt: x64dbg/x64dbg # Replace with a single IssueHunt username
11 | #otechie: # Replace with a single Otechie username
12 |
13 |
```
--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------
```toml
1 | [project]
2 | name = "ida-pro-mcp"
3 | version = "1.5.0"
4 | description = "Vibe reversing with IDA Pro"
5 | readme = "README.md"
6 | requires-python = ">=3.11"
7 | authors = [{ name = "mrexodia" }]
8 | keywords = ["ida", "mcp", "llm", "plugin"]
9 | classifiers = [
10 | "Development Status :: 5 - Production/Stable",
11 | "Intended Audience :: Developers",
12 | "Programming Language :: Python :: 3",
13 | "Programming Language :: Python :: 3.11",
14 | "Operating System :: MacOS",
15 | "Operating System :: Microsoft :: Windows",
16 | ]
17 | dependencies = [
18 | "mcp>=1.16.0",
19 | ]
20 |
21 | [project.urls]
22 | Repository = "https://github.com/mrexodia/ida-pro-mcp"
23 | Issues = "https://github.com/mrexodia/ida-pro-mcp/issues"
24 |
25 | [build-system]
26 | requires = ["setuptools"]
27 | build-backend = "setuptools.build_meta"
28 |
29 | [dependency-groups]
30 | dev = [
31 | "mcp[cli]>=1.6.0",
32 | "ty>=0.0.1a21",
33 | ]
34 |
35 | [project.scripts]
36 | ida-pro-mcp = "ida_pro_mcp.server:main"
37 | idalib-mcp = "ida_pro_mcp.idalib_server:main"
38 |
39 | [tool.pyright]
40 | include = ["src/ida_pro_mcp"]
41 | exclude = ["src/ida_pro_mcp/server_generated.py"]
42 |
```
--------------------------------------------------------------------------------
/src/ida_pro_mcp/idalib_server.py:
--------------------------------------------------------------------------------
```python
1 | import sys
2 | import inspect
3 | import logging
4 | import argparse
5 | import importlib
6 | from pathlib import Path
7 | import typing_inspection.introspection as intro
8 |
9 | from mcp.server.fastmcp import FastMCP
10 |
11 | # idapro must go first to initialize idalib
12 | import idapro
13 |
14 | import ida_auto
15 | import ida_hexrays
16 |
17 | logger = logging.getLogger(__name__)
18 |
19 | mcp = FastMCP("github.com/mrexodia/ida-pro-mcp#idalib")
20 |
def fixup_tool_argument_descriptions(mcp: FastMCP):
    """Copy `Annotated` parameter documentation into each tool's parameter schema.

    For every tool registered on `mcp`, each `str` parameter annotated with
    exactly one string of metadata gets that string stored as the
    "description" of the matching entry in `tool.parameters["properties"]`.
    All other parameters are left untouched.

    Args:
        mcp: The FastMCP server whose registered tools are updated in place.
    """
    # In our tool definitions within `mcp-plugin.py`, we use `typing.Annotated` on function parameters
    # to attach documentation. For example:
    #
    #     def get_function_by_name(
    #         name: Annotated[str, "Name of the function to get"]
    #     ) -> Function:
    #         """Get a function by its name"""
    #         ...
    #
    # However, the interpretation of Annotated is left up to static analyzers and other tools.
    # FastMCP doesn't have any special handling for these comments, so we splice them into the
    # tool metadata ourselves here.
    #
    # Example, before:
    #
    #     tool.parameter={
    #       properties: {
    #         name: {
    #           title: "Name",
    #           type: "string"
    #         }
    #       },
    #       required: ["name"],
    #       title: "get_function_by_nameArguments",
    #       type: "object"
    #     }
    #
    # Example, after:
    #
    #     tool.parameter={
    #       properties: {
    #         name: {
    #           title: "Name",
    #           type: "string"
    #           description: "Name of the function to get"
    #         }
    #       },
    #       required: ["name"],
    #       title: "get_function_by_nameArguments",
    #       type: "object"
    #     }
    #
    # References:
    #   - https://docs.python.org/3/library/typing.html#typing.Annotated
    #   - https://fastapi.tiangolo.com/python-types/#type-hints-with-metadata-annotations

    # unfortunately, FastMCP.list_tools() is async, so we break with best practices and reach into `._tool_manager`
    # rather than spinning up an asyncio runtime just to fetch the (non-async) list of tools.
    for tool in mcp._tool_manager.list_tools():
        sig = inspect.signature(tool.fn)
        for name, parameter in sig.parameters.items():
            # this instance is a raw `typing._AnnotatedAlias` that we can't do anything with directly.
            # it renders like:
            #
            #     typing.Annotated[str, 'Name of the function to get']
            #
            # `inspect.signature` reports a missing annotation as the sentinel
            # `inspect.Parameter.empty`, which is truthy, so a plain truthiness
            # test never skips it; compare by identity first.
            if parameter.annotation is inspect.Parameter.empty or not parameter.annotation:
                continue

            # this instance will look something like:
            #
            #     InspectedAnnotation(type=<class 'str'>, qualifiers=set(), metadata=['Name of the function to get'])
            #
            annotation = intro.inspect_annotation(
                parameter.annotation,
                annotation_source=intro.AnnotationSource.ANY
            )

            # for our use case, where we attach a single string annotation that is meant as documentation,
            # we extract that string and assign it to "description" in the tool metadata.

            if annotation.type is not str:
                continue

            if len(annotation.metadata) != 1:
                continue

            description = annotation.metadata[0]
            if not isinstance(description, str):
                continue

            logger.debug("adding parameter documentation %s(%s='%s')", tool.name, name, description)
            tool.parameters["properties"][name]["description"] = description
104 |
def main():
    """Run the headless (idalib) MCP server for a single input file.

    Parses the command line, configures logging, opens the input file with
    idalib and waits for auto-analysis, initializes the Hex-Rays decompiler,
    registers the plugin's JSON-RPC methods as MCP tools and finally serves
    them over the SSE transport until interrupted.

    Raises:
        FileNotFoundError: If the input path does not exist.
        RuntimeError: If the database cannot be opened or the Hex-Rays
            decompiler fails to initialize.
    """
    parser = argparse.ArgumentParser(description="MCP server for IDA Pro via idalib")
    parser.add_argument("--verbose", "-v", action="store_true", help="Show debug messages")
    parser.add_argument("--host", type=str, default="127.0.0.1", help="Host to listen on, default: 127.0.0.1")
    parser.add_argument("--port", type=int, default=8745, help="Port to listen on, default: 8745")
    parser.add_argument("--unsafe", action="store_true", help="Enable unsafe functions (DANGEROUS)")
    parser.add_argument("input_path", type=Path, help="Path to the input file to analyze.")
    args = parser.parse_args()

    if args.verbose:
        log_level = logging.DEBUG
        idapro.enable_console_messages(True)
    else:
        log_level = logging.INFO
        idapro.enable_console_messages(False)

    mcp.settings.log_level = logging.getLevelName(log_level)
    mcp.settings.host = args.host
    mcp.settings.port = args.port
    logging.basicConfig(level=log_level)

    # reset logging levels that might be initialized in idapythonrc.py
    # which is evaluated during import of idalib.
    logging.getLogger().setLevel(log_level)

    if not args.input_path.exists():
        raise FileNotFoundError(f"Input file not found: {args.input_path}")

    # TODO: add a tool for specifying the idb/input file (sandboxed)
    logger.info("opening database: %s", args.input_path)
    # a truthy return value from open_database is treated as failure here
    if idapro.open_database(str(args.input_path), run_auto_analysis=True):
        raise RuntimeError("failed to analyze input file")

    logger.debug("idalib: waiting for analysis...")
    ida_auto.auto_wait()

    if not ida_hexrays.init_hexrays_plugin():
        raise RuntimeError("failed to initialize Hex-Rays decompiler")

    # The plugin file name contains a dash, so it cannot be imported with a
    # regular `import` statement.
    plugin = importlib.import_module("ida_pro_mcp.mcp-plugin")
    logger.debug("adding tools...")
    for name, handler in plugin.rpc_registry.methods.items():
        # unsafe methods are only exposed when explicitly requested
        if args.unsafe or name not in plugin.rpc_registry.unsafe:
            logger.debug("adding tool: %s: %s", name, handler)
            mcp.add_tool(handler, name)

    # NOTE: https://github.com/modelcontextprotocol/python-sdk/issues/466
    fixup_tool_argument_descriptions(mcp)

    # NOTE: npx @modelcontextprotocol/inspector for debugging
    logger.info("MCP Server available at: http://%s:%d/sse", mcp.settings.host, mcp.settings.port)
    try:
        mcp.run(transport="sse")
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to stop the server; exit quietly.
        pass
160 |
# Allow running this module directly as a script.
if __name__ == "__main__":
    main()
163 |
```
--------------------------------------------------------------------------------
/src/ida_pro_mcp/server.py:
--------------------------------------------------------------------------------
```python
1 | import os
2 | import sys
3 | import ast
4 | import json
5 | import shutil
6 | import argparse
7 | import http.client
8 | from urllib.parse import urlparse
9 | from glob import glob
10 |
11 | from mcp.server.fastmcp import FastMCP
12 |
13 | # The log_level is necessary for Cline to work: https://github.com/jlowin/fastmcp/issues/81
14 | mcp = FastMCP("ida-pro-mcp", log_level="ERROR")
15 |
16 | jsonrpc_request_id = 1
17 | ida_host = "127.0.0.1"
18 | ida_port = 13337
19 |
def make_jsonrpc_request(method: str, *params):
    """Make a JSON-RPC request to the IDA plugin.

    Sends a JSON-RPC 2.0 request as an HTTP POST to `/mcp` on
    `ida_host:ida_port` and returns the decoded `result` member.

    Args:
        method: Name of the JSON-RPC method to invoke.
        *params: Positional parameters, sent as the JSON `params` array.

    Returns:
        The `result` of the response, or the string "success" when the
        result is null.

    Raises:
        Exception: If the response carries an `error` member. The message
            contains the error code, the error message and, when present,
            the error data.
    """
    # Only the request counter is rebound here; host and port are just read.
    global jsonrpc_request_id
    request = {
        "jsonrpc": "2.0",
        "method": method,
        "params": list(params),
        "id": jsonrpc_request_id,
    }
    jsonrpc_request_id += 1

    conn = http.client.HTTPConnection(ida_host, ida_port)
    try:
        conn.request("POST", "/mcp", json.dumps(request), {
            "Content-Type": "application/json"
        })
        response = conn.getresponse()
        data = json.loads(response.read().decode())
    finally:
        # Always release the connection, whether or not the exchange succeeded.
        conn.close()

    if "error" in data:
        error = data["error"]
        code = error["code"]
        message = error["message"]
        pretty = f"JSON-RPC error {code}: {message}"
        if "data" in error:
            details = error["data"]
            # JSON-RPC allows any JSON value here; serialize non-strings so
            # that building the message cannot fail and hide the real error.
            if not isinstance(details, str):
                details = json.dumps(details)
            pretty += "\n" + details
        raise Exception(pretty)

    result = data["result"]
    # NOTE: LLMs do not respond well to empty responses
    if result is None:
        result = "success"
    return result
57 |
@mcp.tool()
def check_connection() -> str:
    """Check if the IDA plugin is running"""
    # The docstring above is deliberately left untouched.
    # NOTE(review): presumably FastMCP exposes it as the tool description - confirm.
    try:
        metadata = make_jsonrpc_request("get_metadata")
        return f"Successfully connected to IDA Pro (open file: {metadata['module']})"
    except Exception:
        # Any failure (connection refused, error response, unexpected payload)
        # is reported as a hint instead of an exception.
        shortcut = "Ctrl+Option+M" if sys.platform == "darwin" else "Ctrl+Alt+M"
        return f"Failed to connect to IDA Pro! Did you run Edit -> Plugins -> MCP ({shortcut}) to start the server?"
70 |
71 | # Code taken from https://github.com/mrexodia/ida-pro-mcp (MIT License)
72 | class MCPVisitor(ast.NodeVisitor):
73 | def __init__(self):
74 | self.types: dict[str, ast.ClassDef] = {}
75 | self.functions: dict[str, ast.FunctionDef] = {}
76 | self.descriptions: dict[str, str] = {}
77 | self.unsafe: list[str] = []
78 |
79 | def visit_FunctionDef(self, node):
80 | for decorator in node.decorator_list:
81 | if isinstance(decorator, ast.Name):
82 | if decorator.id == "jsonrpc":
83 | for i, arg in enumerate(node.args.args):
84 | arg_name = arg.arg
85 | arg_type = arg.annotation
86 | if arg_type is None:
87 | raise Exception(f"Missing argument type for {node.name}.{arg_name}")
88 | if isinstance(arg_type, ast.Subscript):
89 | assert isinstance(arg_type.value, ast.Name)
90 | assert arg_type.value.id == "Annotated"
91 | assert isinstance(arg_type.slice, ast.Tuple)
92 | assert len(arg_type.slice.elts) == 2
93 | annot_type = arg_type.slice.elts[0]
94 | annot_description = arg_type.slice.elts[1]
95 | assert isinstance(annot_description, ast.Constant)
96 | node.args.args[i].annotation = ast.Subscript(
97 | value=ast.Name(id="Annotated", ctx=ast.Load()),
98 | slice=ast.Tuple(
99 | elts=[
100 | annot_type,
101 | ast.Call(
102 | func=ast.Name(id="Field", ctx=ast.Load()),
103 | args=[],
104 | keywords=[
105 | ast.keyword(
106 | arg="description",
107 | value=annot_description)])],
108 | ctx=ast.Load()),
109 | ctx=ast.Load())
110 | elif isinstance(arg_type, ast.Name):
111 | pass
112 | else:
113 | raise Exception(f"Unexpected type annotation for {node.name}.{arg_name} -> {type(arg_type)}")
114 |
115 | body_comment = node.body[0]
116 | if isinstance(body_comment, ast.Expr) and isinstance(body_comment.value, ast.Constant):
117 | new_body = [body_comment]
118 | self.descriptions[node.name] = body_comment.value.value
119 | else:
120 | new_body = []
121 |
122 | call_args = [ast.Constant(value=node.name)]
123 | for arg in node.args.args:
124 | call_args.append(ast.Name(id=arg.arg, ctx=ast.Load()))
125 | new_body.append(ast.Return(
126 | value=ast.Call(
127 | func=ast.Name(id="make_jsonrpc_request", ctx=ast.Load()),
128 | args=call_args,
129 | keywords=[])))
130 | decorator_list = [
131 | ast.Call(
132 | func=ast.Attribute(
133 | value=ast.Name(id="mcp", ctx=ast.Load()),
134 | attr="tool",
135 | ctx=ast.Load()),
136 | args=[],
137 | keywords=[]
138 | )
139 | ]
140 | node_nobody = ast.FunctionDef(node.name, node.args, new_body, decorator_list, node.returns, node.type_comment, lineno=node.lineno, col_offset=node.col_offset)
141 | assert node.name not in self.functions, f"Duplicate function: {node.name}"
142 | self.functions[node.name] = node_nobody
143 | elif decorator.id == "unsafe":
144 | self.unsafe.append(node.name)
145 |
146 | def visit_ClassDef(self, node):
147 | for base in node.bases:
148 | if isinstance(base, ast.Name):
149 | if base.id == "TypedDict":
150 | self.types[node.name] = node
151 |
152 |
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
IDA_PLUGIN_PY = os.path.join(SCRIPT_DIR, "mcp-plugin.py")
GENERATED_PY = os.path.join(SCRIPT_DIR, "server_generated.py")

# NOTE: This is in the global scope on purpose
# The plugin source is parsed (never imported) and every @jsonrpc function is
# turned into an MCP tool stub that forwards to make_jsonrpc_request.
if not os.path.exists(IDA_PLUGIN_PY):
    raise RuntimeError(f"IDA plugin not found at {IDA_PLUGIN_PY} (did you move it?)")
with open(IDA_PLUGIN_PY, "r", encoding="utf-8") as f:
    code = f.read()
module = ast.parse(code, IDA_PLUGIN_PY)
visitor = MCPVisitor()
visitor.visit(module)
code = """# NOTE: This file has been automatically generated, do not modify!
# Architecture based on https://github.com/mrexodia/ida-pro-mcp (MIT License)
import sys
if sys.version_info >= (3, 12):
    from typing import Annotated, Optional, TypedDict, Generic, TypeVar, NotRequired
else:
    from typing_extensions import Annotated, Optional, TypedDict, Generic, TypeVar, NotRequired
from pydantic import Field

T = TypeVar("T")

"""
# Type definitions go first so the stubs below can refer to them.
# The loop variables are deliberately not called `type`/`function`: at module
# scope that would rebind the builtin `type` for the rest of this file.
for type_def in visitor.types.values():
    code += ast.unparse(type_def)
    code += "\n\n"
for function_def in visitor.functions.values():
    code += ast.unparse(function_def)
    code += "\n\n"

# Writing the generated file is best effort; the code is executed from memory
# below either way.
try:
    if os.path.exists(GENERATED_PY):
        with open(GENERATED_PY, "rb") as f:
            existing_code_bytes = f.read()
    else:
        existing_code_bytes = b""
    code_bytes = code.encode("utf-8").replace(b"\r", b"")
    # Only touch the file when its content actually changed.
    if code_bytes != existing_code_bytes:
        with open(GENERATED_PY, "wb") as f:
            f.write(code_bytes)
except Exception as e:
    # Not a bare `except:` so that KeyboardInterrupt/SystemExit still propagate.
    print(f"Failed to generate code: {GENERATED_PY} ({e})", file=sys.stderr, flush=True)

# NOTE: This executes code derived from the local plugin file in this module's
# namespace, which is what registers the generated tools on `mcp`.
exec(compile(code, GENERATED_PY, "exec"))

MCP_FUNCTIONS = ["check_connection"] + list(visitor.functions.keys())
UNSAFE_FUNCTIONS = visitor.unsafe
SAFE_FUNCTIONS = [f for f in MCP_FUNCTIONS if f not in UNSAFE_FUNCTIONS]
202 |
def generate_readme():
    """Print the README function list and an example MCP config to stdout.

    One bullet is printed per registered function (safe functions first, then
    the ones that require ``--unsafe``), built from the parameter names and
    the first line of the description collected by the AST visitor.
    """
    print("README:")
    print("- `check_connection()`: Check if the IDA plugin is running.")

    def get_description(name: str):
        function = visitor.functions[name]
        signature = function.name + "(" + ", ".join(arg.arg for arg in function.args.args) + ")"
        # A missing, None or empty description falls back to the placeholder
        # instead of crashing on `.strip()` or on indexing an empty string.
        raw_description = visitor.descriptions.get(function.name) or "<no description>"
        description = raw_description.strip().split("\n")[0]
        if not description.endswith("."):
            description += "."
        return f"- `{signature}`: {description}"

    for safe_function in SAFE_FUNCTIONS:
        # check_connection is not in visitor.functions; it was printed above
        if safe_function != "check_connection":
            print(get_description(safe_function))
    print("\nUnsafe functions (`--unsafe` flag required):\n")
    for unsafe_function in UNSAFE_FUNCTIONS:
        print(get_description(unsafe_function))
    print("\nMCP Config:")
    mcp_config = {
        "mcpServers": {
            "github.com/mrexodia/ida-pro-mcp": {
                "command": "uv",
                "args": [
                    "--directory",
                    "c:\\MCP\\ida-pro-mcp",
                    "run",
                    "server.py",
                    "--install-plugin"
                ],
                "timeout": 1800,
                "disabled": False,
            }
        }
    }
    print(json.dumps(mcp_config, indent=2))
242 |
def get_python_executable():
    """Get the path to the Python executable"""
    is_windows = sys.platform == "win32"

    # First choice: the interpreter of the active virtual environment
    venv_root = os.environ.get("VIRTUAL_ENV")
    if venv_root:
        if is_windows:
            venv_python = os.path.join(venv_root, "Scripts", "python.exe")
        else:
            venv_python = os.path.join(venv_root, "bin", "python3")
        if os.path.exists(venv_python):
            return venv_python

    # Second choice: derive the interpreter location from a ``*.zip`` entry
    # on sys.path, looking next to the archive (Windows) or in ``../bin``.
    for entry in sys.path:
        if is_windows:
            entry = entry.replace("/", "\\")
        if not entry.split(os.sep)[-1].endswith(".zip"):
            continue
        archive_dir = os.path.dirname(entry)
        relative_parts = ("python.exe",) if is_windows else ("..", "bin", "python3")
        candidate = os.path.abspath(os.path.join(archive_dir, *relative_parts))
        if os.path.exists(candidate):
            return candidate

    # Last resort: whatever interpreter is running this script
    return sys.executable
270 |
def copy_python_env(env: dict[str, str]):
    """Copy the Python-related environment variables that are set into ``env``.

    Returns True when at least one variable with a non-empty value was copied,
    False otherwise. ``env`` is modified in place.
    """
    # Reference: https://docs.python.org/3/using/cmdline.html#environment-variables
    forwarded_names = (
        "PYTHONHOME",
        "PYTHONPATH",
        "PYTHONSAFEPATH",
        "PYTHONPLATLIBDIR",
        "PYTHONPYCACHEPREFIX",
        "PYTHONNOUSERSITE",
        "PYTHONUSERBASE",
    )
    # MCP servers are run without inheriting the environment, so we need to forward
    # the environment variables that affect Python's dependency resolution by hand.
    # Issue: https://github.com/mrexodia/ida-pro-mcp/issues/111
    found = {
        name: value
        for name in forwarded_names
        if (value := os.environ.get(name))
    }
    env.update(found)
    return bool(found)
292 |
def print_mcp_config():
    """Print an ``mcpServers`` JSON document for configuring a client by hand.

    The entry launches this file with the detected Python executable. Python
    environment variables found by ``copy_python_env`` are added under
    ``env``, after a warning line is printed.
    """
    server_entry = {
        "command": get_python_executable(),
        "args": [
            __file__,
        ],
        "timeout": 1800,
        "disabled": False,
    }
    python_env = {}
    if copy_python_env(python_env):
        print(f"[WARNING] Custom Python environment variables detected")
        server_entry["env"] = python_env
    document = {"mcpServers": {mcp.name: server_entry}}
    print(json.dumps(document, indent=2))
312 |
def _mcp_client_configs():
    """Return ``{client name: (config dir, config file)}`` for this platform.

    Returns None when ``sys.platform`` is not win32, darwin or linux.
    """
    home = os.path.expanduser("~")
    if sys.platform == "win32":
        app_data = os.getenv("APPDATA", "")
        vscode_dir = os.path.join(app_data, "Code")
        claude_dir = os.path.join(app_data, "Claude")
    elif sys.platform == "darwin":
        app_support = os.path.join(home, "Library", "Application Support")
        vscode_dir = os.path.join(app_support, "Code")
        claude_dir = os.path.join(app_support, "Claude")
    elif sys.platform == "linux":
        vscode_dir = os.path.join(home, ".config", "Code")
        claude_dir = None  # Claude not supported on Linux
    else:
        return None

    # The VS Code extensions keep their settings below globalStorage
    storage = os.path.join(vscode_dir, "User", "globalStorage")
    configs = {
        "Cline": (os.path.join(storage, "saoudrizwan.claude-dev", "settings"), "cline_mcp_settings.json"),
        "Roo Code": (os.path.join(storage, "rooveterinaryinc.roo-cline", "settings"), "mcp_settings.json"),
        "Kilo Code": (os.path.join(storage, "kilocode.kilo-code", "settings"), "mcp_settings.json"),
    }
    if claude_dir is not None:
        configs["Claude"] = (claude_dir, "claude_desktop_config.json")
    configs["Cursor"] = (os.path.join(home, ".cursor"), "mcp.json")
    configs["Windsurf"] = (os.path.join(home, ".codeium", "windsurf"), "mcp_config.json")
    configs["Claude Code"] = (home, ".claude.json")
    configs["LM Studio"] = (os.path.join(home, ".lmstudio"), "mcp.json")
    return configs


def install_mcp_servers(*, uninstall=False, quiet=False, env=None):
    """Add or remove this MCP server in the config files of known MCP clients.

    Args:
        uninstall: Remove the server entry instead of writing it.
        quiet: Suppress the per-client status messages.
        env: Extra environment variables to store with the server entry. The
            mapping is copied and never modified.

    Clients whose config directory does not exist, or whose config file is
    not valid JSON, are skipped. When installing and no client was updated,
    a config snippet for manual setup is printed instead.
    """
    configs = _mcp_client_configs()
    if configs is None:
        print(f"Unsupported platform: {sys.platform}")
        return

    base_env = dict(env) if env else {}
    installed = 0
    for name, (config_dir, config_file) in configs.items():
        action = "uninstall" if uninstall else "installation"
        config_path = os.path.join(config_dir, config_file)
        if not os.path.exists(config_dir):
            if not quiet:
                print(f"Skipping {name} {action}\n Config: {config_path} (not found)")
            continue
        if not os.path.exists(config_path):
            config = {}
        else:
            with open(config_path, "r", encoding="utf-8") as f:
                data = f.read().strip()
            if len(data) == 0:
                config = {}
            else:
                try:
                    config = json.loads(data)
                except json.decoder.JSONDecodeError:
                    if not quiet:
                        print(f"Skipping {name} {action}\n Config: {config_path} (invalid JSON)")
                    continue
        if "mcpServers" not in config:
            config["mcpServers"] = {}
        mcp_servers = config["mcpServers"]
        # Migrate old name
        old_name = "github.com/mrexodia/ida-pro-mcp"
        if old_name in mcp_servers:
            mcp_servers[mcp.name] = mcp_servers[old_name]
            del mcp_servers[old_name]
        if uninstall:
            if mcp.name not in mcp_servers:
                if not quiet:
                    print(f"Skipping {name} uninstall\n Config: {config_path} (not installed)")
                continue
            del mcp_servers[mcp.name]
        else:
            # Each client gets its own copy, so variables found in one client's
            # config are not written into the configs of the clients after it.
            server_env = dict(base_env)
            # Copy environment variables from the existing server if present
            if mcp.name in mcp_servers:
                for key, value in mcp_servers[mcp.name].get("env", {}).items():
                    server_env[key] = value
            if copy_python_env(server_env):
                print(f"[WARNING] Custom Python environment variables detected")
            mcp_servers[mcp.name] = {
                "command": get_python_executable(),
                "args": [
                    __file__,
                ],
                "timeout": 1800,
                "disabled": False,
                "autoApprove": SAFE_FUNCTIONS,
                "alwaysAllow": SAFE_FUNCTIONS,
            }
            if server_env:
                mcp_servers[mcp.name]["env"] = server_env
        with open(config_path, "w", encoding="utf-8") as f:
            json.dump(config, f, indent=2)
        if not quiet:
            done = "Uninstalled" if uninstall else "Installed"
            print(f"{done} {name} MCP server (restart required)\n Config: {config_path}")
        installed += 1
    if not uninstall and installed == 0:
        print("No MCP servers installed. For unsupported MCP clients, use the following config:\n")
        print_mcp_config()
415 |
def install_ida_plugin(*, uninstall: bool = False, quiet: bool = False):
    """Install or remove ``mcp-plugin.py`` in the user's IDA plugins folder.

    Installation symlinks the plugin next to this script into the plugins
    folder, falling back to a plain copy when the symlink cannot be created.

    Args:
        uninstall: Remove the installed plugin instead of installing it.
        quiet: Suppress the status messages of a successful (un)install.

    Exits the process with status 1 when an IDA Free license file is found in
    the IDA user folder.
    """
    if sys.platform == "win32":
        ida_folder = os.path.join(os.getenv("APPDATA"), "Hex-Rays", "IDA Pro")
    else:
        ida_folder = os.path.join(os.path.expanduser("~"), ".idapro")
    free_licenses = glob(os.path.join(ida_folder, "idafree_*.hexlic"))
    if len(free_licenses) > 0:
        print(f"IDA Free does not support plugins and cannot be used. Purchase and install IDA Pro instead.")
        sys.exit(1)
    ida_plugin_folder = os.path.join(ida_folder, "plugins")
    plugin_destination = os.path.join(ida_plugin_folder, "mcp-plugin.py")
    if uninstall:
        # lexists (not exists) so a dangling symlink, e.g. left behind after
        # the repository was moved, is still detected and removed.
        if not os.path.lexists(plugin_destination):
            print(f"Skipping IDA plugin uninstall\n Path: {plugin_destination} (not found)")
            return
        os.remove(plugin_destination)
        if not quiet:
            print(f"Uninstalled IDA plugin\n Path: {plugin_destination}")
    else:
        # Create IDA plugins folder
        os.makedirs(ida_plugin_folder, exist_ok=True)

        # Skip if symlink already up to date
        realpath = os.path.realpath(plugin_destination)
        if realpath == IDA_PLUGIN_PY:
            if not quiet:
                print(f"Skipping IDA plugin installation (symlink up to date)\n Plugin: {realpath}")
        else:
            # Remove existing plugin
            if os.path.lexists(plugin_destination):
                os.remove(plugin_destination)

            # Symlink or copy the plugin
            try:
                os.symlink(IDA_PLUGIN_PY, plugin_destination)
            except OSError:
                shutil.copy(IDA_PLUGIN_PY, plugin_destination)

            if not quiet:
                print(f"Installed IDA Pro plugin (IDA restart required)\n Plugin: {plugin_destination}")
457 |
def main():
    """Command line entry point: handle the maintenance flags or run the server.

    ``--install``, ``--uninstall``, ``--generate-docs`` and ``--config`` each
    perform their action and return. Otherwise the IDA RPC address is parsed,
    the unsafe tools are removed unless ``--unsafe`` was given, and the MCP
    server runs over stdio or, when ``--transport`` is a URL, over SSE.

    Raises:
        Exception: If ``--ida-rpc`` or a non-stdio ``--transport`` value has
            no host name or no port.
    """
    global ida_host, ida_port
    parser = argparse.ArgumentParser(description="IDA Pro MCP Server")
    parser.add_argument("--install", action="store_true", help="Install the MCP Server and IDA plugin")
    parser.add_argument("--uninstall", action="store_true", help="Uninstall the MCP Server and IDA plugin")
    parser.add_argument("--generate-docs", action="store_true", help=argparse.SUPPRESS)
    parser.add_argument("--install-plugin", action="store_true", help=argparse.SUPPRESS)
    parser.add_argument("--transport", type=str, default="stdio", help="MCP transport protocol to use (stdio or http://127.0.0.1:8744)")
    parser.add_argument("--ida-rpc", type=str, default=f"http://{ida_host}:{ida_port}", help=f"IDA RPC server to use (default: http://{ida_host}:{ida_port})")
    parser.add_argument("--unsafe", action="store_true", help="Enable unsafe functions (DANGEROUS)")
    parser.add_argument("--config", action="store_true", help="Generate MCP config JSON")
    args = parser.parse_args()

    if args.install and args.uninstall:
        print("Cannot install and uninstall at the same time")
        return

    if args.install:
        install_ida_plugin()
        install_mcp_servers()
        return

    if args.uninstall:
        install_ida_plugin(uninstall=True)
        install_mcp_servers(uninstall=True)
        return

    # NOTE: Developers can use this to generate the README
    if args.generate_docs:
        generate_readme()
        return

    # NOTE: This is silent for automated Cline installations
    if args.install_plugin:
        install_ida_plugin(quiet=True)

    if args.config:
        print_mcp_config()
        return

    # Parse IDA RPC server argument
    ida_rpc = urlparse(args.ida_rpc)
    if ida_rpc.hostname is None or ida_rpc.port is None:
        raise Exception(f"Invalid IDA RPC server: {args.ida_rpc}")
    ida_host = ida_rpc.hostname
    ida_port = ida_rpc.port

    # Remove unsafe tools
    if not args.unsafe:
        mcp_tools = mcp._tool_manager._tools
        for unsafe in UNSAFE_FUNCTIONS:
            mcp_tools.pop(unsafe, None)

    try:
        if args.transport == "stdio":
            mcp.run(transport="stdio")
        else:
            url = urlparse(args.transport)
            if url.hostname is None or url.port is None:
                raise Exception(f"Invalid transport URL: {args.transport}")
            mcp.settings.host = url.hostname
            mcp.settings.port = url.port
            # NOTE: npx @modelcontextprotocol/inspector for debugging
            print(f"MCP Server available at http://{mcp.settings.host}:{mcp.settings.port}/sse")
            mcp.settings.log_level = "INFO"
            mcp.run(transport="sse")
    except KeyboardInterrupt:
        # Ctrl+C is the normal way to stop the server
        pass

if __name__ == "__main__":
    main()
530 |
```
--------------------------------------------------------------------------------
/src/ida_pro_mcp/mcp-plugin.py:
--------------------------------------------------------------------------------
```python
1 | import os
2 | import sys
3 |
4 | if sys.version_info < (3, 11):
5 | raise RuntimeError("Python 3.11 or higher is required for the MCP plugin")
6 |
7 | import json
8 | import struct
9 | import threading
10 | import http.server
11 | from urllib.parse import urlparse
12 | from typing import (
13 | Any,
14 | Callable,
15 | get_type_hints,
16 | TypedDict,
17 | Optional,
18 | Annotated,
19 | TypeVar,
20 | Generic,
21 | NotRequired,
22 | overload,
23 | Literal,
24 | )
25 |
class JSONRPCError(Exception):
    """Error that is reported to the client as a JSON-RPC error object.

    Attributes:
        code: Numeric JSON-RPC error code.
        message: Human readable error message.
        data: Optional additional payload (None when absent).
    """

    def __init__(self, code: int, message: str, data: Any = None):
        # Initialise the base class with the message so that str(error) and
        # tracebacks show it instead of the tuple of all constructor arguments.
        super().__init__(message)
        self.code = code
        self.message = message
        self.data = data
31 |
class RPCRegistry:
    """Registry of JSON-RPC methods that dispatches calls by method name.

    Parameters are validated against the type hints of the registered
    function and converted to the hinted type when they do not match.
    """

    def __init__(self):
        self.methods: dict[str, Callable] = {}  # method name -> callable
        self.unsafe: set[str] = set()  # names of functions marked unsafe

    def register(self, func: Callable) -> Callable:
        """Register ``func`` under its ``__name__`` and return it unchanged."""
        self.methods[func.__name__] = func
        return func

    def mark_unsafe(self, func: Callable) -> Callable:
        """Record the name of ``func`` as unsafe and return it unchanged."""
        self.unsafe.add(func.__name__)
        return func

    @staticmethod
    def _convert_param(param_name: str, value: Any, expected_type: Any) -> Any:
        """Return ``value`` as ``expected_type``, converting it when needed.

        Raises:
            JSONRPCError: Code -32602 when the value cannot be converted.
        """
        from typing import get_origin

        try:
            try:
                target = expected_type
                matches = isinstance(value, target)
            except TypeError:
                # Parameterized generics such as list[int] cannot be used with
                # isinstance(); check against their runtime class instead.
                target = get_origin(expected_type)
                if not isinstance(target, type):
                    raise
                matches = isinstance(value, target)
            return value if matches else target(value)
        except (ValueError, TypeError):
            # Not every type hint has a __name__, so fall back to its repr
            type_name = getattr(expected_type, "__name__", repr(expected_type))
            raise JSONRPCError(-32602, f"Invalid type for parameter '{param_name}': expected {type_name}")

    def dispatch(self, method: str, params: Any) -> Any:
        """Call the registered ``method`` with ``params`` and return its result.

        Args:
            method: Name the function was registered under.
            params: Positional arguments as a list, or keyword arguments as a
                dict whose keys match the annotated parameter names exactly.

        Raises:
            JSONRPCError: -32601 for an unknown method, -32602 for a wrong
                number, wrong names or wrong types of parameters, and -32600
                when ``params`` is neither a list nor a dict.
        """
        if method not in self.methods:
            raise JSONRPCError(-32601, f"Method '{method}' not found")

        func = self.methods[method]
        hints = get_type_hints(func)

        # Remove return annotation if present
        hints.pop("return", None)

        if isinstance(params, list):
            if len(params) != len(hints):
                raise JSONRPCError(-32602, f"Invalid params: expected {len(hints)} arguments, got {len(params)}")

            # Validate and convert parameters
            converted_args = [
                self._convert_param(param_name, value, expected_type)
                for value, (param_name, expected_type) in zip(params, hints.items())
            ]
            return func(*converted_args)
        elif isinstance(params, dict):
            if set(params.keys()) != set(hints.keys()):
                raise JSONRPCError(-32602, f"Invalid params: expected {list(hints.keys())}")

            # Validate and convert parameters
            converted_kwargs = {
                param_name: self._convert_param(param_name, params.get(param_name), expected_type)
                for param_name, expected_type in hints.items()
            }
            return func(**converted_kwargs)
        else:
            raise JSONRPCError(-32600, "Invalid Request: params must be array or object")
88 |
# Registry shared by the decorators below and by the HTTP request handler
rpc_registry = RPCRegistry()

def jsonrpc(func: Callable) -> Callable:
    """Decorator that registers ``func`` as a JSON-RPC method."""
    registered = rpc_registry.register(func)
    return registered
95 |
def unsafe(func: Callable) -> Callable:
    """Decorator to mark a function as unsafe"""
    return rpc_registry.mark_unsafe(func)
99 |
class JSONRPCRequestHandler(http.server.BaseHTTPRequestHandler):
    """HTTP handler that serves JSON-RPC 2.0 requests POSTed to ``/mcp``.

    Every reply, including error replies, is sent with HTTP status 200 and a
    JSON body.
    """

    def send_jsonrpc_error(self, code: int, message: str, id: Any = None):
        """Send a JSON-RPC error response; ``id`` is included when not None."""
        response = {
            "jsonrpc": "2.0",
            "error": {
                "code": code,
                "message": message
            }
        }
        if id is not None:
            response["id"] = id
        response_body = json.dumps(response).encode("utf-8")
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(response_body)))
        self.end_headers()
        self.wfile.write(response_body)

    def do_POST(self):
        """Parse the request body, dispatch the method and send the response."""
        parsed_path = urlparse(self.path)
        if parsed_path.path != "/mcp":
            self.send_jsonrpc_error(-32098, "Invalid endpoint", None)
            return

        # A malformed Content-Length header is answered with a parse error
        # instead of escaping as an unhandled ValueError.
        try:
            content_length = int(self.headers.get("Content-Length", 0))
        except ValueError:
            self.send_jsonrpc_error(-32700, "Parse error: invalid Content-Length", None)
            return
        if content_length <= 0:
            self.send_jsonrpc_error(-32700, "Parse error: missing request body", None)
            return

        request_body = self.rfile.read(content_length)
        try:
            request = json.loads(request_body)
        except json.JSONDecodeError:
            self.send_jsonrpc_error(-32700, "Parse error: invalid JSON", None)
            return

        # Prepare the response
        response: dict[str, Any] = {
            "jsonrpc": "2.0"
        }
        # The body may be any JSON value; only an object can carry an id.
        # Non-object requests are rejected by the validation below.
        if isinstance(request, dict) and request.get("id") is not None:
            response["id"] = request.get("id")

        try:
            # Basic JSON-RPC validation
            if not isinstance(request, dict):
                raise JSONRPCError(-32600, "Invalid Request")
            if request.get("jsonrpc") != "2.0":
                raise JSONRPCError(-32600, "Invalid JSON-RPC version")
            if "method" not in request:
                raise JSONRPCError(-32600, "Method not specified")

            # Dispatch the method
            result = rpc_registry.dispatch(request["method"], request.get("params", []))
            response["result"] = result

        except JSONRPCError as e:
            response["error"] = {
                "code": e.code,
                "message": e.message
            }
            if e.data is not None:
                response["error"]["data"] = e.data
        except IDAError as e:
            response["error"] = {
                "code": -32000,
                "message": e.message,
            }
        except Exception as e:
            traceback.print_exc()
            response["error"] = {
                "code": -32603,
                "message": "Internal error (please report a bug)",
                "data": traceback.format_exc(),
            }

        try:
            response_body = json.dumps(response).encode("utf-8")
        except Exception as e:
            # The result could not be serialized; report that as an internal error
            traceback.print_exc()
            response_body = json.dumps({
                "error": {
                    "code": -32603,
                    "message": "Internal error (please report a bug)",
                    "data": traceback.format_exc(),
                }
            }).encode("utf-8")

        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(response_body)))
        self.end_headers()
        self.wfile.write(response_body)

    def log_message(self, format, *args):
        # Suppress logging
        pass
199 |
class MCPHTTPServer(http.server.HTTPServer):
    # Explicitly turn off address reuse for the listening socket.
    # NOTE(review): presumably so that a second IDA instance fails to bind the
    # port instead of sharing it; confirm the intent.
    allow_reuse_address = False
202 |
class Server:
    """Runs the JSON-RPC HTTP server on a background daemon thread."""

    HOST = "localhost"
    PORT = 13337

    def __init__(self):
        self.server = None
        self.server_thread = None
        self.running = False

    def start(self):
        """Start the server thread unless the server is already running."""
        if self.running:
            print("[MCP] Server is already running")
            return

        self.server_thread = threading.Thread(target=self._run_server, daemon=True)
        self.running = True
        self.server_thread.start()

    def stop(self):
        """Shut the server down and wait for its thread; no-op when stopped."""
        if not self.running:
            return

        self.running = False
        if self.server:
            self.server.shutdown()
            self.server.server_close()
        if self.server_thread:
            self.server_thread.join()
            self.server = None
        print("[MCP] Server stopped")

    def _run_server(self):
        """Thread body: bind the HTTP server and serve until it is shut down."""
        import errno

        try:
            # Create server in the thread to handle binding
            self.server = MCPHTTPServer((Server.HOST, Server.PORT), JSONRPCRequestHandler)
            print(f"[MCP] Server started at http://{Server.HOST}:{Server.PORT}")
            self.server.serve_forever()
        except OSError as e:
            # errno.EADDRINUSE is the platform's own value (it differs between
            # Linux, macOS and Windows); 98 and 10048 are kept as the explicit
            # Linux/Windows values that were checked before.
            if e.errno in (errno.EADDRINUSE, 98, 10048):
                print(f"[MCP] Error: Port {Server.PORT} is already in use")
            else:
                print(f"[MCP] Server error: {e}")
            self.running = False
        except Exception as e:
            print(f"[MCP] Server error: {e}")
        finally:
            self.running = False
250 |
251 | # A module that helps with writing thread safe ida code.
252 | # Based on:
253 | # https://web.archive.org/web/20160305190440/http://www.williballenthin.com/blog/2015/09/04/idapython-synchronization-decorator/
254 | import logging
255 | import queue
256 | import traceback
257 | import functools
258 | from enum import IntEnum, IntFlag
259 |
260 | import ida_hexrays
261 | import ida_kernwin
262 | import ida_funcs
263 | import ida_gdl
264 | import ida_lines
265 | import ida_idaapi
266 | import idc
267 | import idaapi
268 | import idautils
269 | import ida_nalt
270 | import ida_bytes
271 | import ida_typeinf
272 | import ida_xref
273 | import ida_entry
274 | import idautils
275 | import ida_idd
276 | import ida_dbg
277 | import ida_name
278 | import ida_ida
279 | import ida_frame
280 |
# Kernel version of the running IDA as two integers. The two-name unpacking
# assumes the version string has exactly two dot-separated integer parts.
ida_major, ida_minor = map(int, idaapi.get_kernel_version().split("."))
282 |
class IDAError(Exception):
    """Error raised by the plugin's IDA helpers; carries a single message."""

    def __init__(self, message: str):
        super().__init__(message)

    @property
    def message(self) -> str:
        """The message passed to the constructor (first exception argument)."""
        return self.args[0]
290 |
class IDASyncError(Exception):
    """Raised by sync_wrapper for an invalid safety mode or a nested call."""
    pass
293 |
294 | # Important note: Always make sure the return value from your function f is a
295 | # copy of the data you have gotten from IDA, and not the original data.
296 | #
297 | # Example:
298 | # --------
299 | #
300 | # Do this:
301 | #
302 | # @idaread
303 | # def ts_Functions():
304 | # return list(idautils.Functions())
305 | #
306 | # Don't do this:
307 | #
308 | # @idaread
309 | # def ts_Functions():
310 | # return idautils.Functions()
311 | #
312 |
313 | logger = logging.getLogger(__name__)
314 |
315 | # Enum for safety modes. Higher means safer:
class IDASafety(IntEnum):
    """Safety modes, each bound to the matching ``ida_kernwin.MFF_*`` value."""
    SAFE_NONE = ida_kernwin.MFF_FAST
    SAFE_READ = ida_kernwin.MFF_READ
    SAFE_WRITE = ida_kernwin.MFF_WRITE
320 |
321 | call_stack = queue.LifoQueue()
322 |
def sync_wrapper(ff, safety_mode: IDASafety):
    """
    Call a function ff with a specific IDA safety_mode.

    ff is invoked without arguments through idaapi.execute_sync. Its return
    value is returned to the caller; an Exception raised by ff is re-raised
    in the calling thread.

    Raises IDASyncError when safety_mode is neither SAFE_READ nor SAFE_WRITE.
    """
    #logger.debug('sync_wrapper: {}, {}'.format(ff.__name__, safety_mode))

    if safety_mode not in [IDASafety.SAFE_READ, IDASafety.SAFE_WRITE]:
        error_str = 'Invalid safety mode {} over function {}'\
                .format(safety_mode, ff.__name__)
        logger.error(error_str)
        raise IDASyncError(error_str)

    # No safety level is set up:
    # Carries either the result of ff or the exception it raised
    res_container = queue.Queue()

    def runned():
        #logger.debug('Inside runned')

        # Make sure that we are not already inside a sync_wrapper:
        if not call_stack.empty():
            last_func_name = call_stack.get()
            error_str = ('Call stack is not empty while calling the '
                'function {} from {}').format(ff.__name__, last_func_name)
            #logger.error(error_str)
            # NOTE(review): nothing is put into res_container on this path, so
            # the res_container.get() below looks like it would block forever,
            # and the entry popped above is not pushed back - confirm.
            raise IDASyncError(error_str)

        call_stack.put((ff.__name__))
        try:
            res_container.put(ff())
        except Exception as x:
            # Hand the exception over to the waiting caller
            res_container.put(x)
        finally:
            call_stack.get()
            #logger.debug('Finished runned')

    # The return value of execute_sync is not used; the result travels
    # through res_container.
    ret_val = idaapi.execute_sync(runned, safety_mode)
    res = res_container.get()
    if isinstance(res, Exception):
        raise res
    return res
363 |
def idawrite(f):
    """
    Decorator for functions that modify the IDB.
    Each call is routed through sync_wrapper with MFF_WRITE so that it is
    scheduled in the main IDA loop, avoiding IDB corruption.
    """
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        def ff():
            return f(*args, **kwargs)
        # sync_wrapper reports the wrapped function's name in its errors
        ff.__name__ = f.__name__
        return sync_wrapper(ff, idaapi.MFF_WRITE)
    return wrapper
375 |
def idaread(f):
    """
    Decorator for functions that read from the IDB.
    Each call is routed through sync_wrapper with MFF_READ so that it is
    scheduled in the main IDA loop, avoiding inconsistent results.
    MFF_READ constant via: http://www.openrce.org/forums/posts/1827
    """
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        def ff():
            return f(*args, **kwargs)
        # sync_wrapper reports the wrapped function's name in its errors
        ff.__name__ = f.__name__
        return sync_wrapper(ff, idaapi.MFF_READ)
    return wrapper
389 |
def is_window_active():
    """Returns whether IDA is currently active"""
    try:
        from PyQt5.QtWidgets import QApplication
    except ImportError:
        # Without PyQt5 there is no way to inspect the windows
        return False

    qt_app = QApplication.instance()
    if qt_app is None:
        return False

    # Active as soon as any top-level widget is the active window
    return any(window.isActiveWindow() for window in qt_app.topLevelWidgets())
405 |
class Metadata(TypedDict):
    """Result of get_metadata; every field is a string."""
    path: str
    module: str
    base: str
    size: str
    md5: str
    sha256: str
    crc32: str
    filesize: str
415 |
def get_image_size() -> int:
    """Return the image size of the loaded binary.

    The ``SizeOfImage``-style field read from the PE header is used when a PE
    header is available; otherwise the span between the original minimum and
    maximum addresses is returned.
    """
    try:
        # https://www.hex-rays.com/products/ida/support/sdkdoc/structidainfo.html
        inf = idaapi.get_inf_structure() # type: ignore
        lowest_ea, highest_ea = inf.omin_ea, inf.omax_ea
    except AttributeError:
        # Fall back to the ida_ida accessor functions
        import ida_ida
        lowest_ea = ida_ida.inf_get_omin_ea()
        highest_ea = ida_ida.inf_get_omax_ea()

    # Bad heuristic for image size (bad if the relocations are the last section)
    size = highest_ea - lowest_ea

    # Try to extract it from the PE header
    pe_header = idautils.peutils_t().header()
    if pe_header and pe_header[:4] == b"PE\0\0":
        (size,) = struct.unpack("<I", pe_header[0x50:0x54])
    return size
433 |
@jsonrpc
@idaread
def get_metadata() -> Metadata:
    """Get metadata about the current IDB"""
    # Fat Mach-O binaries can return a None hash:
    # https://github.com/mrexodia/ida-pro-mcp/issues/26
    def hex_digest(retrieve):
        # Best effort: a digest that cannot be retrieved becomes "". Only
        # ordinary errors are swallowed, so KeyboardInterrupt/SystemExit
        # still propagate.
        try:
            return retrieve().hex()
        except Exception:
            return ""

    return Metadata(path=idaapi.get_input_file_path(),
                    module=idaapi.get_root_filename(),
                    base=hex(idaapi.get_imagebase()),
                    size=hex(get_image_size()),
                    md5=hex_digest(ida_nalt.retrieve_input_file_md5),
                    sha256=hex_digest(ida_nalt.retrieve_input_file_sha256),
                    crc32=hex(ida_nalt.retrieve_input_file_crc32()),
                    filesize=hex(ida_nalt.retrieve_input_file_size()))
454 |
def get_prototype(fn: ida_funcs.func_t) -> Optional[str]:
    """Return the prototype of ``fn`` as a string, or None when unavailable.

    ``fn.get_prototype()`` is tried first. When that attribute does not exist,
    ``idc.get_type`` is used, and if that fails the type information stored
    for the function start address is used.
    """
    try:
        prototype: ida_typeinf.tinfo_t = fn.get_prototype()
        if prototype is not None:
            return str(prototype)
        else:
            return None
    except AttributeError:
        try:
            return idc.get_type(fn.start_ea)
        except Exception:
            # Only ordinary errors trigger the fallback; KeyboardInterrupt and
            # SystemExit are no longer swallowed here.
            tif = ida_typeinf.tinfo_t()
            if ida_nalt.get_tinfo(tif, fn.start_ea):
                return str(tif)
            return None
    except Exception as e:
        print(f"Error getting function prototype: {e}")
        return None
473 |
class Function(TypedDict):
    """Description of a function; get_function fills address and size with hex strings."""
    address: str
    name: str
    size: str
478 |
479 | def parse_address(address: str | int) -> int:
480 | if isinstance(address, int):
481 | return address
482 | try:
483 | return int(address, 0)
484 | except ValueError:
485 | for ch in address:
486 | if ch not in "0123456789abcdefABCDEF":
487 | raise IDAError(f"Failed to parse address: {address}")
488 | raise IDAError(f"Failed to parse address (missing 0x prefix): {address}")
489 |
@overload
def get_function(address: int, *, raise_error: Literal[True]) -> Function: ...

@overload
def get_function(address: int) -> Function: ...

@overload
def get_function(address: int, *, raise_error: Literal[False]) -> Optional[Function]: ...

def get_function(address, *, raise_error=True):
    """Describe the function that ``idaapi.get_func`` finds for ``address``.

    Returns a Function with the queried address, the function name and the
    function size (end minus start), the numeric fields formatted as hex.
    When no function is found, raises IDAError if ``raise_error`` is true
    and returns None otherwise.
    """
    func = idaapi.get_func(address)
    if func is not None:
        try:
            func_name = func.get_name()
        except AttributeError:
            # func_t objects without get_name(): ask ida_funcs instead
            func_name = ida_funcs.get_func_name(func.start_ea)
        # NOTE(review): the reported address is the queried one, not
        # func.start_ea - confirm that this is intended.
        return Function(address=hex(address), name=func_name, size=hex(func.end_ea - func.start_ea))

    if raise_error:
        raise IDAError(f"No function found at address {hex(address)}")
    return None
512 |
# Maps the demangled name of a function to its address
DEMANGLED_TO_EA = {}

def create_demangled_to_ea_map():
    """Fill DEMANGLED_TO_EA with an entry for every function that demangles."""
    for func_ea in idautils.Functions():
        # Get the function name and demangle it
        # MNG_NODEFINIT inhibits everything except the main name
        # where default demangling adds the function signature
        # and decorators (if any)
        plain_name = idaapi.demangle_name(idc.get_name(func_ea, 0), idaapi.MNG_NODEFINIT)
        if not plain_name:
            continue
        DEMANGLED_TO_EA[plain_name] = func_ea
525 |
def get_type_by_name(type_name: str) -> ida_typeinf.tinfo_t:
    """Return a ``tinfo_t`` for ``type_name``.

    Well-known spellings of the basic C types are mapped directly to the
    matching ``ida_typeinf.BTF_*`` flag. Any other name is looked up as a
    named struct, typedef, enum and union (in that order) and finally handed
    to the ``tinfo_t`` constructor.

    Raises:
        IDAError: If none of the lookups produced a type.
    """
    # (accepted spellings, name of the ida_typeinf flag); checked in order.
    # The flag is resolved by name only once a spelling matches.
    basic_types = (
        # 8-bit integers
        (('int8', '__int8', 'int8_t', 'char', 'signed char'), 'BTF_INT8'),
        (('uint8', '__uint8', 'uint8_t', 'unsigned char', 'byte', 'BYTE'), 'BTF_UINT8'),
        # 16-bit integers
        (('int16', '__int16', 'int16_t', 'short', 'short int', 'signed short', 'signed short int'), 'BTF_INT16'),
        (('uint16', '__uint16', 'uint16_t', 'unsigned short', 'unsigned short int', 'word', 'WORD'), 'BTF_UINT16'),
        # 32-bit integers
        (('int32', '__int32', 'int32_t', 'int', 'signed int', 'long', 'long int', 'signed long', 'signed long int'), 'BTF_INT32'),
        (('uint32', '__uint32', 'uint32_t', 'unsigned int', 'unsigned long', 'unsigned long int', 'dword', 'DWORD'), 'BTF_UINT32'),
        # 64-bit integers
        (('int64', '__int64', 'int64_t', 'long long', 'long long int', 'signed long long', 'signed long long int'), 'BTF_INT64'),
        (('uint64', '__uint64', 'uint64_t', 'unsigned int64', 'unsigned long long', 'unsigned long long int', 'qword', 'QWORD'), 'BTF_UINT64'),
        # 128-bit integers
        (('int128', '__int128', 'int128_t', '__int128_t'), 'BTF_INT128'),
        (('uint128', '__uint128', 'uint128_t', '__uint128_t', 'unsigned int128'), 'BTF_UINT128'),
        # Floating point types
        (('float', ), 'BTF_FLOAT'),
        (('double', ), 'BTF_DOUBLE'),
        (('long double', 'ldouble'), 'BTF_LDOUBLE'),
        # Boolean type
        (('bool', '_Bool', 'boolean'), 'BTF_BOOL'),
        # Void type
        (('void', ), 'BTF_VOID'),
    )
    for spellings, flag_name in basic_types:
        if type_name in spellings:
            return ida_typeinf.tinfo_t(getattr(ida_typeinf, flag_name))

    # If not a standard type, try to get a named type
    tif = ida_typeinf.tinfo_t()
    for kind_name in ('BTF_STRUCT', 'BTF_TYPEDEF', 'BTF_ENUM', 'BTF_UNION'):
        if tif.get_named_type(None, type_name, getattr(ida_typeinf, kind_name)):
            return tif

    # Last attempt: let the tinfo_t constructor interpret the name
    if tif := ida_typeinf.tinfo_t(type_name):
        return tif

    raise IDAError(f"Unable to retrieve {type_name} type info object")
591 |
@jsonrpc
@idaread
def get_function_by_name(
    name: Annotated[str, "Name of the function to get"]
) -> Function:
    """Get a function by its name"""
    ea = idaapi.get_name_ea(idaapi.BADADDR, name)
    if ea != idaapi.BADADDR:
        return get_function(ea)

    # Not a known name: fall back to the demangled-name map,
    # building it first if it is still empty
    if not DEMANGLED_TO_EA:
        create_demangled_to_ea_map()
    if name not in DEMANGLED_TO_EA:
        raise IDAError(f"No function found with name {name}")
    return get_function(DEMANGLED_TO_EA[name])
609 |
@jsonrpc
@idaread
def get_function_by_address(
    address: Annotated[str, "Address of the function to get"],
) -> Function:
    """Get a function by its address"""
    # The address arrives as text; parse it before the lookup
    ea = parse_address(address)
    return get_function(ea)
617 |
@jsonrpc
@idaread
def get_current_address() -> str:
    """Get the address currently selected by the user"""
    screen_ea = idaapi.get_screen_ea()
    # Reported as a hex string
    return hex(screen_ea)
623 |
@jsonrpc
@idaread
def get_current_function() -> Optional[Function]:
    """Get the function currently selected by the user"""
    # Look up the function at the current screen address
    screen_ea = idaapi.get_screen_ea()
    return get_function(screen_ea)
629 |
class ConvertedNumber(TypedDict):
    # Result record returned by convert_number(): one number in several
    # textual representations.
    decimal: str  # str() of the parsed value
    hexadecimal: str  # hex() of the parsed value
    bytes: str  # little-endian bytes as space-separated hex pairs
    ascii: Optional[str]  # printable-ASCII rendering of the bytes, None if any byte is not printable
    binary: str  # bin() of the parsed value
636 |
@jsonrpc
def convert_number(
    text: Annotated[str, "Textual representation of the number to convert"],
    size: Annotated[Optional[int], "Size of the variable in bytes"],
) -> ConvertedNumber:
    """Convert a number (decimal, hexadecimal) to different representations"""
    # Raises IDAError when the text is not a number, when the size is
    # negative, or when the value does not fit in the requested size.
    try:
        # Base 0 lets int() honour 0x / 0o / 0b prefixes
        value = int(text, 0)
    except ValueError:
        raise IDAError(f"Invalid number: {text}")

    if size is not None and size < 0:
        raise IDAError(f"Invalid size: {size}")

    is_negative = value < 0

    # Estimate the size of the number when none (or 0) was given
    if not size:
        if is_negative:
            # Two's complement needs one extra bit for the sign
            bit_count = (-value - 1).bit_length() + 1
        else:
            bit_count = value.bit_length()
        # Round up to whole bytes; zero still occupies one byte
        size = max(1, (bit_count + 7) // 8)

    # Convert the number to bytes. Only negative values use two's complement;
    # non-negative values are encoded unsigned so that e.g. 0xff fits in 1 byte.
    try:
        raw = value.to_bytes(size, "little", signed=is_negative)
    except OverflowError:
        raise IDAError(f"Number {text} is too big for {size} bytes")

    # Convert the bytes to ASCII (trailing NULs ignored); None as soon as a
    # non-printable byte is present
    trimmed = raw.rstrip(b"\x00")
    if all(32 <= byte <= 126 for byte in trimmed):
        ascii_text = trimmed.decode("ascii")
    else:
        ascii_text = None

    return ConvertedNumber(
        decimal=str(value),
        hexadecimal=hex(value),
        bytes=raw.hex(" "),
        ascii=ascii_text,
        binary=bin(value),
    )
680 |
# Element type of a paginated listing
T = TypeVar("T")

class Page(TypedDict, Generic[T]):
    # One slice of a longer listing, as produced by paginate().
    data: list[T]  # the items of this page
    next_offset: Optional[int]  # offset of the following page, None when this page reaches the end
686 |
def paginate(data: list[T], offset: int, count: int) -> Page[T]:
    """Return the slice of `data` starting at `offset` with at most `count` items.

    A `count` of 0 means "everything from `offset` on". `next_offset` is the
    offset just past the slice, or None when that would be at or beyond the
    end of `data`.
    """
    total = len(data)
    page_size = total if count == 0 else count
    end = offset + page_size
    return {
        "data": data[offset:end],
        "next_offset": end if end < total else None,
    }
697 |
def pattern_filter(data: list[T], pattern: str, key: str) -> list[T]:
    """Keep the items of `data` whose `item[key]` string matches `pattern`.

    - An empty pattern returns `data` unchanged (same list object).
    - A pattern written as `/regex/` is compiled as a case-insensitive
      regular expression and searched anywhere in the value.
    - Any other pattern (including a `/.../` that is not a valid regex) is a
      case-insensitive substring match.
    """
    if not pattern:
        return data

    # Local import so this helper does not rely on a module-level `re` import
    import re

    compiled = None
    # Require something between the slashes; "/" and "//" stay plain text
    if len(pattern) > 2 and pattern.startswith("/") and pattern.endswith("/"):
        try:
            compiled = re.compile(pattern[1:-1], re.IGNORECASE)
        except re.error:
            # Not a valid regex: fall back to literal substring matching
            compiled = None

    if compiled is not None:
        return [item for item in data if compiled.search(item[key])]

    needle = pattern.lower()
    return [item for item in data if needle in item[key].lower()]
707 |
@jsonrpc
@idaread
def list_functions_filter(
    offset: Annotated[int, "Offset to start listing from (start at 0)"],
    count: Annotated[int, "Number of functions to list (100 is a good default, 0 means remainder)"],
    filter: Annotated[str, "Filter to apply to the list (required parameter, empty string for no filter). Case-insensitive contains or /regex/ syntax"],
) -> Page[Function]:
    """List matching functions in the database (paginated, filtered)"""
    # Describe every function first, then filter on its name and slice
    every_function = []
    for ea in idautils.Functions():
        every_function.append(get_function(ea))
    matching = pattern_filter(every_function, filter, "name")
    return paginate(matching, offset, count)
719 |
@jsonrpc
@idaread
def list_functions(
    offset: Annotated[int, "Offset to start listing from (start at 0)"],
    count: Annotated[int, "Number of functions to list (100 is a good default, 0 means remainder)"],
) -> Page[Function]:
    """List all functions in the database (paginated)"""
    # An empty filter string means "no filtering"
    no_filter = ""
    return list_functions_filter(offset, count, no_filter)
728 |
class Global(TypedDict):
    # A named location listed by list_globals_filter().
    address: str  # hex() of the address
    name: str  # name attached to that address
732 |
@jsonrpc
@idaread
def list_globals_filter(
    offset: Annotated[int, "Offset to start listing from (start at 0)"],
    count: Annotated[int, "Number of globals to list (100 is a good default, 0 means remainder)"],
    filter: Annotated[str, "Filter to apply to the list (required parameter, empty string for no filter). Case-insensitive contains or /regex/ syntax"],
) -> Page[Global]:
    """List matching globals in the database (paginated, filtered)"""
    found: list[Global] = []
    for ea, name in idautils.Names():
        # Skip entries without a name and names that belong to a function.
        # Both conditions must exclude: a None name would break the
        # name-based filtering below.
        if name is None or idaapi.get_func(ea):
            continue
        found.append(Global(address=hex(ea), name=name))

    matching = pattern_filter(found, filter, "name")
    return paginate(matching, offset, count)
749 |
@jsonrpc
def list_globals(
    offset: Annotated[int, "Offset to start listing from (start at 0)"],
    count: Annotated[int, "Number of globals to list (100 is a good default, 0 means remainder)"],
) -> Page[Global]:
    """List all globals in the database (paginated)"""
    # An empty filter string means "no filtering"
    no_filter = ""
    return list_globals_filter(offset, count, no_filter)
757 |
class Import(TypedDict):
    # One imported symbol as reported by list_imports().
    address: str  # hex() of the import's address
    imported_name: str  # symbol name, or "#<ordinal>" when the import has no name
    module: str  # importing module name, "<unnamed>" when the module has none
762 |
@jsonrpc
@idaread
def list_imports(
    offset: Annotated[int, "Offset to start listing from (start at 0)"],
    count: Annotated[int, "Number of imports to list (100 is a good default, 0 means remainder)"],
) -> Page[Import]:
    """ List all imported symbols with their name and module (paginated) """
    collected = []

    def collector_for(module_name):
        # Build the per-module callback handed to enum_import_names()
        def on_import(ea, symbol_name, ordinal):
            # Imports without a name are identified by their ordinal
            shown_name = symbol_name if symbol_name else f"#{ordinal}"
            collected.append(Import(address=hex(ea), imported_name=shown_name, module=module_name))
            # True keeps the enumeration going
            return True
        return on_import

    for module_index in range(ida_nalt.get_import_module_qty()):
        module_name = ida_nalt.get_import_module_name(module_index) or "<unnamed>"
        ida_nalt.enum_import_names(module_index, collector_for(module_name))

    return paginate(collected, offset, count)
790 |
class String(TypedDict):
    # One string item as reported by list_strings_filter().
    address: str  # hex() of the item's address
    length: int  # length reported by the string item
    string: str  # str() of the string item
795 |
@jsonrpc
@idaread
def list_strings_filter(
    offset: Annotated[int, "Offset to start listing from (start at 0)"],
    count: Annotated[int, "Number of strings to list (100 is a good default, 0 means remainder)"],
    filter: Annotated[str, "Filter to apply to the list (required parameter, empty string for no filter). Case-insensitive contains or /regex/ syntax"],
) -> Page[String]:
    """List matching strings in the database (paginated, filtered)"""
    found: list[String] = []
    for item in idautils.Strings():
        if item is None:
            continue
        try:
            text = str(item)
            if not text:
                continue
            entry = String(address=hex(item.ea), length=item.length, string=text)
        except Exception:
            # Best effort: skip items that cannot be converted.
            # (Exception, not a bare except, so KeyboardInterrupt and
            # SystemExit still propagate.)
            continue
        found.append(entry)

    matching = pattern_filter(found, filter, "string")
    return paginate(matching, offset, count)
818 |
@jsonrpc
def list_strings(
    offset: Annotated[int, "Offset to start listing from (start at 0)"],
    count: Annotated[int, "Number of strings to list (100 is a good default, 0 means remainder)"],
) -> Page[String]:
    """List all strings in the database (paginated)"""
    # An empty filter string means "no filtering"
    no_filter = ""
    return list_strings_filter(offset, count, no_filter)
826 |
@jsonrpc
@idaread
def list_local_types():
    """List all Local types in the database"""
    # Returns a list of text fragments: a "Type #N: name" header per type,
    # followed by its declaration when one can be printed. Ordinals that
    # cannot be retrieved, or that fail while being described, are skipped.
    described = []
    idati = ida_typeinf.get_idati()
    ordinal_limit = ida_typeinf.get_ordinal_limit(idati)
    for ordinal in range(1, ordinal_limit):
        try:
            tif = ida_typeinf.tinfo_t()
            if not tif.get_numbered_type(idati, ordinal):
                # Nothing retrievable at this ordinal
                continue

            type_name = tif.get_type_name() or f"<Anonymous Type #{ordinal}>"
            described.append(f"\nType #{ordinal}: {type_name}")

            if tif.is_udt():
                # Full multi-line definition for structs/unions
                c_decl_flags = (ida_typeinf.PRTYPE_MULTI | ida_typeinf.PRTYPE_TYPE | ida_typeinf.PRTYPE_SEMI | ida_typeinf.PRTYPE_DEF | ida_typeinf.PRTYPE_METHODS | ida_typeinf.PRTYPE_OFFSETS)
                c_decl_output = tif._print(None, c_decl_flags)
                if c_decl_output:
                    described.append(f" C declaration:\n{c_decl_output}")
            else:
                simple_decl = tif._print(None, ida_typeinf.PRTYPE_1LINE | ida_typeinf.PRTYPE_TYPE | ida_typeinf.PRTYPE_SEMI)
                if simple_decl:
                    described.append(f" Simple declaration:\n{simple_decl}")
        except Exception:
            # Best effort: one broken type must not abort the whole listing.
            # (Exception, not a bare except, so KeyboardInterrupt and
            # SystemExit still propagate.)
            continue
    return described
862 |
def decompile_checked(address: int) -> ida_hexrays.cfunc_t:
    """Decompile the function at `address`, raising IDAError on any failure.

    Raises:
        IDAError: if the Hex-Rays plugin cannot be initialised, if the
            failure code is MERR_LICENSE, or if decompilation fails for any
            other reason (the message then carries the failure text and
            address when available).
    """
    if not ida_hexrays.init_hexrays_plugin():
        raise IDAError("Hex-Rays decompiler is not available")

    failure = ida_hexrays.hexrays_failure_t()
    result = ida_hexrays.decompile_func(address, failure, ida_hexrays.DECOMP_WARNINGS)
    if result:
        return result # type: ignore (this is a SWIG issue)

    if failure.code == ida_hexrays.MERR_LICENSE:
        raise IDAError("Decompiler license is not available. Use `disassemble_function` to get the assembly code instead.")

    # Assemble the error text from whatever details the failure object has
    parts = [f"Decompilation failed at {hex(address)}"]
    if failure.str:
        parts.append(f": {failure.str}")
    if failure.errea != idaapi.BADADDR:
        parts.append(f" (address: {hex(failure.errea)})")
    raise IDAError("".join(parts))
879 |
@jsonrpc
@idaread
def decompile_function(
    address: Annotated[str, "Address of the function to decompile"],
) -> str:
    """Decompile a function at the given address"""
    start = parse_address(address)
    cfunc = decompile_checked(start)
    if is_window_active():
        ida_hexrays.open_pseudocode(start, ida_hexrays.OPF_REUSE)

    rendered: list[str] = []
    for index, simple_line in enumerate(cfunc.get_pseudocode()):
        # The first line defaults to the function entry; other lines have no
        # address unless one can be recovered from the line's ctree item
        line_ea = cfunc.entry_ea if index == 0 else None
        item = ida_hexrays.ctree_item_t()
        if cfunc.get_line_item(simple_line.line, 0, False, None, item, None): # type: ignore (IDA SDK type hint wrong)
            described = item.dstr()
            if described:
                # Only the "<hex>: <text>" shape carries an address
                pieces = described.split(": ")
                if len(pieces) == 2:
                    try:
                        line_ea = int(pieces[0], 16)
                    except ValueError:
                        pass

        text = ida_lines.tag_remove(simple_line.line)
        if line_ea:
            rendered.append(f"/* line: {index}, address: {hex(line_ea)} */ {text}")
        else:
            rendered.append(f"/* line: {index} */ {text}")

    return "\n".join(rendered)
914 |
class DisassemblyLine(TypedDict):
    # One item of a disassembled function (see disassemble_function()).
    segment: NotRequired[str]  # segment name, present only when one is found
    address: str  # hex() of the item's address
    label: NotRequired[str]  # name at the address, omitted for the function's own name at its start
    instruction: str  # mnemonic followed by comma-separated operands
    comments: NotRequired[list[str]]  # regular then repeatable comment, when non-empty
921 |
class Argument(TypedDict):
    # One function argument taken from the function's type information.
    name: str  # argument name (disassemble_function() substitutes "arg<i>" for unnamed ones)
    type: str  # str() of the argument's type
925 |
class StackFrameVariable(TypedDict):
    # One stack frame variable; used as the element type of
    # DisassemblyFunction.stack_frame.
    # NOTE(review): the producer of these values is not in this part of the
    # file; the field formats below are all strings but their exact encoding
    # (hex vs. decimal) should be confirmed there.
    name: str
    offset: str
    size: str
    type: str
931 |
class DisassemblyFunction(TypedDict):
    # Result of disassemble_function().
    name: str  # function name, "<unnamed>" when there is none
    start_ea: str  # hex() of the function start address
    return_type: NotRequired[str]  # present only when function type details are available
    arguments: NotRequired[list[Argument]]  # present only when function type details are available
    stack_frame: list[StackFrameVariable]  # stack frame variables of the function
    lines: list[DisassemblyLine]  # one entry per function item
939 |
@jsonrpc
@idaread
def disassemble_function(
    start_address: Annotated[str, "Address of the function to disassemble"],
) -> DisassemblyFunction:
    """Get assembly code for a function (API-compatible with older IDA builds)"""
    start = parse_address(start_address)
    func = idaapi.get_func(start)
    if not func:
        raise IDAError(f"No function found at address {hex(start)}")
    if is_window_active():
        ida_kernwin.jumpto(start)

    func_name: str = ida_funcs.get_func_name(func.start_ea) or "<unnamed>"

    def describe_item(ea):
        # Build the DisassemblyLine for a single function item
        seg = idaapi.getseg(ea)
        segment_name = idaapi.get_segm_name(seg) if seg else None

        # The function's own name at its entry is not reported as a label
        label = idc.get_name(ea, 0)
        if not label or (label == func_name and ea == func.start_ea):
            label = None

        # Regular comment first, repeatable comment second
        comment_texts = [
            text
            for text in (idaapi.get_cmt(ea, repeatable) for repeatable in (False, True))
            if text
        ]

        mnemonic = idc.print_insn_mnem(ea) or ""
        operands = []
        for operand_index in range(8):
            # The first void operand ends the operand list
            if idc.get_operand_type(ea, operand_index) == idaapi.o_void:
                break
            operands.append(idc.print_operand(ea, operand_index) or "")

        entry: DisassemblyLine = {
            "address": hex(ea),
            "instruction": (mnemonic + " " + ", ".join(operands)).rstrip(),
        }
        if segment_name:
            entry["segment"] = segment_name
        if label:
            entry["label"] = label
        if comment_texts:
            entry["comments"] = comment_texts
        return entry

    lines: list[DisassemblyLine] = [
        describe_item(ea)
        for ea in idautils.FuncItems(func.start_ea)
        if ea != idaapi.BADADDR
    ]

    # prototype and args via tinfo (safe across versions)
    return_type = None
    arguments: Optional[list[Argument]] = None
    func_tif = ida_typeinf.tinfo_t()
    if ida_nalt.get_tinfo(func_tif, func.start_ea) and func_tif.is_func():
        details = ida_typeinf.func_type_data_t()
        if func_tif.get_func_details(details):
            return_type = str(details.rettype)
            arguments = []
            for position, arg in enumerate(details):
                # Unnamed arguments get a positional placeholder name
                arguments.append(Argument(name=(arg.name or f"arg{position}"), type=str(arg.type)))

    result: DisassemblyFunction = {
        "name": func_name,
        "start_ea": hex(func.start_ea),
        "stack_frame": get_stack_frame_variables_internal(func.start_ea, False),
        "lines": lines,
    }
    if return_type:
        result["return_type"] = return_type
    if arguments is not None:
        result["arguments"] = arguments
    return result
1017 |
class Xref(TypedDict):
    # One cross reference as reported by get_xrefs_to() / get_xrefs_to_field().
    address: str  # hex() of the referencing address (xref.frm)
    type: str  # "code" or "data"
    function: Optional[Function]  # result of looking up the function at the referencing address, if any
1022 |
@jsonrpc
@idaread
def get_xrefs_to(
    address: Annotated[str, "Address to get cross references to"],
) -> list[Xref]:
    """Get all cross references to the given address"""
    target = parse_address(address)
    # One Xref per reference; the containing function is looked up without
    # raising when there is none
    return [
        Xref(address=hex(ref.frm),
             type="code" if ref.iscode else "data",
             function=get_function(ref.frm, raise_error=False))
        for ref in idautils.XrefsTo(target) # type: ignore (IDA SDK type hints are incorrect)
    ]
1038 |
@jsonrpc
@idaread
def get_xrefs_to_field(
    struct_name: Annotated[str, "Name of the struct (type) containing the field"],
    field_name: Annotated[str, "Name of the field (member) to get xrefs to"],
) -> list[Xref]:
    """Get all cross references to a named struct field (member)"""
    # An unknown struct or field prints a message and yields an empty list;
    # a missing type library or member tid raises IDAError.

    type_library = ida_typeinf.get_idati()
    if not type_library:
        raise IDAError("Failed to retrieve type library.")

    # Resolve the structure type
    struct_tif = ida_typeinf.tinfo_t()
    struct_found = struct_tif.get_named_type(type_library, struct_name, ida_typeinf.BTF_STRUCT, True, False)
    if not struct_found:
        print(f"Structure '{struct_name}' not found.")
        return []

    # Resolve the member index from its fully qualified name
    qualified_name = struct_name + '.' + field_name
    member_index = ida_typeinf.get_udm_by_fullname(None, qualified_name) # type: ignore (IDA SDK type hints are incorrect)
    if member_index == -1:
        print(f"Field '{field_name}' not found in structure '{struct_name}'.")
        return []

    # Cross references are recorded against the member's type identifier
    member_tid = struct_tif.get_udm_tid(member_index)
    if member_tid == ida_idaapi.BADADDR:
        raise IDAError(f"Unable to get tid for structure '{struct_name}' and field '{field_name}'.")

    return [
        Xref(address=hex(ref.frm),
             type="code" if ref.iscode else "data",
             function=get_function(ref.frm, raise_error=False))
        for ref in idautils.XrefsTo(member_tid) # type: ignore (IDA SDK type hints are incorrect)
    ]
1079 |
@jsonrpc
@idaread
def get_callees(
    function_address: Annotated[str, "Address of the function to get callee functions"],
) -> list[dict[str, str]]:
    """Get all the functions called (callees) by the function at function_address"""
    # Each callee is reported once, in the order it is first encountered.
    func_start = parse_address(function_address)
    func = idaapi.get_func(func_start)
    if not func:
        raise IDAError(f"No function found containing address {function_address}")
    func_end = idc.find_func_end(func_start)

    call_itypes = (idaapi.NN_call, idaapi.NN_callfi, idaapi.NN_callni)
    direct_operand_types = (idaapi.o_mem, idaapi.o_near, idaapi.o_far)

    # Keyed by the callee's full content; dict insertion order keeps the
    # result deterministic, unlike de-duplicating through a set
    unique_callees: dict[tuple, dict[str, str]] = {}
    current_ea = func_start
    while current_ea < func_end:
        insn = idaapi.insn_t()
        idaapi.decode_insn(insn, current_ea)
        if insn.itype in call_itypes:
            target = idc.get_operand_value(current_ea, 0)
            # check if it's a direct call - avoid getting the indirect call offset
            if idc.get_operand_type(current_ea, 0) in direct_operand_types:
                # get_function is not used here because the target can be an
                # external function; the target is marked internal/external instead
                func_type = "internal" if idaapi.get_func(target) is not None else "external"
                func_name = idc.get_name(target)
                if func_name is not None:
                    callee = {"address": hex(target), "name": func_name, "type": func_type}
                    unique_callees.setdefault(tuple(callee.items()), callee)
        current_ea = idc.next_head(current_ea, func_end)

    return list(unique_callees.values())
1117 |
@jsonrpc
@idaread
def get_callers(
    function_address: Annotated[str, "Address of the function to get callers"],
) -> list[Function]:
    """Get all callers of the given address"""
    target = parse_address(function_address)
    # Keyed by the caller's address so each calling function appears once
    callers_by_address = {}
    for ref_ea in idautils.CodeRefsTo(target, 0):
        # Only references located inside a function count
        caller = get_function(ref_ea, raise_error=False)
        if not caller:
            continue
        # Decode the referencing instruction and keep it only if it is a call
        insn = idaapi.insn_t()
        idaapi.decode_insn(insn, ref_ea)
        if insn.itype in [idaapi.NN_call, idaapi.NN_callfi, idaapi.NN_callni]:
            callers_by_address[caller["address"]] = caller

    return list(callers_by_address.values())
1140 |
@jsonrpc
@idaread
def get_entry_points() -> list[Function]:
    """Get all entry points in the database"""
    entries = []
    for index in range(ida_entry.get_entry_qty()):
        # Entry index -> ordinal -> address
        entry_ea = ida_entry.get_entry(ida_entry.get_entry_ordinal(index))
        entry_function = get_function(entry_ea, raise_error=False)
        if entry_function is None:
            # Entry points for which no function is returned are left out
            continue
        entries.append(entry_function)
    return entries
1153 |
@jsonrpc
@idawrite
def set_comment(
    address: Annotated[str, "Address in the function to set the comment for"],
    comment: Annotated[str, "Comment text"],
):
    """Set a comment for a given address in the function disassembly and pseudocode"""
    # Flow: the disassembly comment is mandatory (IDAError on failure); the
    # pseudocode comment is best effort and only reports failures via print().
    ea = parse_address(address)

    if not idaapi.set_cmt(ea, comment, False):
        raise IDAError(f"Failed to set disassembly comment at {hex(ea)}")

    # Without the decompiler there is no pseudocode comment to set
    if not ida_hexrays.init_hexrays_plugin():
        return

    # Reference: https://cyber.wtf/2019/03/22/using-ida-python-to-analyze-trickbot/
    # Check if the address corresponds to a line
    try:
        cfunc = decompile_checked(ea)
    except IDAError:
        # Skip decompiler comment if decompilation fails
        return

    # Special case for function entry comments
    if ea == cfunc.entry_ea:
        idc.set_func_cmt(ea, comment, True)
        cfunc.refresh_func_ctext()
        return

    # The address must appear in the decompiler's address map
    eamap = cfunc.get_eamap()
    if ea not in eamap:
        print(f"Failed to set decompiler comment at {hex(ea)}")
        return
    # Use the address of the first item mapped to this ea
    nearest_ea = eamap[ea][0].ea

    # Remove existing orphan comments
    if cfunc.has_orphan_cmts():
        cfunc.del_orphan_cmts()
        cfunc.save_user_cmts()

    # Set the comment by trying all possible item types
    tl = idaapi.treeloc_t()
    tl.ea = nearest_ea
    for itp in range(idaapi.ITP_SEMI, idaapi.ITP_COLON):
        tl.itp = itp
        cfunc.set_user_cmt(tl, comment)
        cfunc.save_user_cmts()
        cfunc.refresh_func_ctext()
        # No orphan comments after the refresh: this placement is accepted
        if not cfunc.has_orphan_cmts():
            return
        # Otherwise undo this attempt before trying the next item type
        cfunc.del_orphan_cmts()
        cfunc.save_user_cmts()
    print(f"Failed to set decompiler comment at {hex(ea)}")
1207 |
def refresh_decompiler_widget():
    """Refresh the pseudocode text of the current widget, if it has a decompiler view."""
    current = ida_kernwin.get_current_widget()
    if current is None:
        return
    view = ida_hexrays.get_widget_vdui(current)
    if view is None:
        return
    view.refresh_ctext()
1214 |
def refresh_decompiler_ctext(function_address: int):
    """Decompile the function at `function_address` and refresh its pseudocode text.

    Does nothing when decompilation yields no result.
    """
    failure = ida_hexrays.hexrays_failure_t()
    decompiled = ida_hexrays.decompile_func(function_address, failure, ida_hexrays.DECOMP_WARNINGS)
    if not decompiled:
        return
    decompiled.refresh_func_ctext()
1220 |
@jsonrpc
@idawrite
def rename_local_variable(
    function_address: Annotated[str, "Address of the function containing the variable"],
    old_name: Annotated[str, "Current name of the variable"],
    new_name: Annotated[str, "New name for the variable (empty for a default name)"],
):
    """Rename a local variable in a function"""
    func = idaapi.get_func(parse_address(function_address))
    if not func:
        raise IDAError(f"No function found at address {function_address}")

    # The rename is addressed by the function's start
    owner_ea = func.start_ea
    renamed = ida_hexrays.rename_lvar(owner_ea, old_name, new_name)
    if not renamed:
        raise IDAError(f"Failed to rename local variable {old_name} in function {hex(func.start_ea)}")

    # Make the pseudocode reflect the new name
    refresh_decompiler_ctext(owner_ea)
1235 |
@jsonrpc
@idawrite
def rename_global_variable(
    old_name: Annotated[str, "Current name of the global variable"],
    new_name: Annotated[str, "New name for the global variable (empty for a default name)"],
):
    """Rename a global variable"""
    ea = idaapi.get_name_ea(idaapi.BADADDR, old_name)
    # Report an unknown name as such instead of attempting to rename BADADDR
    if ea == idaapi.BADADDR:
        raise IDAError(f"Global variable {old_name} not found")
    if not idaapi.set_name(ea, new_name):
        raise IDAError(f"Failed to rename global variable {old_name} to {new_name}")
    refresh_decompiler_ctext(ea)
1247 |
@jsonrpc
@idawrite
def set_global_variable_type(
    variable_name: Annotated[str, "Name of the global variable"],
    new_type: Annotated[str, "New type for the variable"],
):
    """Set a global variable's type"""
    ea = idaapi.get_name_ea(idaapi.BADADDR, variable_name)
    # Report an unknown name as such instead of applying a type to BADADDR
    if ea == idaapi.BADADDR:
        raise IDAError(f"Global variable {variable_name} not found")
    tif = get_type_by_name(new_type)
    if not tif:
        raise IDAError("Parsed declaration is not a variable type")
    if not ida_typeinf.apply_tinfo(ea, tif, ida_typeinf.PT_SIL):
        raise IDAError("Failed to apply type")
1261 |
def patch_address_assemble(
    ea: int,
    assemble: str,
) -> int:
    """Assemble one instruction and patch its bytes at `ea`.

    Returns:
        The number of bytes that were patched.

    Raises:
        IDAError: if the instruction cannot be assembled or the bytes cannot
            be patched.
    """
    assembled_ok, bytes_to_patch = idautils.Assemble(ea, assemble)
    if not assembled_ok:
        raise IDAError(f"Failed to assemble instruction: {assemble}")
    try:
        ida_bytes.patch_bytes(ea, bytes_to_patch)
    except Exception as e:
        # Exception (not a bare except) so KeyboardInterrupt/SystemExit
        # propagate; the original cause is kept on the raised error
        raise IDAError(f"Failed to patch bytes at address {hex(ea)}") from e

    return len(bytes_to_patch)
1276 |
@jsonrpc
@idawrite
def patch_address_assembles(
    address: Annotated[str, "Starting Address to apply patch"],
    instructions: Annotated[str, "Assembly instructions separated by ';'"],
) -> str:
    """Patch consecutive assembly instructions starting at the given address"""
    ea = parse_address(address)

    # Ignore empty segments (e.g. a trailing ';') so they are neither
    # assembled nor counted
    statements = [part.strip() for part in instructions.split(";")]
    statements = [statement for statement in statements if statement]
    if not statements:
        raise IDAError("No instructions to assemble")

    for statement in statements:
        try:
            patched_length = patch_address_assemble(ea, statement)
        except IDAError as e:
            raise IDAError(f"Failed to patch bytes at address {hex(ea)}: {e}")
        # The next instruction goes right after the bytes just written
        ea += patched_length
    return f"Patched {len(statements)} instructions"
1293 |
@jsonrpc
@idaread
def get_global_variable_value_by_name(variable_name: Annotated[str, "Name of the global variable"]) -> str:
    """
    Read a global variable's value (if known at compile-time)

    Prefer this function over the `data_read_*` functions.
    """
    ea = idaapi.get_name_ea(idaapi.BADADDR, variable_name)
    if ea != idaapi.BADADDR:
        return get_global_variable_value_internal(ea)

    # The name does not resolve to an address
    raise IDAError(f"Global variable {variable_name} not found")
1307 |
@jsonrpc
@idaread
def get_global_variable_value_at_address(address: Annotated[str, "Address of the global variable"]) -> str:
    """
    Read a global variable's value by its address (if known at compile-time)

    Prefer this function over the `data_read_*` functions.
    """
    # Parse the textual address, then read the value stored there
    return get_global_variable_value_internal(parse_address(address))
1318 |
def get_global_variable_value_internal(ea: int) -> str:
    """Read the value stored at `ea` and render it as text.

    The size comes from the variable's type information, or from the item
    size when there is no type information but the address has a name.
    Sizes 1/2/4/8 are rendered as one hex number, a zero-sized char array as
    a quoted string, and anything else as space-separated hex bytes.

    Raises:
        IDAError: if no size can be determined or the data cannot be read.
    """
    # Get the type information for the variable
    tif = ida_typeinf.tinfo_t()
    if not ida_nalt.get_tinfo(tif, ea):
        # No type info, maybe we can figure out its size by its name.
        # has_any_name() inspects flags, so fetch the flags of the address
        # instead of passing the address itself.
        if not ida_bytes.has_any_name(ida_bytes.get_flags(ea)):
            raise IDAError(f"Failed to get type information for variable at {ea:#x}")

        size = ida_bytes.get_item_size(ea)
        if size == 0:
            raise IDAError(f"Failed to get type information for variable at {ea:#x}")
    else:
        # Determine the size of the variable
        size = tif.get_size()

    # Read the value based on the size
    if size == 0 and tif.is_array() and tif.get_array_element().is_decl_char():
        raw_string = idaapi.get_strlit_contents(ea, -1, 0)
        if raw_string is None:
            raise IDAError(f"Failed to read string literal at {ea:#x}")
        # Undecodable bytes are replaced rather than aborting the read
        return_string = raw_string.decode("utf-8", errors="replace").strip()
        return f"\"{return_string}\""
    elif size == 1:
        return hex(ida_bytes.get_byte(ea))
    elif size == 2:
        return hex(ida_bytes.get_word(ea))
    elif size == 4:
        return hex(ida_bytes.get_dword(ea))
    elif size == 8:
        return hex(ida_bytes.get_qword(ea))
    else:
        # For other sizes, return the raw bytes
        raw_bytes = ida_bytes.get_bytes(ea, size)
        if raw_bytes is None:
            raise IDAError(f"Failed to read {size} bytes at {ea:#x}")
        return ' '.join(hex(x) for x in raw_bytes)
1349 |
@jsonrpc
@idawrite
def rename_function(
    function_address: Annotated[str, "Address of the function to rename"],
    new_name: Annotated[str, "New name for the function (empty for a default name)"],
):
    """Rename a function"""
    func = idaapi.get_func(parse_address(function_address))
    if not func:
        raise IDAError(f"No function found at address {function_address}")

    # The name is attached to the function's start address
    entry_ea = func.start_ea
    renamed = idaapi.set_name(entry_ea, new_name)
    if not renamed:
        raise IDAError(f"Failed to rename function {hex(func.start_ea)} to {new_name}")

    # Make the pseudocode reflect the new name
    refresh_decompiler_ctext(entry_ea)
1363 |
@jsonrpc
@idawrite
def set_function_prototype(
    function_address: Annotated[str, "Address of the function"],
    prototype: Annotated[str, "New function prototype"],
):
    """Set a function's prototype"""
    func = idaapi.get_func(parse_address(function_address))
    if not func:
        raise IDAError(f"No function found at address {function_address}")
    try:
        tif = ida_typeinf.tinfo_t(prototype, None, ida_typeinf.PT_SIL)
        if not tif.is_func():
            raise IDAError("Parsed declaration is not a function type")
        if not ida_typeinf.apply_tinfo(func.start_ea, tif, ida_typeinf.PT_SIL):
            raise IDAError("Failed to apply type")
        refresh_decompiler_ctext(func.start_ea)
    except IDAError:
        # Keep the specific reason raised above instead of relabelling it
        # as a parse failure
        raise
    except Exception as e:
        raise IDAError(f"Failed to parse prototype string: {prototype}") from e
1383 |
class my_modifier_t(ida_hexrays.user_lvar_modifier_t):
    """Local-variable modifier that assigns a new type to one variable, selected by name."""

    def __init__(self, var_name: str, new_type: ida_typeinf.tinfo_t):
        super().__init__()
        self.var_name = var_name
        self.new_type = new_type

    def modify_lvars(self, lvinf):
        """Set the type on the first saved variable named `var_name`.

        Returns True when a variable was changed, False when none matched.
        """
        for saved in lvinf.lvvec:
            if saved.name != self.var_name:
                continue
            saved.type = self.new_type
            return True
        return False
1397 |
# NOTE: This is extremely hacky, but necessary to get errors out of IDA
def parse_decls_ctypes(decls: str, hti_flags: int) -> tuple[int, list[str]]:
    # Parse C declarations and return (error count, captured messages).
    # On Windows the native parse_decls export of the "ida" library is called
    # through ctypes with a printer callback so its messages can be collected;
    # elsewhere ida_typeinf.parse_decls is used and no messages are returned.
    if sys.platform == "win32":
        import ctypes

        assert isinstance(decls, str), "decls must be a string"
        assert isinstance(hti_flags, int), "hti_flags must be an int"
        c_decls = decls.encode("utf-8")
        # None is passed as the type library argument
        c_til = None
        ida_dll = ctypes.CDLL("ida")
        # Argument order: til, declarations, printer callback, flags
        ida_dll.parse_decls.argtypes = [
            ctypes.c_void_p,
            ctypes.c_char_p,
            ctypes.c_void_p,
            ctypes.c_int,
        ]
        ida_dll.parse_decls.restype = ctypes.c_int

        messages: list[str] = []

        @ctypes.CFUNCTYPE(ctypes.c_int, ctypes.c_char_p, ctypes.c_char_p)
        def magic_printer(fmt: bytes, arg1: bytes):
            # Only a format with exactly one "%s" placeholder is expanded;
            # anything else is recorded as unsupported
            if fmt.count(b"%") == 1 and b"%s" in fmt:
                formatted = fmt.replace(b"%s", arg1)
                messages.append(formatted.decode("utf-8"))
                return len(formatted) + 1
            else:
                messages.append(f"unsupported magic_printer fmt: {repr(fmt)}")
                return 0

        errors = ida_dll.parse_decls(c_til, c_decls, magic_printer, hti_flags)
    else:
        # NOTE: The approach above could also work on other platforms, but it's
        # not been tested and there are differences in the vararg ABIs.
        errors = ida_typeinf.parse_decls(None, decls, False, hti_flags)
        messages = []
    return errors, messages
1435 |
@jsonrpc
@idawrite
def declare_c_type(
    c_declaration: Annotated[str, "C declaration of the type. Examples include: typedef int foo_t; struct bar { int a; bool b; };"],
):
    """Create or update a local type from a C declaration"""
    # Flags handed to the parser:
    #   PT_SIL:   Suppress warning dialogs (although it seems unnecessary here)
    #   PT_EMPTY: Allow empty types (also unnecessary?)
    #   PT_TYP:   Print back status messages with struct tags
    parse_flags = ida_typeinf.PT_SIL
    parse_flags |= ida_typeinf.PT_EMPTY
    parse_flags |= ida_typeinf.PT_TYP
    error_count, parser_output = parse_decls_ctypes(c_declaration, parse_flags)

    # The collected parser output is included in both the success and error text.
    report = "\n".join(parser_output)
    if error_count <= 0:
        return f"success\n\nInfo:\n{report}"
    raise IDAError(f"Failed to parse type:\n{c_declaration}\n\nErrors:\n{report}")
1452 |
@jsonrpc
@idawrite
def set_local_variable_type(
    function_address: Annotated[str, "Address of the decompiled function containing the variable"],
    variable_name: Annotated[str, "Name of the variable"],
    new_type: Annotated[str, "New type for the variable"],
):
    """Set a local variable's type"""
    # Raises IDAError when the type cannot be parsed, the function or the
    # variable cannot be found, or the saved lvar info cannot be modified.
    try:
        # Some versions of IDA don't support this constructor
        new_tif = ida_typeinf.tinfo_t(new_type, None, ida_typeinf.PT_SIL)
    except Exception:
        try:
            new_tif = ida_typeinf.tinfo_t()
            # parse_decl requires semicolon for the type
            parsed = ida_typeinf.parse_decl(new_tif, None, new_type + ";", ida_typeinf.PT_SIL)  # type: ignore (IDA SDK type hints are incorrect)
        except Exception:
            raise IDAError(f"Failed to parse type: {new_type}")
        # parse_decl is expected to report a bad declaration through its
        # return value rather than by raising; without this check a failed
        # parse would go on to apply an empty type to the variable.
        if parsed is None or new_tif.empty():
            raise IDAError(f"Failed to parse type: {new_type}")
    func = idaapi.get_func(parse_address(function_address))
    if not func:
        raise IDAError(f"No function found at address {function_address}")
    # Renaming the variable to its own name is used as an existence check
    if not ida_hexrays.rename_lvar(func.start_ea, variable_name, variable_name):
        raise IDAError(f"Failed to find local variable: {variable_name}")
    modifier = my_modifier_t(variable_name, new_tif)
    if not ida_hexrays.modify_user_lvars(func.start_ea, modifier):
        raise IDAError(f"Failed to modify local variable: {variable_name}")
    refresh_decompiler_ctext(func.start_ea)
1480 |
@jsonrpc
@idaread
def get_stack_frame_variables(
    function_address: Annotated[str, "Address of the disassembled function to retrieve the stack frame variables"]
) -> list[StackFrameVariable]:
    """ Retrieve the stack frame variables for a given function """
    # Thin wrapper: parse the address, then delegate with raise_error enabled.
    function_ea = parse_address(function_address)
    return get_stack_frame_variables_internal(function_ea, True)
1488 |
def get_stack_frame_variables_internal(function_address: int, raise_error: bool) -> list[StackFrameVariable]:
    """Collect the non-gap members of a function's frame type.

    Returns an empty list on IDA versions below 9, when the frame type cannot
    be loaded, or when no function exists and ``raise_error`` is False.
    Raises IDAError for a missing function when ``raise_error`` is True.
    """
    # TODO: IDA 8.3 does not support tif.get_type_by_tid
    if ida_major < 9:
        return []

    func = idaapi.get_func(function_address)
    if not func:
        if not raise_error:
            return []
        raise IDAError(f"No function found at address {function_address}")

    frame_tif = ida_typeinf.tinfo_t()
    if not (frame_tif.get_type_by_tid(func.frame) and frame_tif.is_udt()):
        return []

    frame_udt = ida_typeinf.udt_type_data_t()
    frame_tif.get_udt_details(frame_udt)
    # Offsets and sizes are divided by 8 before being rendered as hex strings.
    return [
        StackFrameVariable(
            name=udm.name,
            offset=hex(udm.offset // 8),
            size=hex(udm.size // 8),
            type=str(udm.type),
        )
        for udm in frame_udt
        if not udm.is_gap()
    ]
1520 |
class StructureMember(TypedDict):
    """One member of a structure as reported by get_defined_structures."""
    # Member name
    name: str
    # Member offset, rendered with hex()
    offset: str
    # Member size, rendered with hex()
    size: str
    # Member type converted with str()
    type: str
1526 |
class StructureDefinition(TypedDict):
    """A structure and its members as reported by get_defined_structures."""
    # Type name of the structure
    name: str
    # Total size, rendered with hex()
    size: str
    # Members in the order they were enumerated
    members: list[StructureMember]
1531 |
@jsonrpc
@idaread
def get_defined_structures() -> list[StructureDefinition]:
    """ Returns a list of all defined structures """

    # Walk every type ordinal below the limit and keep the ones that are UDTs.
    structures = []
    for ordinal in range(1, ida_typeinf.get_ordinal_limit()):
        tif = ida_typeinf.tinfo_t()
        tif.get_numbered_type(None, ordinal)
        if not tif.is_udt():
            continue

        udt = ida_typeinf.udt_type_data_t()
        members = []
        if tif.get_udt_details(udt):
            for field in udt:
                members.append(StructureMember(name=field.name,
                                               offset=hex(field.offset // 8),
                                               size=hex(field.size // 8),
                                               type=str(field.type)))

        structures.append(StructureDefinition(name=tif.get_type_name(),  # type: ignore (IDA SDK type hints are incorrect)
                                              size=hex(tif.get_size()),
                                              members=members))

    return structures
1559 |
@jsonrpc
@idaread
def analyze_struct_detailed(name: Annotated[str, "Name of the structure to analyze"]) -> dict:
    """Detailed analysis of a structure with all fields"""
    # Look the type up by name; anything that cannot be resolved is an error.
    tif = ida_typeinf.tinfo_t()
    if not tif.get_named_type(None, name):
        raise IDAError(f"Structure '{name}' not found!")

    result = {
        "name": name,
        "type": str(tif._print()),
        "size": tif.get_size(),
        "is_udt": tif.is_udt()
    }

    # Non-UDT types and unreadable UDTs are reported through an "error" key
    # in the result instead of an exception.
    if not tif.is_udt():
        result["error"] = "This is not a user-defined type!"
        return result

    udt_data = ida_typeinf.udt_type_data_t()
    if not tif.get_udt_details(udt_data):
        result["error"] = "Failed to get structure details!"
        return result

    result["member_count"] = udt_data.size()
    result["is_union"] = udt_data.is_union
    result["udt_type"] = "Union" if udt_data.is_union else "Struct"

    field_infos = []
    for index, field in enumerate(udt_data):
        byte_offset = field.begin() // 8  # Convert bits to bytes
        # Use the member's own size when positive, else the size of its type
        if field.size > 0:
            byte_size = field.size // 8
        else:
            byte_size = field.type.get_size()

        field_info = {
            "index": index,
            "offset": f"0x{byte_offset:08X}",
            "size": byte_size,
            "type": field.type._print(),
            "name": field.name,
            "is_nested_udt": field.type.is_udt()
        }

        # Nested structures additionally report the size of the nested type
        if field.type.is_udt():
            field_info["nested_size"] = field.type.get_size()

        field_infos.append(field_info)

    result["members"] = field_infos
    result["total_size"] = tif.get_size()

    return result
1617 |
@jsonrpc
@idaread
def get_struct_at_address(address: Annotated[str, "Address to analyze structure at"],
                          struct_name: Annotated[str, "Name of the structure"]) -> dict:
    """Get structure field values at a specific address"""
    # Returns a dict with "struct_name", "address" and "members"; each member
    # carries its offset, type, name and a formatted value string.
    # Raises IDAError when the structure is unknown or has no UDT details.
    addr = parse_address(address)

    # Get structure tinfo
    tif = ida_typeinf.tinfo_t()
    if not tif.get_named_type(None, struct_name):
        raise IDAError(f"Structure '{struct_name}' not found!")

    # Get structure details
    udt_data = ida_typeinf.udt_type_data_t()
    if not tif.get_udt_details(udt_data):
        raise IDAError("Failed to get structure details!")

    result = {
        "struct_name": struct_name,
        "address": f"0x{addr:X}",
        "members": []
    }

    for member in udt_data:
        offset = member.begin() // 8
        member_addr = addr + offset
        member_type = member.type._print()
        member_name = member.name
        member_size = member.type.get_size()

        # Try to get value based on size. Only Exception subclasses are
        # turned into "<failed to read>"; KeyboardInterrupt/SystemExit are
        # allowed to propagate instead of being swallowed.
        try:
            if member.type.is_ptr():
                # Pointer: width follows the database bitness, not member_size
                is_64bit = ida_ida.inf_is_64bit() if ida_major >= 9 else idaapi.get_inf_structure().is_64bit()
                if is_64bit:
                    value = idaapi.get_qword(member_addr)
                    value_str = f"0x{value:016X}"
                else:
                    value = idaapi.get_dword(member_addr)
                    value_str = f"0x{value:08X}"
            elif member_size == 1:
                value = idaapi.get_byte(member_addr)
                value_str = f"0x{value:02X} ({value})"
            elif member_size == 2:
                value = idaapi.get_word(member_addr)
                value_str = f"0x{value:04X} ({value})"
            elif member_size == 4:
                value = idaapi.get_dword(member_addr)
                value_str = f"0x{value:08X} ({value})"
            elif member_size == 8:
                value = idaapi.get_qword(member_addr)
                value_str = f"0x{value:016X} ({value})"
            else:
                # For large structures, read first few bytes
                bytes_data = []
                for i in range(min(member_size, 16)):
                    try:
                        byte_val = idaapi.get_byte(member_addr + i)
                        bytes_data.append(f"{byte_val:02X}")
                    except Exception:
                        # Stop at the first unreadable byte, keep what was read
                        break
                value_str = f"[{' '.join(bytes_data)}{'...' if member_size > 16 else ''}]"
        except Exception:
            value_str = "<failed to read>"

        member_info = {
            "offset": f"0x{offset:08X}",
            "type": member_type,
            "name": member_name,
            "value": value_str
        }

        result["members"].append(member_info)

    return result
1694 |
@jsonrpc
@idaread
def get_struct_info_simple(name: Annotated[str, "Name of the structure"]) -> dict:
    """Simple function to get basic structure information"""
    tif = ida_typeinf.tinfo_t()
    if not tif.get_named_type(None, name):
        raise IDAError(f"Structure '{name}' not found!")

    info = {
        'name': name,
        'type': tif._print(),
        'size': tif.get_size(),
        'is_udt': tif.is_udt()
    }

    # Member details are only added for UDTs whose details can be read.
    if not tif.is_udt():
        return info

    udt_data = ida_typeinf.udt_type_data_t()
    if not tif.get_udt_details(udt_data):
        return info

    info['member_count'] = udt_data.size()
    info['is_union'] = udt_data.is_union
    info['members'] = [
        {
            'name': field.name,
            'type': field.type._print(),
            'offset': field.begin() // 8,
            'size': field.type.get_size()
        }
        for field in udt_data
    ]
    return info
1727 |
@jsonrpc
@idaread
def search_structures(filter: Annotated[str, "Filter pattern to search for structures (case-insensitive)"]) -> list[dict]:
    """Search for structures by name pattern"""
    # Returns one dict per UDT whose type name contains the filter text,
    # with its name, size, member count, union flag and ordinal.
    results = []
    limit = ida_typeinf.get_ordinal_limit()
    # Lower-case the pattern once instead of on every iteration
    needle = filter.lower()

    for ordinal in range(1, limit):
        tif = ida_typeinf.tinfo_t()
        if not tif.get_numbered_type(None, ordinal):
            continue
        type_name: str = tif.get_type_name()  # type: ignore (IDA SDK type hints are incorrect)
        if not type_name or needle not in type_name.lower():
            continue
        if not tif.is_udt():
            continue

        # Fetch the UDT details a single time and derive both fields from
        # that one result; on failure fall back to 0 members / not a union.
        udt_data = ida_typeinf.udt_type_data_t()
        has_details = tif.get_udt_details(udt_data)

        results.append({
            "name": type_name,
            "size": tif.get_size(),
            "member_count": udt_data.size() if has_details else 0,
            "is_union": udt_data.is_union if has_details else False,
            "ordinal": ordinal
        })

    return results
1755 |
@jsonrpc
@idawrite
def rename_stack_frame_variable(
    function_address: Annotated[str, "Address of the disassembled function to set the stack frame variables"],
    old_name: Annotated[str, "Current name of the variable"],
    new_name: Annotated[str, "New name for the variable (empty for a default name)"]
):
    """ Change the name of a stack variable for an IDA function """
    func = idaapi.get_func(parse_address(function_address))
    if not func:
        raise IDAError(f"No function found at address {function_address}")

    frame_tif = ida_typeinf.tinfo_t()
    if not ida_frame.get_func_frame(frame_tif, func):
        raise IDAError("No frame returned.")

    # Locate the member by its current name
    member_index, found = frame_tif.get_udm(old_name)  # type: ignore (IDA SDK type hints are incorrect)
    if not found:
        raise IDAError(f"{old_name} not found.")

    # Special frame members and argument members are refused
    member_tid = frame_tif.get_udm_tid(member_index)
    if ida_frame.is_special_frame_member(member_tid):
        raise IDAError(f"{old_name} is a special frame member. Will not change the name.")

    member = ida_typeinf.udm_t()
    frame_tif.get_udm_by_tid(member, member_tid)
    frame_offset = member.offset // 8
    if ida_frame.is_funcarg_off(func, frame_offset):
        raise IDAError(f"{old_name} is an argument member. Will not change the name.")

    # Redefine the variable at the same location with the new name and its existing type
    fp_offset = ida_frame.soff_to_fpoff(func, frame_offset)
    if not ida_frame.define_stkvar(func, new_name, fp_offset, member.type):
        raise IDAError("failed to rename stack frame variable")
1789 |
@jsonrpc
@idawrite
def create_stack_frame_variable(
    function_address: Annotated[str, "Address of the disassembled function to set the stack frame variables"],
    offset: Annotated[str, "Offset of the stack frame variable"],
    variable_name: Annotated[str, "Name of the stack variable"],
    type_name: Annotated[str, "Type of the stack variable"]
):
    """ For a given function, create a stack variable at an offset and with a specific type """

    func = idaapi.get_func(parse_address(function_address))
    if not func:
        raise IDAError(f"No function found at address {function_address}")

    # The offset string goes through the same parser as addresses
    stack_offset = parse_address(offset)

    # The frame is fetched only to verify that the function has one
    frame_tif = ida_typeinf.tinfo_t()
    if not ida_frame.get_func_frame(frame_tif, func):
        raise IDAError("No frame returned.")

    variable_tif = get_type_by_name(type_name)
    defined = ida_frame.define_stkvar(func, variable_name, stack_offset, variable_tif)
    if not defined:
        raise IDAError("failed to define stack frame variable")
1813 |
@jsonrpc
@idawrite
def set_stack_frame_variable_type(
    function_address: Annotated[str, "Address of the disassembled function to set the stack frame variables"],
    variable_name: Annotated[str, "Name of the stack variable"],
    type_name: Annotated[str, "Type of the stack variable"]
):
    """ For a given disassembled function, set the type of a stack variable """

    func = idaapi.get_func(parse_address(function_address))
    if not func:
        raise IDAError(f"No function found at address {function_address}")

    frame_tif = ida_typeinf.tinfo_t()
    if not ida_frame.get_func_frame(frame_tif, func):
        raise IDAError("No frame returned.")

    # Locate the member by name, then re-read it through its tid
    member_index, found = frame_tif.get_udm(variable_name)  # type: ignore (IDA SDK type hints are incorrect)
    if not found:
        raise IDAError(f"{variable_name} not found.")

    member_tid = frame_tif.get_udm_tid(member_index)
    member = ida_typeinf.udm_t()
    frame_tif.get_udm_by_tid(member, member_tid)
    frame_offset = member.offset // 8

    replacement_tif = get_type_by_name(type_name)
    applied = ida_frame.set_frame_member_type(func, frame_offset, replacement_tif)
    if not applied:
        raise IDAError("failed to set stack frame variable type")
1843 |
@jsonrpc
@idawrite
def delete_stack_frame_variable(
    function_address: Annotated[str, "Address of the function to set the stack frame variables"],
    variable_name: Annotated[str, "Name of the stack variable"]
):
    """ Delete the named stack variable for a given function """

    func = idaapi.get_func(parse_address(function_address))
    if not func:
        raise IDAError(f"No function found at address {function_address}")

    frame_tif = ida_typeinf.tinfo_t()
    if not ida_frame.get_func_frame(frame_tif, func):
        raise IDAError("No frame returned.")

    member_index, found = frame_tif.get_udm(variable_name)  # type: ignore (IDA SDK type hints are incorrect)
    if not found:
        raise IDAError(f"{variable_name} not found.")

    # Special frame members and argument members are never deleted
    member_tid = frame_tif.get_udm_tid(member_index)
    if ida_frame.is_special_frame_member(member_tid):
        raise IDAError(f"{variable_name} is a special frame member. Will not delete.")

    member = ida_typeinf.udm_t()
    frame_tif.get_udm_by_tid(member, member_tid)
    start = member.offset // 8
    length = member.size // 8
    if ida_frame.is_funcarg_off(func, start):
        raise IDAError(f"{variable_name} is an argument member. Will not delete.")

    # Remove the range [start, start + length) from the frame
    removed = ida_frame.delete_frame_members(func, start, start + length)
    if not removed:
        raise IDAError("failed to delete stack frame variable")
1877 |
@jsonrpc
@idaread
def read_memory_bytes(
    memory_address: Annotated[str, "Address of the memory value to be read"],
    size: Annotated[int, "size of memory to read"]
) -> str:
    """
    Read bytes at a given address.

    Only use this function if `get_global_variable_at` and `get_global_variable_by_name`
    both failed.
    """
    # Returns the bytes as space-separated hex values, e.g. "0x05 0xff".
    # Raises IDAError when the bytes cannot be read.
    ea = parse_address(memory_address)
    data = ida_bytes.get_bytes(ea, size)
    if data is None:
        # Fail with a clear error instead of a TypeError from iterating None
        raise IDAError(f"Failed to read {size} bytes at address {hex(ea)}")
    # With '#', the "0x" prefix counts towards the field width, so a width
    # of 4 is needed to zero-pad every byte to two hex digits.
    return ' '.join(f'{x:#04x}' for x in data)
1891 |
@jsonrpc
@idaread
def data_read_byte(
    address: Annotated[str, "Address to get 1 byte value from"],
) -> int:
    """
    Read the 1 byte value at the specified address.

    Only use this function if `get_global_variable_at` failed.
    """
    # Parse the address and read through ida_bytes.get_wide_byte
    return ida_bytes.get_wide_byte(parse_address(address))
1904 |
@jsonrpc
@idaread
def data_read_word(
    address: Annotated[str, "Address to get 2 bytes value from"],
) -> int:
    """
    Read the 2 byte value at the specified address as a WORD.

    Only use this function if `get_global_variable_at` failed.
    """
    # Parse the address and read through ida_bytes.get_wide_word
    return ida_bytes.get_wide_word(parse_address(address))
1917 |
@jsonrpc
@idaread
def data_read_dword(
    address: Annotated[str, "Address to get 4 bytes value from"],
) -> int:
    """
    Read the 4 byte value at the specified address as a DWORD.

    Only use this function if `get_global_variable_at` failed.
    """
    # Parse the address and read through ida_bytes.get_wide_dword
    return ida_bytes.get_wide_dword(parse_address(address))
1930 |
@jsonrpc
@idaread
def data_read_qword(
    address: Annotated[str, "Address to get 8 bytes value from"]
) -> int:
    """
    Read the 8 byte value at the specified address as a QWORD.

    Only use this function if `get_global_variable_at` failed.
    """
    # Parse the address and read through ida_bytes.get_qword
    return ida_bytes.get_qword(parse_address(address))
1943 |
@jsonrpc
@idaread
def data_read_string(
    address: Annotated[str, "Address to get string from"]
) -> str:
    """
    Read the string at the specified address.

    Only use this function if `get_global_variable_at` failed.
    """
    # Any failure (parsing, reading or decoding) is reported as an
    # "Error:..." string instead of being raised.
    try:
        raw = idaapi.get_strlit_contents(parse_address(address), -1, 0)
        return raw.decode("utf-8")
    except Exception as exc:
        return "Error:" + str(exc)
1958 |
class RegisterValue(TypedDict):
    """A single register entry as produced by dbg_get_registers."""
    # Register name taken from the debugger's register description
    name: str
    # Register value rendered as a string
    value: str
1962 |
class ThreadRegisters(TypedDict):
    """All register values of one thread as produced by dbg_get_registers."""
    # Thread id as returned by ida_dbg.getn_thread
    thread_id: int
    # One entry per register value of that thread
    registers: list[RegisterValue]
1966 |
def dbg_ensure_running() -> "ida_idd.debugger_t":
    """Return the active debugger object or raise IDAError.

    The debugger counts as running only when ``ida_idd.get_dbg()`` yields an
    object and ``ida_dbg.get_ip_val()`` returns a value.
    """
    debugger = ida_idd.get_dbg()
    if debugger and ida_dbg.get_ip_val() is not None:
        return debugger
    raise IDAError("Debugger not running")
1974 |
@jsonrpc
@idaread
@unsafe
def dbg_get_registers() -> list[ThreadRegisters]:
    """Get all registers and their values. This function is only available when debugging."""
    dbg = dbg_ensure_running()
    threads: list[ThreadRegisters] = []
    for thread_index in range(ida_dbg.get_thread_qty()):
        tid = ida_dbg.getn_thread(thread_index)
        regvals: ida_idd.regvals_t = ida_dbg.get_reg_vals(tid)
        thread_regs = []
        for reg_index, regval in enumerate(regvals):
            reg_info = dbg.regs(reg_index)

            # NOTE: Apparently this can fail under some circumstances
            try:
                raw = regval.pyval(reg_info.dtype)
            except ValueError:
                raw = ida_idaapi.BADADDR

            # Render the value as text: ints as hex, bytes as spaced hex
            # pairs, anything else through str().
            if isinstance(raw, int):
                text = hex(raw)
            elif isinstance(raw, bytes):
                text = raw.hex(" ")
            else:
                text = str(raw)
            thread_regs.append({
                "name": reg_info.name,
                "value": text,
            })
        threads.append({
            "thread_id": tid,
            "registers": thread_regs,
        })
    return threads
2011 |
@jsonrpc
@idaread
@unsafe
def dbg_get_call_stack() -> list[dict[str, str]]:
    """Get the current call stack."""
    # Best effort: an error while collecting frames is swallowed and whatever
    # frames were gathered up to that point are returned.
    frames = []
    name_flags = (
        ida_name.GNCN_NOCOLOR
        | ida_name.GNCN_NOLABEL
        | ida_name.GNCN_NOSEG
        | ida_name.GNCN_PREFDBG
    )
    try:
        thread_id = ida_dbg.get_current_thread()
        trace = ida_idd.call_stack_t()

        if not ida_dbg.collect_stack_trace(thread_id, trace):
            return []
        for frame in trace:
            entry = {
                "address": hex(frame.callea),
            }
            try:
                module = ida_idd.modinfo_t()
                if ida_dbg.get_module_info(frame.callea, module):
                    entry["module"] = os.path.basename(module.name)
                else:
                    entry["module"] = "<unknown>"

                symbol = ida_name.get_nice_colored_name(frame.callea, name_flags)
                entry["symbol"] = symbol or "<unnamed>"

            except Exception as exc:
                # Per-frame failures are reported inside that frame's entry
                entry["module"] = "<error>"
                entry["symbol"] = str(exc)

            frames.append(entry)

    except Exception:
        pass
    return frames
2056 |
class Breakpoint(TypedDict):
    """A breakpoint as reported by list_breakpoints."""
    # Breakpoint address, rendered with hex()
    ea: str
    # Derived from the breakpoint flags masked with BPT_ENABLED
    enabled: bool
    # Condition converted with str(), or None when the breakpoint has none
    condition: Optional[str]
2061 |
def list_breakpoints() -> list[Breakpoint]:
    """Return every breakpoint that can be fetched by index.

    Each entry holds the address as a hex string, whether the breakpoint is
    enabled, and its condition (None when there is no condition).
    """
    breakpoints: list[Breakpoint] = []
    for i in range(ida_dbg.get_bpt_qty()):
        bpt = ida_dbg.bpt_t()
        if ida_dbg.getn_bpt(i, bpt):
            breakpoints.append(Breakpoint(
                ea=hex(bpt.ea),
                # The mask yields an int; convert it so the value matches
                # the bool declared by the Breakpoint TypedDict.
                enabled=bool(bpt.flags & ida_dbg.BPT_ENABLED),
                condition=str(bpt.condition) if bpt.condition else None,
            ))
    return breakpoints
2073 |
@jsonrpc
@idaread
@unsafe
def dbg_list_breakpoints():
    """List all breakpoints in the program."""
    # Exposes the shared list_breakpoints helper as a tool
    breakpoints = list_breakpoints()
    return breakpoints
2080 |
@jsonrpc
@idaread
@unsafe
def dbg_start_process():
    """Start the debugger, returns the current instruction pointer"""

    # With no breakpoints defined yet, place a software breakpoint on every
    # entry point that resolves to a valid address.
    if not list_breakpoints():
        for index in range(ida_entry.get_entry_qty()):
            entry_ea = ida_entry.get_entry(ida_entry.get_entry_ordinal(index))
            if entry_ea == ida_idaapi.BADADDR:
                continue
            ida_dbg.add_bpt(entry_ea, 0, idaapi.BPT_SOFT)

    started = idaapi.start_process("", "", "") == 1
    ip = ida_dbg.get_ip_val() if started else None
    if ip is None:
        raise IDAError("Failed to start debugger (did the user configure the debugger manually one time?)")
    return hex(ip)
2099 |
@jsonrpc
@idaread
@unsafe
def dbg_exit_process():
    """Exit the debugger"""
    # Requires a running debugger; raises IDAError when the exit call fails.
    dbg_ensure_running()
    if not idaapi.exit_process():
        raise IDAError("Failed to exit debugger")
2109 |
@jsonrpc
@idaread
@unsafe
def dbg_continue_process() -> str:
    """Continue the debugger, returns the current instruction pointer"""
    dbg_ensure_running()
    # The instruction pointer is only queried when the continue call succeeded
    resumed = idaapi.continue_process()
    ip = ida_dbg.get_ip_val() if resumed else None
    if ip is None:
        raise IDAError("Failed to continue debugger")
    return hex(ip)
2121 |
@jsonrpc
@idaread
@unsafe
def dbg_run_to(
    address: Annotated[str, "Run the debugger to the specified address"],
):
    """Run the debugger to the specified address"""
    dbg_ensure_running()
    ea = parse_address(address)
    # The instruction pointer is only queried when run_to reported success
    reached = idaapi.run_to(ea)
    ip = ida_dbg.get_ip_val() if reached else None
    if ip is None:
        raise IDAError(f"Failed to run to address {hex(ea)}")
    return hex(ip)
2136 |
@jsonrpc
@idaread
@unsafe
def dbg_set_breakpoint(
    address: Annotated[str, "Set a breakpoint at the specified address"],
):
    """Set a breakpoint at the specified address"""
    ea = parse_address(address)
    if idaapi.add_bpt(ea, 0, idaapi.BPT_SOFT):
        return f"Breakpoint set at {hex(ea)}"
    # add_bpt failed: treat it as success (returning nothing) when a
    # breakpoint is already listed at this address.
    target = hex(ea)
    if any(bpt["ea"] == target for bpt in list_breakpoints()):
        return
    raise IDAError(f"Failed to set breakpoint at address {hex(ea)}")
2152 |
@jsonrpc
@idaread
@unsafe
def dbg_step_into():
    """Step into the current instruction"""
    dbg_ensure_running()
    # The instruction pointer is only queried when the step call succeeded
    stepped = idaapi.step_into()
    ip = ida_dbg.get_ip_val() if stepped else None
    if ip is None:
        raise IDAError("Failed to step into")
    return hex(ip)
2164 |
@jsonrpc
@idaread
@unsafe
def dbg_step_over():
    """Step over the current instruction"""
    dbg_ensure_running()
    # The instruction pointer is only queried when the step call succeeded
    stepped = idaapi.step_over()
    ip = ida_dbg.get_ip_val() if stepped else None
    if ip is None:
        raise IDAError("Failed to step over")
    return hex(ip)
2176 |
@jsonrpc
@idaread
@unsafe
def dbg_delete_breakpoint(
    address: Annotated[str, "del a breakpoint at the specified address"],
):
    """Delete a breakpoint at the specified address"""
    # Returns nothing on success; raises IDAError when del_bpt fails.
    ea = parse_address(address)
    if not idaapi.del_bpt(ea):
        raise IDAError(f"Failed to delete breakpoint at address {hex(ea)}")
2188 |
@jsonrpc
@idaread
@unsafe
def dbg_enable_breakpoint(
    address: Annotated[str, "Enable or disable a breakpoint at the specified address"],
    enable: Annotated[bool, "Enable or disable a breakpoint"],
):
    """Enable or disable a breakpoint at the specified address"""
    # Returns nothing on success; raises IDAError when enable_bpt fails.
    ea = parse_address(address)
    if idaapi.enable_bpt(ea, enable):
        return
    # Name the attempted action in both cases; the enable case previously
    # produced "Failed to breakpoint at address ...".
    action = "enable" if enable else "disable"
    raise IDAError(f"Failed to {action} breakpoint at address {hex(ea)}")
2201 |
class MCP(idaapi.plugin_t):
    """IDA plugin entry object that owns the MCP server instance."""

    # Plugin descriptor attributes
    flags = idaapi.PLUGIN_KEEP
    comment = "MCP Plugin"
    help = "MCP"
    wanted_name = "MCP"
    wanted_hotkey = "Ctrl-Alt-M"

    def init(self):
        """Create the server object and print how to start it."""
        self.server = Server()
        # Show the hotkey with '+' separators, and "Option" instead of "Alt" on macOS
        shortcut = "+".join(MCP.wanted_hotkey.split("-"))
        if sys.platform == "darwin":
            shortcut = shortcut.replace("Alt", "Option")
        print(f"[MCP] Plugin loaded, use Edit -> Plugins -> MCP ({shortcut}) to start the server")
        return idaapi.PLUGIN_KEEP

    def run(self, arg):
        """Start the server; ``arg`` is not used."""
        self.server.start()

    def term(self):
        """Stop the server."""
        self.server.stop()
2222 |
def PLUGIN_ENTRY():
    """Return a new MCP plugin instance."""
    plugin = MCP()
    return plugin
2225 |
```