#
tokens: 12046/50000 14/14 files
lines: on (toggle) GitHub
raw markdown copy reset
# Directory Structure

```
├── .gitignore
├── .python-version
├── common_tools.py
├── env_example.txt
├── LICENSE
├── mcp_settings.json
├── prompts_cn.md
├── prompts_en.md
├── pyproject.toml
├── README_cn.md
├── README.md
├── requirements.txt
├── sample_cline_mcp_settings.json
├── server_tools_source.sc
├── server_tools.py
├── server_tools.sc
├── server.py
├── test_mcp_client.py
├── test_sc_tools.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------

```
1 | 3.12
2 | 
```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
 1 | # Python-generated files
 2 | __pycache__/
 3 | *.py[oc]
 4 | build/
 5 | dist/
 6 | wheels/
 7 | *.egg-info
 8 | 
 9 | # Virtual environments
10 | .venv
11 | /.scala-build/
12 | /.vscode/
13 | /.metals/
14 | /.bsp/
15 | .idea
16 | .env
17 | 
```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
  1 | [![MseeP.ai Security Assessment Badge](https://mseep.net/pr/sfncat-mcp-joern-badge.png)](https://mseep.ai/app/sfncat-mcp-joern)
  2 | 
  3 | # Joern MCP Server
  4 | 
  5 | A simple MCP Server for Joern.
  6 | 
  7 | <a href="https://glama.ai/mcp/servers/@sfncat/mcp-joern">
  8 |   <img width="380" height="200" src="https://glama.ai/mcp/servers/@sfncat/mcp-joern/badge" alt="Joern Server MCP server" />
  9 | </a>
 10 | 
 11 | ## Project Introduction
 12 | 
 13 | This project is an MCP Server based on Joern, providing a series of features to help developers with code review and security analysis.
 14 | 
 15 | ## Environment Requirements
 16 | 
 17 | - Python >= 3.10 (default 3.12) & uv
 18 | - Joern
 19 | 
 20 | ## Installation Steps
 21 | 
 22 | 1. Clone the project locally:
 23 |    ```bash
 24 |    git clone https://github.com/sfncat/mcp-joern.git
 25 |    cd mcp-joern
 26 |    ```
 27 | 
 28 | 2. Install Python dependencies:
 29 |    ```bash
 30 |    uv venv .venv
 31 |    source .venv/bin/activate
 32 |    uv sync
 33 |    ```
 34 | 
 35 | ## Project Structure
 36 | 
 37 | ```
 38 | ├── server.py                       # MCP Server main program
 39 | ├── test_mcp_client.py              # Test program for joern server and mcp tool
 40 | ├── test_sc_tools.py                # Direct test program for sc tools
 41 | ├── common_tools.py                 # Common utility functions
 42 | ├── server_tools.py                 # Server utility functions
 43 | ├── server_tools.sc                 # Scala implementation of server utility functions
 44 | ├── server_tools_source.sc          # Scala implementation of server utility functions,use sourceCode to get the source code of method
 45 | ├── requirements.txt                # Python dependency file
 46 | ├── sample_cline_mcp_settings.json  # Sample cline mcp configuration file
 47 | └── env_example.txt                 # Environment variables example file
 48 | ```
 49 | 
 50 | ## Usage
 51 | 
 52 | 1. Start the Joern server:
 53 |    ```bash
 54 |    joern -J-Xmx40G --server --server-host 127.0.0.1 --server-port 16162 --server-auth-username user --server-auth-password password --import server_tools.sc
 55 |    Or
 56 |    joern -J-Xmx40G --server --server-host 127.0.0.1 --server-port 16162 --server-auth-username user --server-auth-password password --import server_tools_source.sc
 57 |    ```
 58 |     If you are using it under Windows, you may need to set the JVM system variables through the command line or in the system environment variables.
 59 |    ```
 60 |    set _JAVA_OPTIONS=-Dfile.encoding=UTF-8
 61 |    ```
 62 |    set joern logging level to ERROR
 63 |    ```
 64 |    set SL_LOGGING_LEVEL=ERROR //windows
 65 |    export SL_LOGGING_LEVEL=ERROR //linux
 66 |    ```
 67 |    if you have the following warning
 68 | 
 69 |    ```
 70 |    Unable to create a system terminal, creating a dumb terminal (enable debug logging for more information)
 71 |    ```
 72 |    you can disable it by setting the environment variable
 73 |    ```
 74 |    set TERM=dumb
 75 |    export TERM=dumb
 76 |    ```
 77 |    to restore the default behavior
 78 |    ```
 79 |    set TERM=xterm-256color
 80 |    export TERM=xterm-256color
 81 |    ```
 82 | 2. Copy env_example.txt to .env
 83 |    Modify the configuration information to match the joern server startup configuration
 84 | 
 85 | 3. Run the test connection:
 86 |    Modify the information in `test_mcp_client.py` to confirm the joern server is working properly
 87 | 
 88 |    ```bash
 89 |    uv run test_mcp_client.py
 90 |    Starting MCP server test...
 91 |    ==================================================
 92 |    Testing server connection...
 93 |    [04/16/25 20:38:54] INFO     Processing request of type CallToolRequest                                                                                                                     server.py:534
 94 |    Connection test result: Successfully connected to Joern MCP, joern server version is XXX
 95 |    ```
 96 | 
 97 | 4. Configure MCP server
 98 |    Configure the mcp server in cline, refer to `sample_cline_mcp_settings.json`.
 99 | 
100 | 5. Use MCP server
101 |    Ask questions to the large language model, refer to `prompts_en.md`
102 | 
103 | ## Development Notes
104 | 
105 | - `.env` file is used to store environment variables
106 | - `.gitignore` file defines files to be ignored by Git version control
107 | - `pyproject.toml` defines the Python configuration for the project
108 | - MCP tool development
109 |   - Implement in `server_tools.sc`, add definitions in `server_tools.py`, and add tests in `test_mcp_client.py`
110 | 
111 | ## Contribution Guidelines
112 | 
113 | Welcome to submit Issues and Pull Requests to help improve the project.
114 | 
115 | Welcome to add more tools.
116 | 
117 | ## References
118 | 
119 | https://github.com/flankerhqd/jebmcp
120 | 
121 | https://docs.joern.io/server/
122 | 
123 | https://docs.joern.io/interpreter/
```

--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------

```
1 | requests>=2.32.3
2 | fastmcp>=2.1.0
3 | python-dotenv>=1.1.0
4 | openai>=1.0.0
```

--------------------------------------------------------------------------------
/env_example.txt:
--------------------------------------------------------------------------------

```
1 | HOST = "127.0.0.1"
2 | PORT = "16162"
3 | USER_NAME = "user"
4 | PASSWORD = "password"
5 | LOG_LEVEL = "INFO"
6 | TIMEOUT = 1000
```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
 1 | [project]
 2 | name = "joern-mcp"
 3 | version = "0.1.0"
 4 | description = "A simple Joern MCP Server."
 5 | readme = "README.md"
 6 | requires-python = ">=3.10"
 7 | dependencies = [
 8 |     "python-dotenv>=1.0.1",
 9 |     "fastmcp>=2.1.0",
10 |     "requests>=2.32.3",
11 | ]
12 | 
```

--------------------------------------------------------------------------------
/sample_cline_mcp_settings.json:
--------------------------------------------------------------------------------

```json
 1 | {
 2 |   "mcpServers": {
 3 |     "joern": {
 4 |       "autoApprove": [
 5 |         "ping",
 6 |         "load_cpg",
 7 |         "get_method_callees",
 8 |         "get_method_callers",
 9 |         "get_class_full_name_by_id",
10 |         "get_class_methods_by_class_full_name",
11 |         "get_method_code_by_full_name",
12 |         "get_method_code_by_id",
13 |         "get_method_full_name_by_id",
14 |         "get_call_code_by_id",
15 |         "get_method_code_by_class_full_name_and_method_name",
16 |         "get_derived_classes_by_class_full_name",
17 |         "get_parent_classes_by_class_full_name",
18 |         "get_method_by_call_id",
19 |         "get_referenced_method_full_name_by_call_id",
20 |         "get_calls_in_method_by_method_full_name"
21 |       ],
22 |       "disabled": false,
23 |       "timeout": 1800,
24 |       "command": "uv",
25 |       "args": [
26 |         "--directory",
27 |         "/home/user/github/joern_mcp",
28 |         "run",
29 |         "server.py"
30 |       ],
31 |       "transportType": "stdio",
32 |       "config": {
33 |         "host":"127.0.0.1",
34 |         "port":"16162",
35 |         "log_level":"ERROR",
36 |         "description": "Joern mcp server"
37 |       }
38 |     }
39 |   }
40 | }
```

--------------------------------------------------------------------------------
/prompts_en.md:
--------------------------------------------------------------------------------

```markdown
 1 | ## Information
 2 | 1. cpg_filepath = /home/user/cpg/com.android.nfc.cpg  
 3 | 2. class_full_name = com.android.nfc.NfcService$6
 4 | ## Processing Requirements
 5 | 1. Review the code of the onReceive method in the current class, analyze the code logic, and examine the handling logic for different Actions.
 6 | 2. If different Action handlers call other methods, continue to review the code of the called methods, and if necessary, continue analyzing subsequent called methods.
 7 | 3. For Action handlers with security risks, describe the complete handling logic, focus on whether parameters are obtained from the intent and how they are processed, and pay attention to sensitive operations or sensitive information.
 8 | 4. If security vulnerabilities exist, describe in detail the possible causes of the vulnerabilities, provide the vulnerability-related code, and generate Intent data that can reach the vulnerable branch.
 9 | 5. If there is logging in the processing logic, find the TAG string used for printing.
10 | ## Notes
11 | 1. joern server is already started  
12 | 2. joern mcp server is already started  
13 | 3. Strings in joern must use double quotes, not single quotes
14 | ## Sanitization Rules
15 | 1. No need to consider permission checks
16 | 2. No need to verify Intent source
17 | ## Output Rules
18 | 1. Output should use markdown format  
19 | 2. Intent data should be expressed in Java language 
20 | 
```

--------------------------------------------------------------------------------
/mcp_settings.json:
--------------------------------------------------------------------------------

```json
 1 | {
 2 |   "mcpServers": {
 3 |     "joern": {
 4 |       "autoApprove": [
 5 |         "ping",
 6 |         "load_cpg",
 7 |         "get_method_callees",
 8 |         "get_method_callers",
 9 |         "get_class_full_name_by_id",
10 |         "get_class_methods_by_class_full_name",
11 |         "get_method_code_by_full_name",
12 |         "get_method_code_by_id",
13 |         "get_method_full_name_by_id",
14 |         "get_call_code_by_id",
15 |         "get_method_code_by_class_full_name_and_method_name",
16 |         "get_derived_classes_by_class_full_name",
17 |         "get_parent_classes_by_class_full_name",
18 |         "get_method_by_call_id",
19 |         "get_referenced_method_full_name_by_call_id",
20 |         "get_calls_in_method_by_method_full_name"
21 |       ],
22 |       "disabled": false,
23 |       "command": "uv",
24 |       "args": [
25 |         "--directory",
26 |         "/home/kali/ssd/workspace/mcp-joern",
27 |         "run",
28 |         "server.py"
29 |       ],
30 |       "transportType": "stdio",
31 |       "description": "Joern mcp server",
32 |       "config": {
33 |         "host":"127.0.0.1",
34 |         "port":"16162",
35 |         "log_level":"ERROR",
36 |         "timeout": 1800
37 |       }
38 |     }
39 |   },
40 |   "llm": {
41 |     "model_type": "custom",
42 |     "config": {
43 |       "openai": {
44 |         "model": "gpt-4",
45 |         "base_url": "https://api.openai.com/v1",
46 |         "timeout": 30
47 |       },
48 |       "local": {
49 |         "model": "",
50 |         "base_url": "http://localhost:8000/v1",
51 |         "timeout": 30
52 |       },
53 |       "custom": {
54 |         "model": "qwq-plus-latest",
55 |         "base_url": "https://dashscope.aliyuncs.com/compatible-mode/v1",
56 |         "timeout": 60
57 |       }
58 |     }
59 |   }
60 | } 
```

--------------------------------------------------------------------------------
/test_sc_tools.py:
--------------------------------------------------------------------------------

```python
 1 | import json
 2 | import sys
 3 | import os
 4 | import requests
 5 | import re
 6 | from dotenv import load_dotenv
 7 | from urllib3 import response
 8 | from common_tools import *
 9 | load_dotenv()
10 | server_endpoint = f'{os.getenv("HOST")}:{os.getenv("PORT")}'
11 | print(server_endpoint)
12 | basic_auth = (os.getenv("USER_NAME"), os.getenv("PASSWORD"))
13 | 
14 | def joern_remote(query):
15 |     """
16 |     Execute remote query and return results
17 | 
18 |     Parameters:
19 |     query -- The query string to execute
20 | 
21 |     Returns:
22 |     Returns the server response's stdout content on success
23 |     Returns None on failure, error messages will be output to stderr
24 |     """
25 |     data = {"query": query}
26 |     headers = {'Content-Type': 'application/json'}
27 | 
28 |     try:
29 |         response = requests.post(
30 |             f'http://{server_endpoint}/query-sync',
31 |             data=json.dumps(data),
32 |             auth=basic_auth,
33 |             headers=headers,
34 |             timeout=10
35 |         )
36 |         response.raise_for_status()  # 自动处理HTTP错误状态码
37 | 
38 |         result = response.json()
39 |         return remove_ansi_escape_sequences(result.get('stdout', ''))
40 | 
41 |     except requests.exceptions.RequestException as e:
42 |         sys.stderr.write(f"Request Error: {str(e)}\n")
43 |     except json.JSONDecodeError:
44 |         sys.stderr.write("Error: Invalid JSON response\n")
45 | 
46 |     return None
47 | def get_calls_in_method_by_method_full_name(method_full_name:str) -> list[str]:
48 |     """Get the calls info by the method full name which the call is in the method
49 | 
50 |     @param method_full_name: The full name of the method
51 |     @return: The calls info of the method
52 |     """
53 |     response = joern_remote(f'get_calls_in_method_by_method_full_name("{method_full_name}")')
54 |     return extract_list(response)
55 | if __name__ == "__main__":
56 |     method_full_name = "com.android.nfc.NfcService$6.onReceive:void(android.content.Context,android.content.Intent)"
57 |     print(get_calls_in_method_by_method_full_name(method_full_name))
```

--------------------------------------------------------------------------------
/server.py:
--------------------------------------------------------------------------------

```python
  1 | import json
  2 | import sys
  3 | import os
  4 | import re
  5 | import time
  6 | from typing import Dict, Any
  7 | 
  8 | import requests
  9 | from fastmcp import FastMCP
 10 | from dotenv import load_dotenv
 11 | from common_tools import *
 12 | 
 13 | load_dotenv()
 14 | 
 15 | 
 16 | def load_server_config(config_file: str = "mcp_settings.json") -> Dict[str, Any]:
 17 |     """load server config from config file"""
 18 |     try:
 19 |         with open(config_file, 'r') as f:
 20 |             config = json.load(f)
 21 |         return config.get('mcpServers').get('joern').get('config')
 22 |     except FileNotFoundError:
 23 |         print(f"config file {config_file} not exist")
 24 |         return {}
 25 |     except json.JSONDecodeError:
 26 |         print(f"config file {config_file} format error")
 27 |         return {}
 28 | joern_config = load_server_config()
 29 | # server_endpoint = f'{os.getenv("HOST")}:{os.getenv("PORT")}'
 30 | server_endpoint = f'{joern_config.get('host')}:{joern_config.get("port")}'
 31 | log_level = joern_config.get('log_level', 'ERROR')
 32 | joern_mcp = FastMCP("joern-mcp", log_level=log_level)
 33 | # joern_mcp._tool_manager.
 34 | print(server_endpoint)
 35 | basic_auth = (os.getenv("JOERN_AUTH_USERNAME"), os.getenv("JOERN_AUTH_PASSWORD"))
 36 | timeout = int(joern_config.get('timeout', '300'))
 37 | 
 38 | def joern_remote(query):
 39 |     """
 40 |     Execute remote query and return results
 41 |     
 42 |     Parameters:
 43 |     query -- The query string to execute
 44 |     
 45 |     Returns:
 46 |     Returns the server response stdout content on success
 47 |     Returns None on failure, error message will be output to stderr
 48 |     """
 49 |     data = {"query": query}
 50 |     headers = {'Content-Type': 'application/json'}
 51 | 
 52 |     try:
 53 |         response = requests.post(
 54 |             f'http://{server_endpoint}/query-sync',
 55 |             data=json.dumps(data),
 56 |             headers=headers,
 57 |             auth=basic_auth,
 58 |             timeout=timeout
 59 |         )
 60 |         response.raise_for_status()  
 61 |         
 62 |         result = response.json()
 63 |         return remove_ansi_escape_sequences(result.get('stdout', ''))
 64 |         
 65 |     except requests.exceptions.RequestException as e:
 66 |         sys.stderr.write(f"Request Error: {str(e)}\n")
 67 |     except json.JSONDecodeError:
 68 |         sys.stderr.write("Error: Invalid JSON response\n")
 69 |     
 70 |     return None
 71 | 
 72 | 
 73 | @joern_mcp.tool()
 74 | def get_help():
 75 |     """Get help information from joern server"""
 76 |     response = joern_remote('help')
 77 |     if response:
 78 |         return response
 79 |     else:
 80 |         return 'Query Failed'
 81 | 
 82 | 
 83 | @joern_mcp.tool()
 84 | def check_connection() -> str:
 85 |     """Check if the Joern MCP plugin is running"""
 86 |     try:
 87 |         metadata = extract_value(joern_remote("version"))
 88 |         if not metadata:
 89 |             return f"Failed to connect to Joern MCP! Make sure the Joern MCP server is running."
 90 |         return f"Successfully connected to Joern MCP, joern server version is {metadata}"
 91 |     except Exception as e:
 92 |         return f"Failed to connect to Joern MCP! Make sure the Joern MCP server is running."
 93 | 
 94 | SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
 95 | GENERATED_PY = os.path.join(SCRIPT_DIR, "server_tools.py")
 96 | def generate():
 97 |     """Generate and execute additional server tools from server_tools.py file.
 98 |     
 99 |     This function reads the content of server_tools.py and executes it to add
100 |     more functionality to the server.
101 |     """
102 |     with open(GENERATED_PY, "r") as f:
103 |         code = f.read()
104 |         exec(compile(code, GENERATED_PY, "exec"))
105 | 
106 | generate()
107 | 
108 | 
109 | def main():
110 |     """Start the MCP server using stdio transport.
111 |     
112 |     This is the main entry point for running the Joern MCP server.
113 |     """
114 |     joern_mcp.run(transport="stdio")
115 | 
116 | if __name__ == "__main__":
117 |     main()
118 | 
```

--------------------------------------------------------------------------------
/common_tools.py:
--------------------------------------------------------------------------------

```python
  1 | import os
  2 | import requests
  3 | import json
  4 | import re
  5 | import sys
  6 | 
  7 | def remove_ansi_escape_sequences(text: str) -> str:
  8 |     """Remove ANSI escape sequences from a string.
  9 |     
 10 |     Args:
 11 |         text (str): Input string containing ANSI escape sequences
 12 |         
 13 |     Returns:
 14 |         str: String with ANSI escape sequences removed
 15 |     """
 16 |     ansi_escape = re.compile(r'\x1b\[[0-9;]*[a-zA-Z]')
 17 |     return ansi_escape.sub('', text)
 18 | 
 19 | def extract_code_between_triple_quotes(input_str):
 20 |     """Extract content between triple quotes from a string.
 21 |     
 22 |     Args:
 23 |         input_str (str): Input string containing triple-quoted content
 24 |         
 25 |     Returns:
 26 |         str: Extracted content between triple quotes, or empty string if not found
 27 |     """
 28 |     import re
 29 |     
 30 |     # Find content between triple quotes
 31 |     pattern = r'"""(.*?)"""'
 32 |     match = re.search(pattern, input_str, re.DOTALL)
 33 |     
 34 |     if match:
 35 |         return match.group(1).strip()
 36 |     return ''
 37 | 
 38 | def extract_list(input_str):    
 39 |     """Extract a list of elements from a string representation of a Scala List.
 40 |     
 41 |     Parameters:
 42 |     input_str -- The input string containing a Scala List representation
 43 |     
 44 |     Returns:
 45 |     A Python list containing the extracted elements with cleaned data
 46 |     """
 47 |     # Check if input is empty or None
 48 |     if not input_str:
 49 |         return []
 50 |     
 51 |     # Use regex to match List content
 52 |     list_pattern = r'List\((.*?)\)$'
 53 |     list_match = re.search(list_pattern, input_str, re.DOTALL)
 54 |     if not list_match:
 55 |         return []
 56 |         
 57 |     content = list_match.group(1).strip()
 58 |     
 59 |     # Try to match content within triple quotes
 60 |     triple_quote_pattern = r'"""(.*?)"""'
 61 |     triple_quote_matches = re.findall(triple_quote_pattern, content, re.DOTALL)
 62 |     
 63 |     if triple_quote_matches:
 64 |         return triple_quote_matches
 65 |     
 66 |     # If no triple-quoted content found, try to match content within regular quotes
 67 |     single_quote_pattern = r'"((?:\\.|[^"\\])*?)"'
 68 |     single_quote_matches = re.findall(single_quote_pattern, content, re.DOTALL)
 69 |     
 70 |     elements = []
 71 |     for item in single_quote_matches:
 72 |         if item.strip():
 73 |             # Handle escape characters
 74 |             cleaned = item.replace('\\"', '"').replace('\\\\', '\\')
 75 |             elements.append(cleaned)
 76 |     
 77 |     return elements
 78 | 
 79 | def extract_quoted_string(input_str: str) -> str:
 80 |     """Extract content between quotes from a string.
 81 |     
 82 |     Args:
 83 |         input_str (str): Input string containing quoted content
 84 |         
 85 |     Returns:
 86 |         str: Extracted content between quotes, or empty string if not found
 87 |     """
 88 |     pattern = r'"(.*?)"'
 89 |     match = re.search(pattern, input_str)
 90 |     
 91 |     if match:
 92 |         return match.group(1)
 93 |     return ''
 94 | 
 95 | def extract_long_value(input_str: str) -> str:
 96 |     """Extract Long value from a string representation of a Scala Long variable.
 97 |     
 98 |     Args:
 99 |         input_str (str): Input string containing a Scala Long value (e.g. 'val res4: Long = 90194313219L')
100 |         
101 |     Returns:
102 |         str: Extracted Long value including the 'L' suffix, or empty string if not found
103 |     """
104 |     pattern = r'= (\d+L)'
105 |     match = re.search(pattern, input_str)
106 |     
107 |     if match:
108 |         return match.group(1)
109 |     return ''
110 | 
111 | def extract_value(input_str: str) -> str:
112 |     """Extract value from a string based on its pattern.
113 |     
114 |     This function automatically selects the appropriate extraction method based on
115 |     the input string format:
116 |     * If contains 'Long =', uses extract_long_value
117 |     * If contains triple quotes, uses extract_code_between_triple_quotes
118 |     * If contains single quotes, uses extract_quoted_string
119 |     
120 |     Args:
121 |         input_str (str): Input string containing a value to extract
122 |         
123 |     Returns:
124 |         str: Extracted value based on the detected pattern
125 |     """
126 |     if 'Long =' in input_str:
127 |         return extract_long_value(input_str)
128 |     elif 'String = """' in input_str:
129 |         return extract_code_between_triple_quotes(input_str)
130 |     elif 'String = "' in input_str:
131 |         return extract_quoted_string(input_str)
132 |     else:
133 |         return input_str
```

--------------------------------------------------------------------------------
/server_tools.py:
--------------------------------------------------------------------------------

```python
  1 | # NOTE: This file has been automatically generated, do not modify!
  2 | # Architecture based on https://github.com/mrexodia/ida-pro-mcp (MIT License)
  3 | from http.client import responses
  4 | from typing import Annotated, Optional, TypedDict, Generic, TypeVar
  5 | from pydantic import Field
  6 | 
  7 | T = TypeVar("T")
  8 | 
  9 | @joern_mcp.tool()
 10 | def ping()->str:
 11 |     """Checks if the Joern server is running and responsive by querying its version
 12 |     
 13 |     @return: The Joern server version if successful, 'Query Failed' if the server is not responding
 14 |     """
 15 |     response = joern_remote('version')
 16 |     if response:
 17 |         return extract_value(response)
 18 |     else:
 19 |         return 'Query Failed'
 20 | 
 21 | @joern_mcp.tool()
 22 | def load_cpg(cpg_filepath: str) -> str:
 23 |     """
 24 |     Loads a CPG from a file if the cpg is not loaded or the cpg is not the same as the filepath.
 25 | 
 26 |     Args:
 27 |         cpg_filepath (str): The path to the CPG file, the filepath is absolute path.
 28 | 
 29 |     Returns:
 30 |         str: True if the CPG is loaded successfully, False otherwise.
 31 |     """
 32 |     
 33 |     # return extract_value(joern_remote(f'val cpg = CpgLoader.load("{cpg_filepath}")'))
 34 |     return extract_value(joern_remote(f'load_cpg("{cpg_filepath}")'))
 35 | 
 36 | @joern_mcp.tool()
 37 | def get_method_callees(method_full_name: str) -> list[str]:
 38 |     """Retrieves a list of methods info that are called by the specified method
 39 |     
 40 |    @param method_full_name: The fully qualified name of the source method(e.g., com.android.nfc.NfcService$6.onReceive:void(android.content.Context,android.content.Intent))
 41 |    @return: List of full name, name, signature and id of methods which call the source method
 42 |     """
 43 |     # responses =  joern_remote(f'cpg.method.fullNameExact("{method_full_name}").head.callee.distinct.map(m => (s"methodFullName=$' + '{m.fullName} methodId=${m.id}L")).l')
 44 |     responses = joern_remote(f'get_method_callees("{method_full_name}")')
 45 |     return extract_list(responses)  
 46 | 
 47 | @joern_mcp.tool()
 48 | def get_method_callers(method_full_name: str) -> list[str]:
 49 |     """Retrieves a list of methods that call the specified method
 50 |     
 51 |     @param method_full_name: The fully qualified name of the source method(e.g., com.android.nfc.NfcService$6.onReceive:void(android.content.Context,android.content.Intent))
 52 |     @return: List of full name, name, signature and id of methods called by the source method
 53 |     """
 54 |     responses = joern_remote(f'get_method_callers("{method_full_name}")')
 55 |     return extract_list(responses)
 56 | 
 57 | @joern_mcp.tool()
 58 | def get_class_full_name_by_id(class_id:str) -> str:
 59 |     """Retrieves the fully name of a class by its ID
 60 |     
 61 |     @param id: The unique identifier of the class (typeDecl), the id is a Long int string, like '111669149702L'
 62 |     @return: The fully name of the class (e.g., com.android.nfc.NfcService$6)
 63 |     """
 64 |     response =  joern_remote(f'get_class_full_name_by_id("{class_id}")')
 65 |     return extract_value(response)
 66 | 
 67 | @joern_mcp.tool()
 68 | def get_class_methods_by_class_full_name(class_full_name:str) -> list[str]:
 69 |     """Get the methods of a class by its fully qualified name
 70 |   
 71 |     @param class_full_name: The fully qualified name of the class
 72 |     @return: List of full name, name, signature and id of methods in the class
 73 |     """
 74 |     response = joern_remote(f'get_class_methods_by_class_full_name("{class_full_name}")')
 75 |     return extract_list(response)
 76 | 
 77 | @joern_mcp.tool()
 78 | def get_method_code_by_full_name(method_full_name:str) -> str:
 79 |     """Get the code of a method by its fully name, If you know the full name of the method, you can use this tool to get the method code directly. 
 80 |     If you only know the full name of the class and the name of the method, you should use get_method_code_by_class_full_name_and_method_name
 81 |     @param method_full_name: The fully qualified name of the method (e.g., com.android.nfc.NfcService$6.onReceive:void(android.content.Context,android.content.Intent))
 82 |     @return: The source code of the specified method
 83 |     """
 84 |     response = joern_remote(f'get_method_code_by_method_full_name("{method_full_name}")')
 85 |     return extract_value(response)
 86 | 
 87 | @joern_mcp.tool()
 88 | def get_method_code_by_id(method_id:str) -> str:
 89 |     """Get the code of a method by its class full name and method name
 90 |   
 91 |     @param class_full_name: The fully qualified name of the class
 92 |     @param method_name: The name of the method
 93 |     @return: List of full name, name, signature and id of methods in the class
 94 |     """
 95 |     response =  joern_remote(f'get_method_code_by_id("{method_id}")')
 96 |     return extract_value(response)
 97 | 
 98 | @joern_mcp.tool()
 99 | def get_method_full_name_by_id(method_id:str) -> str:
100 |     """Retrieves the fully qualified name of a method by its ID
101 |     
102 |     @param id: The unique identifier of the method, the id is a Long int string, like '111669149702L'
103 |     @return: The fully qualified name of the method (e.g., com.android.nfc.NfcService$6.onReceive:void(android.content.Context,android.content.Intent))
104 |     """
105 |     response = joern_remote(f'get_method_full_name_by_id("{method_id}")')
106 |     return extract_value(response)
107 | 
108 | @joern_mcp.tool()
109 | def get_call_code_by_id(code_id:str) -> str:
110 |     """Get the source code of a specific call node from the loaded CPG by the call id
111 |     
112 |     @param id: The unique identifier of the call node, the id is a Long int string, like '111669149702L'
113 |     @return: The source code of the specified call
114 |     """
115 |     response =  joern_remote(f'get_call_code_by_id("{code_id}")')
116 |     return extract_value(response)
117 | 
118 | @joern_mcp.tool()
119 | def get_method_code_by_class_full_name_and_method_name(class_full_name:str, method_name:str) -> list[str]:
120 |     """Get the code of a method by its class full name and method name,
121 |     this tool is usually used when you don't know the full name of the method, but you know the full name of the class and the name of the method. If there are multiple methods with the same name in the class, the code of all methods will be returned.
122 |   
123 |     @param class_full_name: The fully qualified name of the class, like 'com.android.nfc.NfcService'
124 |     @param method_name: The name of the method, like 'onReceive'
125 |     @return: List of full name, name, signature and id of methods in the class
126 |     """
127 |     responses = joern_remote(f'get_method_code_by_class_full_name_and_method_name("{class_full_name}", "{method_name}")')
128 |     return extract_list(responses)
129 | 
130 | # @joern_mcp.tool()
131 | # def get_method_by_full_name_without_signature(full_name_without_signature:str) -> list[str]:
132 | #     """Get the info of a method list by its fully qualified name without signature
133 |     
134 | #     @param full_name_without_signature: fully qualified name of methodwithout signature,like com.android.nfc.NfcService.onReceive
135 | #     @return: The info of the methods, including the full name, name, signature and id
136 | #     """
137 | #     response = joern_remote(f'get_method_by_full_name_without_signature("{full_name_without_signature}")')
138 | #     return extract_list(response)
139 | 
140 | @joern_mcp.tool()
141 | def get_derived_classes_by_class_full_name(class_full_name:str) -> list[str]:
142 |     """Get the derived classes of a class
143 |     
144 |     @param class_full_name: The fully qualified name of the class
145 |     @return: The derived classes info of the class, including the full name, name and id
146 |     """
147 |     response = joern_remote(f'get_derived_classes_by_class_full_name("{class_full_name}")')
148 |     return extract_list(response)
149 | 
150 | @joern_mcp.tool()
151 | def get_parent_classes_by_class_full_name(class_full_name:str) -> list[str]:
152 |     """Get the parent classes of a class
153 |     
154 |     @param class_full_name: The fully qualified name of the class
155 |     @return: The parent classes info of the class, including the full name, name and id
156 |     """
157 |     response = joern_remote(f'get_parent_classes_by_class_full_name("{class_full_name}")')
158 |     return extract_list(response)
159 | 
160 | @joern_mcp.tool()
161 | def get_method_by_call_id(call_id:str) -> str:
162 |     """Get the method info by the call id which the call is in the method
163 |   
164 |     @param id: The id of the call
165 |     @return: The method info of the call
166 |     """
167 |     response =  joern_remote(f'get_method_by_call_id("{call_id}")')
168 |     return extract_value(response)
169 | 
170 | @joern_mcp.tool()
171 | def get_referenced_method_full_name_by_call_id(call_id:str) -> str:
172 |     """Get the method info by the call id which the call is referenced the method
173 |     
174 |     @param id: The id of the call
175 |     @return: The method info of the call
176 |     """
177 |     response =  joern_remote(f'get_referenced_method_full_name_by_call_id("{call_id}")')
178 |     return extract_value(response)   
179 | 
180 | @joern_mcp.tool()
181 | def get_calls_in_method_by_method_full_name(method_full_name:str) -> list[str]:
182 |     """Get the calls info by the method full name which the call is in the method
183 | 
184 |     @param method_full_name: The full name of the method
185 |     @return: The calls info of the method
186 |     """
187 |     response = joern_remote(f'get_calls_in_method_by_method_full_name("{method_full_name}")')
188 |     return extract_list(response)
```

--------------------------------------------------------------------------------
/test_mcp_client.py:
--------------------------------------------------------------------------------

```python
  1 | import os
  2 | import asyncio
  3 | from dotenv import load_dotenv
  4 | from fastmcp import Client
  5 | from fastmcp.client.transports import PythonStdioTransport
  6 | 
  7 | async def test_connection(client):
  8 |     """Test server connection"""
  9 |     print("Testing [check_connection] server connection...")
 10 |     try:
 11 |         result = await client.call_tool("check_connection")
 12 |         print(f"Connection test result: {result[0].text}")
 13 |         return True
 14 |     except Exception as e:
 15 |         print(f"Connection test failed: {str(e)}")
 16 |         return False
 17 | 
 18 | async def test_ping(client):
 19 |     """Test ping functionality"""
 20 |     print("Testing [ping] ping...")
 21 |     try:
 22 |         result = await client.call_tool("ping")
 23 |         print(f"Ping result: {result[0].text}")
 24 |         return True
 25 |     except Exception as e:
 26 |         print(f"Ping test failed: {str(e)}")
 27 |         return False
 28 | 
 29 | async def test_load_cpg(client, cpg_path):
 30 |     """Test loading CPG file"""
 31 |     print("Testing [load_cpg] CPG file loading...")
 32 |     try:
 33 |         result = await client.call_tool("load_cpg", {"cpg_filepath": cpg_path})
 34 |         print(f"CPG loading result: {result[0].text}")
 35 |         return True
 36 |     except Exception as e:
 37 |         print(f"CPG loading failed: {str(e)}")
 38 |         return False
 39 | 
 40 | async def test_method_info_queries(client, method_full_name, method_id, full_name_without_signature):
 41 |     """Test method-related queries"""
 42 |     print("Testing method queries...")
 43 |     success = True
 44 |     
 45 |     try:
 46 |         # Test getting method callers
 47 |         print("Testing [get_method_callers] get method callers...")
 48 |         callers = await client.call_tool("get_method_callers", {"method_full_name": method_full_name})
 49 |         if callers:
 50 |             print(f"Method callers: {callers[0].text}")
 51 |         else:
 52 |             print(f"Method callers: []")
 53 |     except Exception as e:
 54 |         print(f"Failed to get method callers: {str(e)}")
 55 |         success = False
 56 |     
 57 |     try:
 58 |         # Test getting method callees
 59 |         print("Testing [get_method_callees] get method callees...")
 60 |         callees = await client.call_tool("get_method_callees", {"method_full_name": method_full_name})
 61 |         if callees:
 62 |             print(f"Method callees: {callees[0].text}")
 63 |         else:
 64 |             print(f"Method callees: []")
 65 |     except Exception as e:
 66 |         print(f"Failed to get method callees: {str(e)}")
 67 |         success = False
 68 | 
 69 |     try:
 70 |         # Test getting method full name by ID
 71 |         print("Testing [get_method_full_name_by_id] get method full name by ID...")
 72 |         full_name = await client.call_tool("get_method_full_name_by_id", {"id": method_id})
 73 |         print(f"Method full name: {full_name[0].text}")
 74 |     except Exception as e:
 75 |         print(f"Failed to get method full name: {str(e)}")
 76 |         success = False   
 77 | 
 78 | 
 79 |     try:
 80 |         # Test getting method by full name without signature
 81 |         print("Testing [get_method_by_full_name_without_signature] get method by full name without signature...")
 82 |         method = await client.call_tool("get_method_by_full_name_without_signature", {"full_name_without_signature": full_name_without_signature})
 83 |         print(f"Method info: {method[0].text}")
 84 |     except Exception as e:
 85 |         print(f"Failed to get method info: {str(e)}")
 86 |         success = False
 87 |     
 88 |     return success
 89 | 
 90 | async def test_class_info_queries(client, class_full_name):
 91 |     """Test class-related queries"""
 92 |     print("Testing class queries...")
 93 |     success = True
 94 |     
 95 |     try:
 96 |         # Test getting class methods
 97 |         print("Testing [get_class_methods_by_class_full_name] get class methods...")
 98 |         methods = await client.call_tool("get_class_methods_by_class_full_name", {"class_full_name": class_full_name})
 99 |         if methods:
100 |             print(f"Class methods: {methods[0].text}")
101 |         else:
102 |             print(f"Class methods: []")
103 |     except Exception as e:
104 |         print(f"Failed to get class methods: {str(e)}")
105 |         success = False
106 |     
107 |     try:
108 |         # Test getting derived classes
109 |         print("Testing [get_derived_classes_by_class_full_name] get derived classes...")
110 |         derived = await client.call_tool("get_derived_classes_by_class_full_name", {"class_full_name": class_full_name})
111 |         if derived:
112 |             print(f"Derived classes: {derived[0].text}")
113 |         else:
114 |             print(f"Derived classes: []")
115 |     except Exception as e:
116 |         print(f"Failed to get derived classes: {str(e)}")
117 |         success = False
118 |     
119 |     try:
120 |         # Test getting parent classes
121 |         print("Testing [get_parent_classes_by_class_full_name] get parent classes...")
122 |         parents = await client.call_tool("get_parent_classes_by_class_full_name", {"class_full_name": class_full_name})
123 |         if parents:
124 |             print(f"Parent classes: {parents[0].text}")
125 |         else:
126 |             print(f"Parent classes: []")
127 |     except Exception as e:
128 |         print(f"Failed to get parent classes: {str(e)}")
129 |         success = False
130 |     
131 |     return success
132 | 
133 | async def test_call_info_queries(client, call_id, method_full_name):
134 |     """Test call-related queries"""
135 |     print("\nTesting call queries...")
136 |     success = True
137 |     
138 |     try:
139 |         # Test getting call code
140 |         print("Testing [get_call_code_by_id] get call code...")
141 |         code = await client.call_tool("get_call_code_by_id", {"id": call_id})
142 |         print(f"Call code: {code[0].text}")
143 |     except Exception as e:
144 |         print(f"Failed to get call code: {str(e)}")
145 |         success = False
146 |     
147 |     try:
148 |         # Test getting method by call ID
149 |         print("Testing [get_method_by_call_id] get method by call ID...")
150 |         method = await client.call_tool("get_method_by_call_id", {"id": call_id})
151 |         print(f"Method info: {method[0].text}")
152 |     except Exception as e:
153 |         print(f"Failed to get method by call ID: {str(e)}")
154 |         success = False
155 |     
156 |     try:
157 |         # Test getting referenced method full name
158 |         print("Testing [get_referenced_method_full_name_by_call_id] get referenced method full name...")
159 |         ref_method = await client.call_tool("get_referenced_method_full_name_by_call_id", {"id": call_id})
160 |         print(f"Referenced method: {ref_method[0].text}")
161 |     except Exception as e:
162 |         print(f"Failed to get referenced method: {str(e)}")
163 |         success = False
164 |     
165 |     print("Testing [get_calls_in_method_by_method_full_name] get calls in method...")
166 |     try:
167 |         calls = await client.call_tool("get_calls_in_method_by_method_full_name", {"method_full_name": method_full_name})
168 |         if calls:
169 |             print(f"Calls in method: {calls[0].text}")
170 |         else:
171 |             print(f"Calls in method: []")
172 |         success =  True
173 |     except Exception as e:
174 |         print(f"Failed to get calls in method: {str(e)}")
175 |         success =  False
176 |     return success
177 | 
178 | async def test_method_code_queries(client, method_full_name, class_full_name, method_name, method_id):
179 |     """Test method name-related queries"""
180 |     print("Testing method name queries...")
181 |     success = True
182 |     
183 |     try:
184 |         # Test getting method code
185 |         print("Testing [get_method_code_by_full_name] get method code...")
186 |         code = await client.call_tool("get_method_code_by_full_name", {"method_full_name": method_full_name})
187 |         print(f"Method code: {code[0].text}")
188 |     except Exception as e:
189 |         print(f"Failed to get method code: {str(e)}")
190 |         success = False
191 | 
192 |     try:
193 |         # Test getting method code by class full name and method name
194 |         print("Testing [get_method_code_by_class_full_name_and_method_name] get method code by class and method name...")
195 |         codes = await client.call_tool("get_method_code_by_class_full_name_and_method_name", {
196 |             "class_full_name": class_full_name,
197 |             "method_name": method_name
198 |         })
199 |         print(f"Method codes: {codes[0].text}")
200 |     except Exception as e:
201 |         print(f"Failed to get method codes: {str(e)}")
202 |         success = False    
203 | 
204 |     try:
205 |         # Test getting method code by ID
206 |         print("Testing [get_method_code_by_id] get method code by ID...")
207 |         code = await client.call_tool("get_method_code_by_id", {"id": method_id})
208 |         print(f"Method code: {code[0].text}")
209 |     except Exception as e:
210 |         print(f"Failed to get method code: {str(e)}")
211 |         success = False
212 |     
213 |     return success
214 | 
215 | 
216 | async def main():
217 |     """Main test function"""
218 |     print("Starting MCP server test...")
219 |     print("=" * 50)
220 |     
221 |     # Load environment variables
222 |     load_dotenv()
223 |     SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
224 |     # Create client
225 |     client = Client(
226 |         transport=PythonStdioTransport('server.py'),
227 |         roots=[f"file://{SCRIPT_DIR}"]  # Replace with actual workspace directory
228 |     )
229 |     
230 |     async with client:
231 |         # Test basic connection
232 |         if not await test_connection(client):
233 |             print("Server connection test failed, terminating test")
234 |             return
235 |         
236 |         # Test ping
237 |         if not await test_ping(client):
238 |             print("Ping test failed, terminating test")
239 |             return
240 |         
241 |         # Test CPG loading
242 |         cpg_path = os.path.join(SCRIPT_DIR,"com.android.nfc.cpg")  # Replace with actual CPG file path
243 |         if not await test_load_cpg(client, cpg_path):
244 |             print("CPG loading test failed, terminating test")
245 |             return
246 |         
247 |         # Test method queries
248 |         method_full_name = "com.android.nfc.NfcService$6.onReceive:void(android.content.Context,android.content.Intent)"
249 |         method_name = "onReceive"
250 |         method_id = "111669160511L"
251 |         call_id = "30064950783L"
252 |         full_name_without_signature = "com.android.nfc.NfcService$6.onReceive"
253 |         class_full_name = "com.android.nfc.NfcService"  # Replace with actual class name
254 |         
255 |         if not await test_method_info_queries(client, method_full_name, method_id, full_name_without_signature):
256 |             print("Method queries test failed")
257 |         
258 |         # Test class queries
259 |         if not await test_class_info_queries(client, class_full_name):
260 |             print("Class queries test failed")
261 | 
262 |         # Test method code queries
263 |         if not await test_method_code_queries(client, method_full_name, class_full_name, method_name, method_id):
264 |             print("Method ID queries test failed")
265 |         
266 |         # Test call queries
267 |         if not await test_call_info_queries(client, call_id, method_full_name):
268 |             print("Call queries test failed")
269 |         
270 | 
271 |     print("Test completed!")
272 | 
273 | if __name__ == "__main__":
274 |     asyncio.run(main())
```