# Directory Structure
```
├── .gitignore
├── .python-version
├── common_tools.py
├── env_example.txt
├── LICENSE
├── mcp_settings.json
├── prompts_cn.md
├── prompts_en.md
├── pyproject.toml
├── README_cn.md
├── README.md
├── requirements.txt
├── sample_cline_mcp_settings.json
├── server_tools_source.sc
├── server_tools.py
├── server_tools.sc
├── server.py
├── test_mcp_client.py
├── test_sc_tools.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------
```
3.12
```
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info
# Virtual environments
.venv
/.scala-build/
/.vscode/
/.metals/
/.bsp/
.idea
.env
```
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
[](https://mseep.ai/app/sfncat-mcp-joern)
# Joern MCP Server
A simple MCP Server for Joern.
<a href="https://glama.ai/mcp/servers/@sfncat/mcp-joern">
<img width="380" height="200" src="https://glama.ai/mcp/servers/@sfncat/mcp-joern/badge" alt="Joern Server MCP server" />
</a>
## Project Introduction
This project is an MCP Server based on Joern, providing a series of features to help developers with code review and security analysis.
## Environment Requirements
- Python >= 3.10 (default 3.12) & uv
- Joern
## Installation Steps
1. Clone the project locally:
```bash
git clone https://github.com/sfncat/mcp-joern.git
cd mcp-joern
```
2. Install Python dependencies:
```bash
uv venv .venv
source .venv/bin/activate
uv sync
```
## Project Structure
```
├── server.py # MCP Server main program
├── test_mcp_client.py # Test program for joern server and mcp tool
├── test_sc_tools.py # Direct test program for sc tools
├── common_tools.py # Common utility functions
├── server_tools.py # Server utility functions
├── server_tools.sc # Scala implementation of server utility functions
├── server_tools_source.sc # Scala implementation of server utility functions,use sourceCode to get the source code of method
├── requirements.txt # Python dependency file
├── sample_cline_mcp_settings.json # Sample cline mcp configuration file
└── env_example.txt # Environment variables example file
```
## Usage
1. Start the Joern server:
```bash
joern -J-Xmx40G --server --server-host 127.0.0.1 --server-port 16162 --server-auth-username user --server-auth-password password --import server_tools.sc
Or
joern -J-Xmx40G --server --server-host 127.0.0.1 --server-port 16162 --server-auth-username user --server-auth-password password --import server_tools_source.sc
```
If you are using it under Windows, you may need to set the JVM system variables through the command line or in the system environment variables.
```
set _JAVA_OPTIONS=-Dfile.encoding=UTF-8
```
set joern logging level to ERROR
```
set SL_LOGGING_LEVEL=ERROR //windows
export SL_LOGGING_LEVEL=ERROR //linux
```
if you have the following warning
```
Unable to create a system terminal, creating a dumb terminal (enable debug logging for more information)
```
you can disable it by setting the environment variable
```
set TERM=dumb
export TERM=dumb
```
to restore the default behavior
```
set TERM=xterm-256color
export TERM=xterm-256color
```
2. Copy env_example.txt to .env
Modify the configuration information to match the joern server startup configuration
3. Run the test connection:
Modify the information in `test_mcp_client.py` to confirm the joern server is working properly
```bash
uv run test_mcp_client.py
Starting MCP server test...
==================================================
Testing server connection...
[04/16/25 20:38:54] INFO Processing request of type CallToolRequest server.py:534
Connection test result: Successfully connected to Joern MCP, joern server version is XXX
```
4. Configure MCP server
Configure the mcp server in cline, refer to `sample_cline_mcp_settings.json`.
5. Use MCP server
Ask questions to the large language model, refer to `prompts_en.md`
## Development Notes
- `.env` file is used to store environment variables
- `.gitignore` file defines files to be ignored by Git version control
- `pyproject.toml` defines the Python configuration for the project
- MCP tool development
- Implement in `server_tools.sc`, add definitions in `server_tools.py`, and add tests in `test_mcp_client.py`
## Contribution Guidelines
Welcome to submit Issues and Pull Requests to help improve the project.
Welcome to add more tools.
## References
https://github.com/flankerhqd/jebmcp
https://docs.joern.io/server/
https://docs.joern.io/interpreter/
```
--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------
```
requests>=2.32.3
fastmcp>=2.1.0
python-dotenv>=1.1.0
openai>=1.0.0
```
--------------------------------------------------------------------------------
/env_example.txt:
--------------------------------------------------------------------------------
```
HOST = "127.0.0.1"
PORT = "16162"
USER_NAME = "user"
PASSWORD = "password"
LOG_LEVEL = "INFO"
TIMEOUT = 1000
```
--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------
```toml
[project]
name = "joern-mcp"
version = "0.1.0"
description = "A simple Joern MCP Server."
readme = "README.md"
requires-python = ">=3.10"
dependencies = [
"python-dotenv>=1.0.1",
"fastmcp>=2.1.0",
"requests>=2.32.3",
]
```
--------------------------------------------------------------------------------
/sample_cline_mcp_settings.json:
--------------------------------------------------------------------------------
```json
{
"mcpServers": {
"joern": {
"autoApprove": [
"ping",
"load_cpg",
"get_method_callees",
"get_method_callers",
"get_class_full_name_by_id",
"get_class_methods_by_class_full_name",
"get_method_code_by_full_name",
"get_method_code_by_id",
"get_method_full_name_by_id",
"get_call_code_by_id",
"get_method_code_by_class_full_name_and_method_name",
"get_derived_classes_by_class_full_name",
"get_parent_classes_by_class_full_name",
"get_method_by_call_id",
"get_referenced_method_full_name_by_call_id",
"get_calls_in_method_by_method_full_name"
],
"disabled": false,
"timeout": 1800,
"command": "uv",
"args": [
"--directory",
"/home/user/github/joern_mcp",
"run",
"server.py"
],
"transportType": "stdio",
"config": {
"host":"127.0.0.1",
"port":"16162",
"log_level":"ERROR",
"description": "Joern mcp server"
}
}
}
}
```
--------------------------------------------------------------------------------
/prompts_en.md:
--------------------------------------------------------------------------------
```markdown
## Information
1. cpg_filepath = /home/user/cpg/com.android.nfc.cpg
2. class_full_name = com.android.nfc.NfcService$6
## Processing Requirements
1. Review the code of the onReceive method in the current class, analyze the code logic, and examine the handling logic for different Actions.
2. If different Action handlers call other methods, continue to review the code of the called methods, and if necessary, continue analyzing subsequent called methods.
3. For Action handlers with security risks, describe the complete handling logic, focus on whether parameters are obtained from the intent and how they are processed, and pay attention to sensitive operations or sensitive information.
4. If security vulnerabilities exist, describe in detail the possible causes of the vulnerabilities, provide the vulnerability-related code, and generate Intent data that can reach the vulnerable branch.
5. If there is logging in the processing logic, find the TAG string used for printing.
## Notes
1. joern server is already started
2. joern mcp server is already started
3. Strings in joern must use double quotes, not single quotes
## Sanitization Rules
1. No need to consider permission checks
2. No need to verify Intent source
## Output Rules
1. Output should use markdown format
2. Intent data should be expressed in Java language
```
--------------------------------------------------------------------------------
/mcp_settings.json:
--------------------------------------------------------------------------------
```json
{
"mcpServers": {
"joern": {
"autoApprove": [
"ping",
"load_cpg",
"get_method_callees",
"get_method_callers",
"get_class_full_name_by_id",
"get_class_methods_by_class_full_name",
"get_method_code_by_full_name",
"get_method_code_by_id",
"get_method_full_name_by_id",
"get_call_code_by_id",
"get_method_code_by_class_full_name_and_method_name",
"get_derived_classes_by_class_full_name",
"get_parent_classes_by_class_full_name",
"get_method_by_call_id",
"get_referenced_method_full_name_by_call_id",
"get_calls_in_method_by_method_full_name"
],
"disabled": false,
"command": "uv",
"args": [
"--directory",
"/home/kali/ssd/workspace/mcp-joern",
"run",
"server.py"
],
"transportType": "stdio",
"description": "Joern mcp server",
"config": {
"host":"127.0.0.1",
"port":"16162",
"log_level":"ERROR",
"timeout": 1800
}
}
},
"llm": {
"model_type": "custom",
"config": {
"openai": {
"model": "gpt-4",
"base_url": "https://api.openai.com/v1",
"timeout": 30
},
"local": {
"model": "",
"base_url": "http://localhost:8000/v1",
"timeout": 30
},
"custom": {
"model": "qwq-plus-latest",
"base_url": "https://dashscope.aliyuncs.com/compatible-mode/v1",
"timeout": 60
}
}
}
}
```
--------------------------------------------------------------------------------
/test_sc_tools.py:
--------------------------------------------------------------------------------
```python
import json
import sys
import os
import requests
import re
from dotenv import load_dotenv
from urllib3 import response
from common_tools import *
load_dotenv()
server_endpoint = f'{os.getenv("HOST")}:{os.getenv("PORT")}'
print(server_endpoint)
basic_auth = (os.getenv("USER_NAME"), os.getenv("PASSWORD"))
def joern_remote(query):
"""
Execute remote query and return results
Parameters:
query -- The query string to execute
Returns:
Returns the server response's stdout content on success
Returns None on failure, error messages will be output to stderr
"""
data = {"query": query}
headers = {'Content-Type': 'application/json'}
try:
response = requests.post(
f'http://{server_endpoint}/query-sync',
data=json.dumps(data),
auth=basic_auth,
headers=headers,
timeout=10
)
response.raise_for_status() # 自动处理HTTP错误状态码
result = response.json()
return remove_ansi_escape_sequences(result.get('stdout', ''))
except requests.exceptions.RequestException as e:
sys.stderr.write(f"Request Error: {str(e)}\n")
except json.JSONDecodeError:
sys.stderr.write("Error: Invalid JSON response\n")
return None
def get_calls_in_method_by_method_full_name(method_full_name:str) -> list[str]:
"""Get the calls info by the method full name which the call is in the method
@param method_full_name: The full name of the method
@return: The calls info of the method
"""
response = joern_remote(f'get_calls_in_method_by_method_full_name("{method_full_name}")')
return extract_list(response)
if __name__ == "__main__":
method_full_name = "com.android.nfc.NfcService$6.onReceive:void(android.content.Context,android.content.Intent)"
print(get_calls_in_method_by_method_full_name(method_full_name))
```
--------------------------------------------------------------------------------
/server.py:
--------------------------------------------------------------------------------
```python
import json
import sys
import os
import re
import time
from typing import Dict, Any
import requests
from fastmcp import FastMCP
from dotenv import load_dotenv
from common_tools import *
load_dotenv()
def load_server_config(config_file: str = "mcp_settings.json") -> Dict[str, Any]:
"""load server config from config file"""
try:
with open(config_file, 'r') as f:
config = json.load(f)
return config.get('mcpServers').get('joern').get('config')
except FileNotFoundError:
print(f"config file {config_file} not exist")
return {}
except json.JSONDecodeError:
print(f"config file {config_file} format error")
return {}
joern_config = load_server_config()
# server_endpoint = f'{os.getenv("HOST")}:{os.getenv("PORT")}'
server_endpoint = f'{joern_config.get('host')}:{joern_config.get("port")}'
log_level = joern_config.get('log_level', 'ERROR')
joern_mcp = FastMCP("joern-mcp", log_level=log_level)
# joern_mcp._tool_manager.
print(server_endpoint)
basic_auth = (os.getenv("JOERN_AUTH_USERNAME"), os.getenv("JOERN_AUTH_PASSWORD"))
timeout = int(joern_config.get('timeout', '300'))
def joern_remote(query):
"""
Execute remote query and return results
Parameters:
query -- The query string to execute
Returns:
Returns the server response stdout content on success
Returns None on failure, error message will be output to stderr
"""
data = {"query": query}
headers = {'Content-Type': 'application/json'}
try:
response = requests.post(
f'http://{server_endpoint}/query-sync',
data=json.dumps(data),
headers=headers,
auth=basic_auth,
timeout=timeout
)
response.raise_for_status()
result = response.json()
return remove_ansi_escape_sequences(result.get('stdout', ''))
except requests.exceptions.RequestException as e:
sys.stderr.write(f"Request Error: {str(e)}\n")
except json.JSONDecodeError:
sys.stderr.write("Error: Invalid JSON response\n")
return None
@joern_mcp.tool()
def get_help():
"""Get help information from joern server"""
response = joern_remote('help')
if response:
return response
else:
return 'Query Failed'
@joern_mcp.tool()
def check_connection() -> str:
"""Check if the Joern MCP plugin is running"""
try:
metadata = extract_value(joern_remote("version"))
if not metadata:
return f"Failed to connect to Joern MCP! Make sure the Joern MCP server is running."
return f"Successfully connected to Joern MCP, joern server version is {metadata}"
except Exception as e:
return f"Failed to connect to Joern MCP! Make sure the Joern MCP server is running."
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
GENERATED_PY = os.path.join(SCRIPT_DIR, "server_tools.py")
def generate():
"""Generate and execute additional server tools from server_tools.py file.
This function reads the content of server_tools.py and executes it to add
more functionality to the server.
"""
with open(GENERATED_PY, "r") as f:
code = f.read()
exec(compile(code, GENERATED_PY, "exec"))
generate()
def main():
"""Start the MCP server using stdio transport.
This is the main entry point for running the Joern MCP server.
"""
joern_mcp.run(transport="stdio")
if __name__ == "__main__":
main()
```
--------------------------------------------------------------------------------
/common_tools.py:
--------------------------------------------------------------------------------
```python
import os
import requests
import json
import re
import sys
def remove_ansi_escape_sequences(text: str) -> str:
"""Remove ANSI escape sequences from a string.
Args:
text (str): Input string containing ANSI escape sequences
Returns:
str: String with ANSI escape sequences removed
"""
ansi_escape = re.compile(r'\x1b\[[0-9;]*[a-zA-Z]')
return ansi_escape.sub('', text)
def extract_code_between_triple_quotes(input_str):
"""Extract content between triple quotes from a string.
Args:
input_str (str): Input string containing triple-quoted content
Returns:
str: Extracted content between triple quotes, or empty string if not found
"""
import re
# Find content between triple quotes
pattern = r'"""(.*?)"""'
match = re.search(pattern, input_str, re.DOTALL)
if match:
return match.group(1).strip()
return ''
def extract_list(input_str):
"""Extract a list of elements from a string representation of a Scala List.
Parameters:
input_str -- The input string containing a Scala List representation
Returns:
A Python list containing the extracted elements with cleaned data
"""
# Check if input is empty or None
if not input_str:
return []
# Use regex to match List content
list_pattern = r'List\((.*?)\)$'
list_match = re.search(list_pattern, input_str, re.DOTALL)
if not list_match:
return []
content = list_match.group(1).strip()
# Try to match content within triple quotes
triple_quote_pattern = r'"""(.*?)"""'
triple_quote_matches = re.findall(triple_quote_pattern, content, re.DOTALL)
if triple_quote_matches:
return triple_quote_matches
# If no triple-quoted content found, try to match content within regular quotes
single_quote_pattern = r'"((?:\\.|[^"\\])*?)"'
single_quote_matches = re.findall(single_quote_pattern, content, re.DOTALL)
elements = []
for item in single_quote_matches:
if item.strip():
# Handle escape characters
cleaned = item.replace('\\"', '"').replace('\\\\', '\\')
elements.append(cleaned)
return elements
def extract_quoted_string(input_str: str) -> str:
"""Extract content between quotes from a string.
Args:
input_str (str): Input string containing quoted content
Returns:
str: Extracted content between quotes, or empty string if not found
"""
pattern = r'"(.*?)"'
match = re.search(pattern, input_str)
if match:
return match.group(1)
return ''
def extract_long_value(input_str: str) -> str:
"""Extract Long value from a string representation of a Scala Long variable.
Args:
input_str (str): Input string containing a Scala Long value (e.g. 'val res4: Long = 90194313219L')
Returns:
str: Extracted Long value including the 'L' suffix, or empty string if not found
"""
pattern = r'= (\d+L)'
match = re.search(pattern, input_str)
if match:
return match.group(1)
return ''
def extract_value(input_str: str) -> str:
"""Extract value from a string based on its pattern.
This function automatically selects the appropriate extraction method based on
the input string format:
* If contains 'Long =', uses extract_long_value
* If contains triple quotes, uses extract_code_between_triple_quotes
* If contains single quotes, uses extract_quoted_string
Args:
input_str (str): Input string containing a value to extract
Returns:
str: Extracted value based on the detected pattern
"""
if 'Long =' in input_str:
return extract_long_value(input_str)
elif 'String = """' in input_str:
return extract_code_between_triple_quotes(input_str)
elif 'String = "' in input_str:
return extract_quoted_string(input_str)
else:
return input_str
```
--------------------------------------------------------------------------------
/server_tools.py:
--------------------------------------------------------------------------------
```python
# NOTE: This file has been automatically generated, do not modify!
# Architecture based on https://github.com/mrexodia/ida-pro-mcp (MIT License)
from http.client import responses
from typing import Annotated, Optional, TypedDict, Generic, TypeVar
from pydantic import Field
T = TypeVar("T")
@joern_mcp.tool()
def ping()->str:
"""Checks if the Joern server is running and responsive by querying its version
@return: The Joern server version if successful, 'Query Failed' if the server is not responding
"""
response = joern_remote('version')
if response:
return extract_value(response)
else:
return 'Query Failed'
@joern_mcp.tool()
def load_cpg(cpg_filepath: str) -> str:
"""
Loads a CPG from a file if the cpg is not loaded or the cpg is not the same as the filepath.
Args:
cpg_filepath (str): The path to the CPG file, the filepath is absolute path.
Returns:
str: True if the CPG is loaded successfully, False otherwise.
"""
# return extract_value(joern_remote(f'val cpg = CpgLoader.load("{cpg_filepath}")'))
return extract_value(joern_remote(f'load_cpg("{cpg_filepath}")'))
@joern_mcp.tool()
def get_method_callees(method_full_name: str) -> list[str]:
"""Retrieves a list of methods info that are called by the specified method
@param method_full_name: The fully qualified name of the source method(e.g., com.android.nfc.NfcService$6.onReceive:void(android.content.Context,android.content.Intent))
@return: List of full name, name, signature and id of methods which call the source method
"""
# responses = joern_remote(f'cpg.method.fullNameExact("{method_full_name}").head.callee.distinct.map(m => (s"methodFullName=$' + '{m.fullName} methodId=${m.id}L")).l')
responses = joern_remote(f'get_method_callees("{method_full_name}")')
return extract_list(responses)
@joern_mcp.tool()
def get_method_callers(method_full_name: str) -> list[str]:
"""Retrieves a list of methods that call the specified method
@param method_full_name: The fully qualified name of the source method(e.g., com.android.nfc.NfcService$6.onReceive:void(android.content.Context,android.content.Intent))
@return: List of full name, name, signature and id of methods called by the source method
"""
responses = joern_remote(f'get_method_callers("{method_full_name}")')
return extract_list(responses)
@joern_mcp.tool()
def get_class_full_name_by_id(class_id:str) -> str:
"""Retrieves the fully name of a class by its ID
@param id: The unique identifier of the class (typeDecl), the id is a Long int string, like '111669149702L'
@return: The fully name of the class (e.g., com.android.nfc.NfcService$6)
"""
response = joern_remote(f'get_class_full_name_by_id("{class_id}")')
return extract_value(response)
@joern_mcp.tool()
def get_class_methods_by_class_full_name(class_full_name:str) -> list[str]:
"""Get the methods of a class by its fully qualified name
@param class_full_name: The fully qualified name of the class
@return: List of full name, name, signature and id of methods in the class
"""
response = joern_remote(f'get_class_methods_by_class_full_name("{class_full_name}")')
return extract_list(response)
@joern_mcp.tool()
def get_method_code_by_full_name(method_full_name:str) -> str:
"""Get the code of a method by its fully name, If you know the full name of the method, you can use this tool to get the method code directly.
If you only know the full name of the class and the name of the method, you should use get_method_code_by_class_full_name_and_method_name
@param method_full_name: The fully qualified name of the method (e.g., com.android.nfc.NfcService$6.onReceive:void(android.content.Context,android.content.Intent))
@return: The source code of the specified method
"""
response = joern_remote(f'get_method_code_by_method_full_name("{method_full_name}")')
return extract_value(response)
@joern_mcp.tool()
def get_method_code_by_id(method_id:str) -> str:
"""Get the code of a method by its class full name and method name
@param class_full_name: The fully qualified name of the class
@param method_name: The name of the method
@return: List of full name, name, signature and id of methods in the class
"""
response = joern_remote(f'get_method_code_by_id("{method_id}")')
return extract_value(response)
@joern_mcp.tool()
def get_method_full_name_by_id(method_id:str) -> str:
"""Retrieves the fully qualified name of a method by its ID
@param id: The unique identifier of the method, the id is a Long int string, like '111669149702L'
@return: The fully qualified name of the method (e.g., com.android.nfc.NfcService$6.onReceive:void(android.content.Context,android.content.Intent))
"""
response = joern_remote(f'get_method_full_name_by_id("{method_id}")')
return extract_value(response)
@joern_mcp.tool()
def get_call_code_by_id(code_id:str) -> str:
"""Get the source code of a specific call node from the loaded CPG by the call id
@param id: The unique identifier of the call node, the id is a Long int string, like '111669149702L'
@return: The source code of the specified call
"""
response = joern_remote(f'get_call_code_by_id("{code_id}")')
return extract_value(response)
@joern_mcp.tool()
def get_method_code_by_class_full_name_and_method_name(class_full_name:str, method_name:str) -> list[str]:
"""Get the code of a method by its class full name and method name,
this tool is usually used when you don't know the full name of the method, but you know the full name of the class and the name of the method. If there are multiple methods with the same name in the class, the code of all methods will be returned.
@param class_full_name: The fully qualified name of the class, like 'com.android.nfc.NfcService'
@param method_name: The name of the method, like 'onReceive'
@return: List of full name, name, signature and id of methods in the class
"""
responses = joern_remote(f'get_method_code_by_class_full_name_and_method_name("{class_full_name}", "{method_name}")')
return extract_list(responses)
# @joern_mcp.tool()
# def get_method_by_full_name_without_signature(full_name_without_signature:str) -> list[str]:
# """Get the info of a method list by its fully qualified name without signature
# @param full_name_without_signature: fully qualified name of methodwithout signature,like com.android.nfc.NfcService.onReceive
# @return: The info of the methods, including the full name, name, signature and id
# """
# response = joern_remote(f'get_method_by_full_name_without_signature("{full_name_without_signature}")')
# return extract_list(response)
@joern_mcp.tool()
def get_derived_classes_by_class_full_name(class_full_name:str) -> list[str]:
"""Get the derived classes of a class
@param class_full_name: The fully qualified name of the class
@return: The derived classes info of the class, including the full name, name and id
"""
response = joern_remote(f'get_derived_classes_by_class_full_name("{class_full_name}")')
return extract_list(response)
@joern_mcp.tool()
def get_parent_classes_by_class_full_name(class_full_name:str) -> list[str]:
"""Get the parent classes of a class
@param class_full_name: The fully qualified name of the class
@return: The parent classes info of the class, including the full name, name and id
"""
response = joern_remote(f'get_parent_classes_by_class_full_name("{class_full_name}")')
return extract_list(response)
@joern_mcp.tool()
def get_method_by_call_id(call_id:str) -> str:
"""Get the method info by the call id which the call is in the method
@param id: The id of the call
@return: The method info of the call
"""
response = joern_remote(f'get_method_by_call_id("{call_id}")')
return extract_value(response)
@joern_mcp.tool()
def get_referenced_method_full_name_by_call_id(call_id:str) -> str:
"""Get the method info by the call id which the call is referenced the method
@param id: The id of the call
@return: The method info of the call
"""
response = joern_remote(f'get_referenced_method_full_name_by_call_id("{call_id}")')
return extract_value(response)
@joern_mcp.tool()
def get_calls_in_method_by_method_full_name(method_full_name:str) -> list[str]:
"""Get the calls info by the method full name which the call is in the method
@param method_full_name: The full name of the method
@return: The calls info of the method
"""
response = joern_remote(f'get_calls_in_method_by_method_full_name("{method_full_name}")')
return extract_list(response)
```
--------------------------------------------------------------------------------
/test_mcp_client.py:
--------------------------------------------------------------------------------
```python
import os
import asyncio
from dotenv import load_dotenv
from fastmcp import Client
from fastmcp.client.transports import PythonStdioTransport
async def test_connection(client):
"""Test server connection"""
print("Testing [check_connection] server connection...")
try:
result = await client.call_tool("check_connection")
print(f"Connection test result: {result[0].text}")
return True
except Exception as e:
print(f"Connection test failed: {str(e)}")
return False
async def test_ping(client):
"""Test ping functionality"""
print("Testing [ping] ping...")
try:
result = await client.call_tool("ping")
print(f"Ping result: {result[0].text}")
return True
except Exception as e:
print(f"Ping test failed: {str(e)}")
return False
async def test_load_cpg(client, cpg_path):
"""Test loading CPG file"""
print("Testing [load_cpg] CPG file loading...")
try:
result = await client.call_tool("load_cpg", {"cpg_filepath": cpg_path})
print(f"CPG loading result: {result[0].text}")
return True
except Exception as e:
print(f"CPG loading failed: {str(e)}")
return False
async def test_method_info_queries(client, method_full_name, method_id, full_name_without_signature):
"""Test method-related queries"""
print("Testing method queries...")
success = True
try:
# Test getting method callers
print("Testing [get_method_callers] get method callers...")
callers = await client.call_tool("get_method_callers", {"method_full_name": method_full_name})
if callers:
print(f"Method callers: {callers[0].text}")
else:
print(f"Method callers: []")
except Exception as e:
print(f"Failed to get method callers: {str(e)}")
success = False
try:
# Test getting method callees
print("Testing [get_method_callees] get method callees...")
callees = await client.call_tool("get_method_callees", {"method_full_name": method_full_name})
if callees:
print(f"Method callees: {callees[0].text}")
else:
print(f"Method callees: []")
except Exception as e:
print(f"Failed to get method callees: {str(e)}")
success = False
try:
# Test getting method full name by ID
print("Testing [get_method_full_name_by_id] get method full name by ID...")
full_name = await client.call_tool("get_method_full_name_by_id", {"id": method_id})
print(f"Method full name: {full_name[0].text}")
except Exception as e:
print(f"Failed to get method full name: {str(e)}")
success = False
try:
# Test getting method by full name without signature
print("Testing [get_method_by_full_name_without_signature] get method by full name without signature...")
method = await client.call_tool("get_method_by_full_name_without_signature", {"full_name_without_signature": full_name_without_signature})
print(f"Method info: {method[0].text}")
except Exception as e:
print(f"Failed to get method info: {str(e)}")
success = False
return success
async def test_class_info_queries(client, class_full_name):
"""Test class-related queries"""
print("Testing class queries...")
success = True
try:
# Test getting class methods
print("Testing [get_class_methods_by_class_full_name] get class methods...")
methods = await client.call_tool("get_class_methods_by_class_full_name", {"class_full_name": class_full_name})
if methods:
print(f"Class methods: {methods[0].text}")
else:
print(f"Class methods: []")
except Exception as e:
print(f"Failed to get class methods: {str(e)}")
success = False
try:
# Test getting derived classes
print("Testing [get_derived_classes_by_class_full_name] get derived classes...")
derived = await client.call_tool("get_derived_classes_by_class_full_name", {"class_full_name": class_full_name})
if derived:
print(f"Derived classes: {derived[0].text}")
else:
print(f"Derived classes: []")
except Exception as e:
print(f"Failed to get derived classes: {str(e)}")
success = False
try:
# Test getting parent classes
print("Testing [get_parent_classes_by_class_full_name] get parent classes...")
parents = await client.call_tool("get_parent_classes_by_class_full_name", {"class_full_name": class_full_name})
if parents:
print(f"Parent classes: {parents[0].text}")
else:
print(f"Parent classes: []")
except Exception as e:
print(f"Failed to get parent classes: {str(e)}")
success = False
return success
async def test_call_info_queries(client, call_id, method_full_name):
"""Test call-related queries"""
print("\nTesting call queries...")
success = True
try:
# Test getting call code
print("Testing [get_call_code_by_id] get call code...")
code = await client.call_tool("get_call_code_by_id", {"id": call_id})
print(f"Call code: {code[0].text}")
except Exception as e:
print(f"Failed to get call code: {str(e)}")
success = False
try:
# Test getting method by call ID
print("Testing [get_method_by_call_id] get method by call ID...")
method = await client.call_tool("get_method_by_call_id", {"id": call_id})
print(f"Method info: {method[0].text}")
except Exception as e:
print(f"Failed to get method by call ID: {str(e)}")
success = False
try:
# Test getting referenced method full name
print("Testing [get_referenced_method_full_name_by_call_id] get referenced method full name...")
ref_method = await client.call_tool("get_referenced_method_full_name_by_call_id", {"id": call_id})
print(f"Referenced method: {ref_method[0].text}")
except Exception as e:
print(f"Failed to get referenced method: {str(e)}")
success = False
print("Testing [get_calls_in_method_by_method_full_name] get calls in method...")
try:
calls = await client.call_tool("get_calls_in_method_by_method_full_name", {"method_full_name": method_full_name})
if calls:
print(f"Calls in method: {calls[0].text}")
else:
print(f"Calls in method: []")
success = True
except Exception as e:
print(f"Failed to get calls in method: {str(e)}")
success = False
return success
async def test_method_code_queries(client, method_full_name, class_full_name, method_name, method_id):
"""Test method name-related queries"""
print("Testing method name queries...")
success = True
try:
# Test getting method code
print("Testing [get_method_code_by_full_name] get method code...")
code = await client.call_tool("get_method_code_by_full_name", {"method_full_name": method_full_name})
print(f"Method code: {code[0].text}")
except Exception as e:
print(f"Failed to get method code: {str(e)}")
success = False
try:
# Test getting method code by class full name and method name
print("Testing [get_method_code_by_class_full_name_and_method_name] get method code by class and method name...")
codes = await client.call_tool("get_method_code_by_class_full_name_and_method_name", {
"class_full_name": class_full_name,
"method_name": method_name
})
print(f"Method codes: {codes[0].text}")
except Exception as e:
print(f"Failed to get method codes: {str(e)}")
success = False
try:
# Test getting method code by ID
print("Testing [get_method_code_by_id] get method code by ID...")
code = await client.call_tool("get_method_code_by_id", {"id": method_id})
print(f"Method code: {code[0].text}")
except Exception as e:
print(f"Failed to get method code: {str(e)}")
success = False
return success
async def main():
"""Main test function"""
print("Starting MCP server test...")
print("=" * 50)
# Load environment variables
load_dotenv()
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
# Create client
client = Client(
transport=PythonStdioTransport('server.py'),
roots=[f"file://{SCRIPT_DIR}"] # Replace with actual workspace directory
)
async with client:
# Test basic connection
if not await test_connection(client):
print("Server connection test failed, terminating test")
return
# Test ping
if not await test_ping(client):
print("Ping test failed, terminating test")
return
# Test CPG loading
cpg_path = os.path.join(SCRIPT_DIR,"com.android.nfc.cpg") # Replace with actual CPG file path
if not await test_load_cpg(client, cpg_path):
print("CPG loading test failed, terminating test")
return
# Test method queries
method_full_name = "com.android.nfc.NfcService$6.onReceive:void(android.content.Context,android.content.Intent)"
method_name = "onReceive"
method_id = "111669160511L"
call_id = "30064950783L"
full_name_without_signature = "com.android.nfc.NfcService$6.onReceive"
class_full_name = "com.android.nfc.NfcService" # Replace with actual class name
if not await test_method_info_queries(client, method_full_name, method_id, full_name_without_signature):
print("Method queries test failed")
# Test class queries
if not await test_class_info_queries(client, class_full_name):
print("Class queries test failed")
# Test method code queries
if not await test_method_code_queries(client, method_full_name, class_full_name, method_name, method_id):
print("Method ID queries test failed")
# Test call queries
if not await test_call_info_queries(client, call_id, method_full_name):
print("Call queries test failed")
print("Test completed!")
if __name__ == "__main__":
asyncio.run(main())
```