# Directory Structure
```
├── .gitignore
├── app.py
├── config.yaml
├── Dockerfile
├── LICENSE
├── README.md
├── requirements.txt
├── requirments.txt
├── setup.py
└── src
├── kali_mcps
│ ├── __init__.py
│ ├── base
│ │ ├── __init__.py
│ │ └── kali_command.py
│ ├── manual.yaml
│ ├── nm
│ │ └── actions.py
│ ├── nmap
│ │ ├── __init__.py
│ │ └── actions.py
│ ├── objdump
│ │ └── actions.py
│ ├── strings
│ │ └── actions.py
│ ├── traceroute
│ │ └── actions.py
│ └── wireshark
│ └── actions.py
├── mcp_server.py
└── sandbox.py
```
# Files
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# UV
# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
#uv.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
# Ruff stuff:
.ruff_cache/
# PyPI configuration file
.pypirc
sandbox_expire/
logs/
tests/
```
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
# awesome_kali_MCPServers
Welcome to the **awsome_kali_MCPServers** repository, a collection of MCP servers specifically tailored for Kali Linux. These servers are designed to empower AI Agents in reverse engineering and security testing, offering a range of powerful features to enhance your workflows.
## Description
**awsome kali MCPServers** provides flexible network analysis, target sniffing, traffic analysis, binary understanding, and automation capabilities, all aimed at enhancing AI-driven processes in the realm of cybersecurity. Whether you are a seasoned security professional or just starting your journey into the world of Kali Linux, these servers are here to support you.
## Features
- **Network Analysis:** Gain insights into network activity, traffic patterns, and potential vulnerabilities.
- **Target Sniffing:** Identify and capture data packets to analyze potential threats or weaknesses.
- **Traffic Analysis:** Monitor and analyze network traffic for suspicious activity or irregularities.
- **Binary Understanding:** Decode and interpret binary data for deeper insights into security issues.
- **Automation:** Streamline repetitive tasks and workflows to increase efficiency and productivity.
## How to Use
To get started with **awsome kali MCPServers**, simply visit the [Releases page](https://github.com/VERMAXVR/awsome_kali_MCPServers/releases) to download the necessary files. Once downloaded, follow the execution instructions to set up the servers on your Kali Linux machine.

## Topics
This repository covers a range of topics related to cybersecurity, AI integration, and Kali Linux usage. The main topics include:
- Agent
- Kali Linux
- LLM
- MCP Server
- Security
- Tools
## Get in Touch
If you have any questions, feedback, or suggestions regarding **awsome kali MCPServers**, feel free to reach out. We are here to support you in your cybersecurity endeavors.
Stay secure and keep exploring with **awsome kali MCPServers**!
```
--------------------------------------------------------------------------------
/setup.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/src/kali_mcps/base/__init__.py:
--------------------------------------------------------------------------------
```python
"""
Base package for Kali commands
"""
```
--------------------------------------------------------------------------------
/src/kali_mcps/nmap/__init__.py:
--------------------------------------------------------------------------------
```python
"""
Nmap package for network scanning
"""
```
--------------------------------------------------------------------------------
/config.yaml:
--------------------------------------------------------------------------------
```yaml
kalilinux:
name: kali-tools
image: kalimcps:latest # docker built -t kalimcps:latest .
```
--------------------------------------------------------------------------------
/src/kali_mcps/__init__.py:
--------------------------------------------------------------------------------
```python
"""
Kali MCP Servers package
"""
from src.kali_mcps.base.kali_command import CommandRunner
from src.kali_mcps.objdump.actions import (
file_headers_action,
disassemble_action,
symbol_table_action,
section_headers_action,
)
__all__ = [
"CommandRunner",
"file_headers_action",
"disassemble_action",
"symbol_table_action",
"section_headers_action",
]
```
--------------------------------------------------------------------------------
/src/kali_mcps/traceroute/actions.py:
--------------------------------------------------------------------------------
```python
import asyncio
from src.kali_mcps.base.kali_command import CommandRunner
class TracerouteCommand(CommandRunner):
def __init__(self):
super().__init__("traceroute", network_enabled=True, memory_limit="1g", timeout=120)
def traceroute_action(target: str):
"""
Traceroute to the target
"""
cmd = TracerouteCommand()
command = ["traceroute", target]
return cmd.execute(command)
if __name__ == "__main__":
print(traceroute_action("8.8.8.8"))
```
--------------------------------------------------------------------------------
/Dockerfile:
--------------------------------------------------------------------------------
```dockerfile
FROM python:3.12-slim-bookworm
WORKDIR /app
ENV DEBIAN_FRONTEND=noninteractive
RUN apt-get update && \
apt-get install -y \
iputils-ping \
net-tools \
dnsutils \
nmap \
binutils \
traceroute \
tshark && \
apt-get clean && \
rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
ENV PYTHONPATH=/app
ENV IS_SAFE=false
# set run cmd
ENTRYPOINT ["python", "app.py"]
```
--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------
```
annotated-types==0.7.0
anyio==4.9.0
certifi==2025.1.31
charset-normalizer==3.4.1
click==8.1.8
docker==7.1.0
h11==0.14.0
httpcore==1.0.7
httpx==0.28.1
httpx-sse==0.4.0
idna==3.10
iniconfig==2.1.0
loguru==0.7.3
mcp==1.6.0
packaging==24.2
pluggy==1.5.0
pydantic==2.11.1
pydantic-settings==2.8.1
pydantic_core==2.33.0
pytest==8.3.5
pytest-asyncio==0.26.0
python-dotenv==1.1.0
requests==2.32.3
sniffio==1.3.1
sse-starlette==2.2.1
starlette==0.46.1
typing-inspection==0.4.0
typing_extensions==4.13.0
urllib3==2.3.0
uvicorn==0.34.0
```
--------------------------------------------------------------------------------
/requirments.txt:
--------------------------------------------------------------------------------
```
annotated-types==0.7.0
anyio==4.9.0
certifi==2025.1.31
charset-normalizer==3.4.1
click==8.1.8
docker==7.1.0
h11==0.14.0
httpcore==1.0.7
httpx==0.28.1
httpx-sse==0.4.0
idna==3.10
iniconfig==2.1.0
loguru==0.7.3
mcp==1.6.0
packaging==24.2
pluggy==1.5.0
pydantic==2.11.1
pydantic-settings==2.8.1
pydantic_core==2.33.0
pytest==8.3.5
pytest-asyncio==0.26.0
python-dotenv==1.1.0
requests==2.32.3
sniffio==1.3.1
sse-starlette==2.2.1
starlette==0.46.1
typing-inspection==0.4.0
typing_extensions==4.13.0
urllib3==2.3.0
uvicorn==0.34.0
```
--------------------------------------------------------------------------------
/app.py:
--------------------------------------------------------------------------------
```python
from src.mcp_server import mcp
import argparse # 添加 argparse 模块
import docker
from mcp.server.models import InitializationOptions
if __name__ == "__main__":
# create arg parser
parser = argparse.ArgumentParser(description='MCP Server with Kali image configuration')
parser.add_argument('--kali-image',
type=str,
required=False,
help='Specify the Kali Linux image name to use')
args = parser.parse_args()
if args.kali_image:
image_name = args.kali_image
# use docker lib to check if the image exists
client = docker.from_env()
try:
client.images.get(image_name)
print(f"Kali image {image_name} found")
client.close()
except docker.errors.ImageNotFound:
print(f"Kali image {image_name} not found")
exit(1)
print(f"Using Kali image: {image_name}")
print("Starting server is running")
# pass image name to mcp
mcp.run(transport="stdio")
```
--------------------------------------------------------------------------------
/src/kali_mcps/objdump/actions.py:
--------------------------------------------------------------------------------
```python
import asyncio
from src.kali_mcps.base.kali_command import CommandRunner
class ObjdumpCommand(CommandRunner):
def __init__(self):
super().__init__("objdump", network_enabled=False, memory_limit="1g", timeout=120)
def file_headers_action(target: str) -> tuple[str, str]:
"""
Display file headers
For example: objdump -f /path/to/file
"""
cmd = ObjdumpCommand()
command = ["objdump", "-f", target]
return cmd.execute(command)
def disassemble_action(target: str, section: str = ".text") -> tuple[str, str]:
"""
Disassemble section (default: .text)
For example: objdump -d -j .text /path/to/file
"""
cmd = ObjdumpCommand()
command = ["objdump", "-d", "-j", section, target]
return cmd.execute(command)
def symbol_table_action(target: str) -> tuple[str, str]:
"""
Display symbol table
For example: objdump -t /path/to/file
"""
cmd = ObjdumpCommand()
command = ["objdump", "-t", target]
return cmd.execute(command)
def section_headers_action(target: str) -> tuple[str, str]:
"""
Display all section headers
For example: objdump -h /path/to/file
"""
cmd = ObjdumpCommand()
command = ["objdump", "-h", target]
return cmd.execute(command)
def full_contents_action(target: str) -> tuple[str, str]:
"""
Display all information including headers and disassembly
For example: objdump -x /path/to/file
"""
cmd = ObjdumpCommand()
command = ["objdump", "-x", target]
return cmd.execute(command)
if __name__ == "__main__":
# Test example
target_file = "/bin/ls"
print(file_headers_action(target_file))
```
--------------------------------------------------------------------------------
/src/kali_mcps/nmap/actions.py:
--------------------------------------------------------------------------------
```python
import asyncio
from src.kali_mcps.base.kali_command import CommandRunner
class NmapCommand(CommandRunner):
def __init__(self):
super().__init__("nmap",
network_enabled=True, # nmap需要网络访问
memory_limit="2g", # 需要更多内存
timeout=300) # 需要更长的超时时间
def basic_scan_action(target: str) -> tuple[str, str]:
"""
Basic scan
For example: nmap 192.168.1.1
"""
cmd = NmapCommand()
command = ["nmap", target]
return cmd.execute(command)
def intense_scan_action(target: str) -> tuple[str, str]:
"""
Intense scan (-T4 -A)
Includes: OS detection, version detection, script scanning, and traceroute
"""
cmd = NmapCommand()
command = ["nmap", "-T4", "-A", target]
return cmd.execute(command)
def stealth_scan_action(target: str) -> tuple[str, str]:
"""
SYN scan (-sS)
Half-open scan, more stealthy
Requires root privileges
"""
cmd = NmapCommand()
command = ["nmap", "-sS", target]
return cmd.execute(command)
def quick_scan_action(target: str) -> tuple[str, str]:
"""
Quick scan (-T4 -F)
Only scans the most common ports
"""
cmd = NmapCommand()
command = ["nmap", "-T4", "-F", target]
return cmd.execute(command)
def vulnerability_scan_action(target: str) -> tuple[str, str]:
"""
Vulnerability scan (-sV --script vuln)
Uses vulnerability detection scripts
"""
cmd = NmapCommand()
command = ["nmap", "-sV", "--script", "vuln", target]
return cmd.execute(command)
if __name__ == "__main__":
# Test example
print(quick_scan_action("10.1.1.106"))
```
--------------------------------------------------------------------------------
/src/kali_mcps/nm/actions.py:
--------------------------------------------------------------------------------
```python
import asyncio
from src.kali_mcps.base.kali_command import CommandRunner
class NmCommand(CommandRunner):
def __init__(self):
super().__init__("nm", network_enabled=False, memory_limit="1g", timeout=120)
def basic_symbols_action(target: str) -> tuple[str, str]:
"""
Basic symbol listing
For example: nm /path/to/file
"""
cmd = NmCommand()
command = ["nm", target]
return cmd.execute(command)
def dynamic_symbols_action(target: str) -> tuple[str, str]:
"""
Display dynamic symbols
For example: nm -D /path/to/file
"""
cmd = NmCommand()
command = ["nm", "-D", target]
return cmd.execute(command)
def demangle_symbols_action(target: str) -> tuple[str, str]:
"""
Demangle C++ symbols
For example: nm -C /path/to/file
"""
cmd = NmCommand()
command = ["nm", "-C", target]
return cmd.execute(command)
def numeric_sort_action(target: str) -> tuple[str, str]:
"""
Sort symbols numerically by address
For example: nm -n /path/to/file
"""
cmd = NmCommand()
command = ["nm", "-n", target]
return cmd.execute(command)
def size_sort_action(target: str) -> tuple[str, str]:
"""
Sort symbols by size
For example: nm -S /path/to/file
"""
cmd = NmCommand()
command = ["nm", "-S", target]
return cmd.execute(command)
def undefined_symbols_action(target: str) -> tuple[str, str]:
"""
Display only undefined symbols
For example: nm -u /path/to/file
"""
cmd = NmCommand()
command = ["nm", "-u", target]
return cmd.execute(command)
if __name__ == "__main__":
# Test example
target_file = "/bin/ls"
print(basic_symbols_action(target_file))
```
--------------------------------------------------------------------------------
/src/kali_mcps/strings/actions.py:
--------------------------------------------------------------------------------
```python
import asyncio
from src.kali_mcps.base.kali_command import CommandRunner
class StringsCommand(CommandRunner):
def __init__(self):
super().__init__("strings", network_enabled=False, memory_limit="1g", timeout=120)
def basic_strings_action(target: str, input_file: bytes = None) -> tuple[str, str]:
"""
Basic strings analysis
For example: strings /path/to/file
"""
cmd = StringsCommand()
command = ["strings", target]
return cmd.execute(command, input_files={"input_file": "/tmp/input_file"})
def min_length_strings_action(target: str, length: int = 6) -> tuple[str, str]:
"""
Strings analysis with specified minimum length
For example: strings -n 6 /path/to/file
"""
cmd = StringsCommand()
command = ["strings", "-n", str(length), target]
return cmd.execute(command)
def offset_strings_action(target: str, format: str = "x") -> tuple[str, str]:
"""
Strings analysis showing string offsets
format can be: 'd' (decimal), 'o' (octal), 'x' (hexadecimal)
For example: strings -t x /path/to/file
"""
cmd = StringsCommand()
command = ["strings", "-t", format, target]
return cmd.execute(command)
def encoding_strings_action(target: str, encoding: str = "S") -> tuple[str, str]:
"""
Strings analysis with specified character encoding
encoding can be:
- 's' = 7-bit string
- 'S' = 8-bit string
- 'b' = 16-bit big-endian
- 'l' = 16-bit little-endian
For example: strings -e S /path/to/file
"""
cmd = StringsCommand()
command = ["strings", "-e", encoding, target]
return cmd.execute(command)
if __name__ == "__main__":
# Test example
target_file = "/bin/ls"
input_file = bytes.fromhex("68656c6c6f20776f726c64")
print(basic_strings_action(target_file, {"input_file": input_file , "output_file": "/tmp/output_file"}))
```
--------------------------------------------------------------------------------
/src/kali_mcps/wireshark/actions.py:
--------------------------------------------------------------------------------
```python
import asyncio
from src.kali_mcps.base.kali_command import CommandRunner
class TsharkCommand(CommandRunner):
def __init__(self):
super().__init__("tshark",
network_enabled=True, # 需要网络访问
memory_limit="2g", # 需要更多内存
timeout=300) # 需要更长的超时时间
def capture_live_action(interface: str, duration: int = 30, filter: str = "") -> tuple[str, str]:
"""
Capture live traffic from network interface
For example: tshark -i eth0 -a duration:30 -f "port 80"
"""
cmd = TsharkCommand()
command = ["tshark", "-i", interface, "-a", f"duration:{duration}"]
if filter:
command.extend(["-f", filter])
return cmd.execute(command)
def analyze_pcap_action(pcap_file: str, display_filter: str = "") -> tuple[str, str]:
"""
Analyze existing pcap file
For example: tshark -r file.pcap -Y "http"
"""
cmd = TsharkCommand()
command = ["tshark", "-r", pcap_file]
if display_filter:
command.extend(["-Y", display_filter])
return cmd.execute(command)
def extract_http_action(pcap_file: str) -> tuple[str, str]:
"""
Extract HTTP objects from pcap file
For example: tshark -r file.pcap -Y "http" -T fields -e http.request.method -e http.request.uri
"""
cmd = TsharkCommand()
command = [
"tshark", "-r", pcap_file,
"-Y", "http",
"-T", "fields",
"-e", "http.request.method",
"-e", "http.request.uri"
]
return cmd.execute(command)
def protocol_hierarchy_action(pcap_file: str) -> tuple[str, str]:
"""
Show protocol hierarchy statistics
For example: tshark -r file.pcap -q -z io,phs
"""
cmd = TsharkCommand()
command = ["tshark", "-r", pcap_file, "-q", "-z", "io,phs"]
return cmd.execute(command)
def conversation_statistics_action(pcap_file: str) -> tuple[str, str]:
"""
Show conversation statistics
For example: tshark -r file.pcap -q -z conv,ip
"""
cmd = TsharkCommand()
command = ["tshark", "-r", pcap_file, "-q", "-z", "conv,ip"]
return cmd.execute(command)
def expert_info_action(pcap_file: str) -> tuple[str, str]:
"""
Show expert information (errors, warnings, notes)
For example: tshark -r file.pcap -q -z expert
"""
cmd = TsharkCommand()
command = ["tshark", "-r", pcap_file, "-q", "-z", "expert"]
return cmd.execute(command)
if __name__ == "__main__":
# Test example
pcap_file = "capture.pcap"
print(protocol_hierarchy_action(pcap_file))
```
--------------------------------------------------------------------------------
/src/kali_mcps/base/kali_command.py:
--------------------------------------------------------------------------------
```python
import subprocess
import os
import asyncio
from src.sandbox import create_sandbox_client, SandboxSettings, SandboxTimeoutError, SandboxError
class CommandRunner:
"""Base class for executing Kali commands"""
def __init__(self, command_name: str, network_enabled: bool = False,
memory_limit: str = "1g", timeout: int = 120):
"""
Initialize CommandRunner
Args:
command_name: Name of the command (e.g. 'objdump', 'nm', etc.)
network_enabled: Whether network access is needed
memory_limit: Memory limit for sandbox
timeout: Timeout in seconds
"""
self.command_name = command_name
self.network_enabled = network_enabled
self.memory_limit = memory_limit
self.timeout = timeout
self.IS_SAFE = os.environ.get("IS_SAFE", "false").lower() == "true"
def run_command(self, command: list) -> tuple[str, str]:
"""Execute command and return output results"""
try:
process = subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True
)
stdout, stderr = process.communicate()
return stdout, stderr
except Exception as e:
return "", str(e)
async def run_with_sandbox(self, command: list, input_files: dict = None) -> tuple[str, str]:
"""
Execute command in Kali sandbox
Args:
command: Command to execute
input_files: Dict of {local_path: container_path} for files to copy into container
"""
kali_config = SandboxSettings(
image="kalilinux/kali-rolling",
memory_limit=self.memory_limit,
cpu_limit=1.0,
network_enabled=self.network_enabled,
timeout=self.timeout
)
client = create_sandbox_client()
try:
await client.create(config=kali_config)
# 如果有输入文件,先复制到容器中
if input_files:
for local_path, container_path in input_files.items():
await client.copy_to_container(local_path, container_path)
# 执行命令
cmd_str = " ".join(command)
stdout = await client.run_command(cmd_str)
return stdout, ""
except Exception as e:
return "", str(e)
finally:
await client.cleanup()
async def safe_execute_kali_command(self, command: list, input_files: dict = None) -> tuple[str, str]:
"""
Safely execute Kali command
Args:
command: Command to execute
input_files: Dict of {local_path: container_path} for files to copy into container
"""
try:
result = await self.run_with_sandbox(command, input_files)
return result
except SandboxTimeoutError:
return "", "Command execution timed out"
except SandboxError as e:
return "", f"Sandbox execution error: {str(e)}"
except Exception as e:
return "", f"Unknown error: {str(e)}"
def execute(self, command: list, input_files: dict = None) -> tuple[str, str]:
"""
Execute command with safety check
Args:
command: Command to execute
input_files: Dict of {local_path: container_path} for files to copy into container
"""
# if self.IS_SAFE:
# return await self.safe_execute_kali_command(command, input_files)
# else:
return self.run_command(command)
```
--------------------------------------------------------------------------------
/src/sandbox.py:
--------------------------------------------------------------------------------
```python
import asyncio
import docker
import tarfile
import os
import io
from typing import Optional, Dict, Any
from dataclasses import dataclass
class SandboxSettings:
"""sandbox configuration settings"""
def __init__(
self,
image: str = "kalimcps:latest",
memory_limit: str = "2g",
cpu_limit: float = 1.0,
network_enabled: bool = True,
network_mode: str = "bridge",
timeout: int = 300
):
self.image = image
self.memory_limit = memory_limit
self.cpu_limit = cpu_limit
self.network_enabled = network_enabled
self.network_mode = network_mode
self.timeout = timeout
class SandboxError(Exception):
"""Base exception for sandbox-related errors."""
class SandboxTimeoutError(SandboxError):
"""Exception raised when a sandbox operation times out."""
class SandboxResourceError(SandboxError):
"""Exception raised for resource-related errors."""
class SandboxClient:
"""Sandbox client, providing only container creation and cleanup functionality"""
def __init__(self):
self.client = docker.from_env()
self.container = None
async def create(self, config: SandboxSettings) -> None:
"""Create and start the container"""
try:
# prepare container configuration
container_config = {
"image": config.image,
"detach": True,
"mem_limit": config.memory_limit,
"nano_cpus": int(config.cpu_limit * 1e9),
"network_mode": config.network_mode if config.network_enabled else "none",
}
# create and start container
self.container = self.client.containers.run(**container_config)
print(f"Container created: {self.container.id}")
except Exception as e:
raise SandboxError(f"Failed to create container: {str(e)}")
async def run_command(self, command: str) -> str:
"""Execute a command in the container"""
if not self.container:
raise SandboxError("Container not created")
try:
exec_result = self.container.exec_run(command, tty=True)
return exec_result.output.decode('utf-8')
except Exception as e:
raise SandboxError(f"Failed to execute command: {str(e)}")
async def copy_to_container(self, source_path: str, container_path: str) -> None:
"""
Copy a file from host to container
Args:
source_path: Path to the file on host
container_path: Path where to put the file in container
"""
if not self.container:
raise SandboxError("Container not created")
try:
# create a memory-based tar file
tar_stream = io.BytesIO()
with tarfile.open(fileobj=tar_stream, mode='w') as tar:
# add file to tar
tar.add(source_path, arcname=os.path.basename(container_path))
tar_stream.seek(0)
# copy to container
self.container.put_archive(
path=os.path.dirname(container_path),
data=tar_stream
)
print(f"File copied to container: {container_path}")
except Exception as e:
raise SandboxError(f"Failed to copy file to container: {str(e)}")
async def copy_from_container(self, container_path: str, dest_path: str) -> None:
"""
Copy a file from container to host
Args:
container_path: Path to the file in container
dest_path: Path where to put the file on host
"""
if not self.container:
raise SandboxError("Container not created")
try:
# get the tar stream of the file from container
bits, stat = self.container.get_archive(container_path)
# create the destination directory (if it doesn't exist)
os.makedirs(os.path.dirname(dest_path), exist_ok=True)
# write the tar stream to a temporary file
with open(dest_path, 'wb') as f:
for chunk in bits:
f.write(chunk)
print(f"File copied from container: {dest_path}")
except Exception as e:
raise SandboxError(f"Failed to copy file from container: {str(e)}")
async def cleanup(self) -> None:
"""Clean up and remove the container"""
if self.container:
try:
self.container.stop()
self.container.remove()
print(f"Container cleaned up: {self.container.id}")
self.container = None
except Exception as e:
print(f"Error cleaning up container: {str(e)}")
def create_sandbox_client() -> SandboxClient:
"""Create a factory function for the sandbox client"""
return SandboxClient()
# example
async def run_in_sandbox(command: str) -> str:
"""Run a command in the sandbox"""
config = SandboxSettings()
client = create_sandbox_client()
try:
await client.create(config=config)
result = await client.run_command(command)
return result
except SandboxTimeoutError:
return "Command execution timed out"
except SandboxError as e:
return f"Sandbox execution error: {str(e)}"
except Exception as e:
return f"Unknown error: {str(e)}"
finally:
await client.cleanup()
# if you run this script directly
if __name__ == "__main__":
async def main():
print("Starting sandbox test...")
result = await run_in_sandbox("echo 'Sandbox test successful'")
print(result)
asyncio.run(main())
```
--------------------------------------------------------------------------------
/src/kali_mcps/manual.yaml:
--------------------------------------------------------------------------------
```yaml
mcp:
tools:
- nmap:
functions:
- basic_scan
description: "Perform a basic network scan using nmap."
parameters:
target:
type: str
description: "The target IP address or hostname to scan."
- intense_scan
description: "Perform an intense network scan using nmap."
parameters:
target:
type: str
description: "The target IP address or hostname to scan."
- stealth_scan
description: "Perform a stealth network scan using nmap."
parameters:
target:
type: str
description: "The target IP address or hostname to scan."
- quick_scan
description: "Perform a quick network scan using nmap."
parameters:
target:
type: str
description: "The target IP address or hostname to scan."
- vulnerability_scan
description: "Perform a vulnerability scan using nmap."
parameters:
target:
type: str
description: "The target IP address or hostname to scan."
- strings:
functions:
- basic_strings
description: "Perform a basic strings scan using strings."
parameters:
target:
type: str
description: "The target file to scan."
- min_length_strings
description: "Perform a min length strings scan using strings."
parameters:
target:
type: str
description: "The target file to scan."
- offset_strings
description: "Perform an offset strings scan using strings."
parameters:
target:
type: str
description: "The target file to scan."
- encoding_strings
description: "Perform an encoding strings scan using strings."
parameters:
target:
type: str
description: "The target file to scan."
- objdump:
functions:
- file_headers
description: "Perform a file headers scan using objdump."
parameters:
target:
type: str
description: "The target file to scan."
- disassemble
description: "Perform a disassemble scan using objdump."
parameters:
target:
type: str
description: "The target file to scan."
- symbol_table
description: "Perform a symbol table scan using objdump."
parameters:
target:
type: str
description: "The target file to scan."
- section_headers
description: "Perform a section headers scan using objdump."
parameters:
target:
type: str
description: "The target file to scan."
- full_contents
description: "Perform a full contents scan using objdump."
parameters:
target:
type: str
description: "The target file to scan."
- nm:
functions:
- basic_symbols
description: "Perform a basic symbols scan using nm."
parameters:
target:
type: str
description: "The target file to scan."
- dynamic_symbols
description: "Perform a dynamic symbols scan using nm."
parameters:
target:
type: str
description: "The target file to scan."
- demangle_symbols
description: "Perform a demangle symbols scan using nm."
parameters:
target:
type: str
description: "The target file to scan."
- numeric_sort
description: "Perform a numeric sort scan using nm."
parameters:
target:
type: str
description: "The target file to scan."
- size_sort
description: "Perform a size sort scan using nm."
parameters:
target:
type: str
description: "The target file to scan."
- undefined_symbols
description: "Perform an undefined symbols scan using nm."
parameters:
target:
type: str
description: "The target file to scan."
- wireshark:
functions:
- capture_live
description: "Perform a capture live scan using wireshark."
parameters:
interface:
type: str
description: "The network interface to capture live traffic on."
duration:
type: int
description: "The duration of the capture in seconds."
- analyze_pcap
description: "Perform an analyze pcap scan using wireshark."
parameters:
pcap_file:
type: str
description: "The pcap file to analyze."
- extract_http
description: "Perform an extract http scan using wireshark."
parameters:
pcap_file:
type: str
description: "The pcap file to analyze."
- protocol_hierarchy
description: "Perform a protocol hierarchy scan using wireshark."
parameters:
pcap_file:
type: str
description: "The pcap file to analyze."
- conversation_statistics
description: "Perform a conversation statistics scan using wireshark."
parameters:
pcap_file:
type: str
description: "The pcap file to analyze."
- expert_info
description: "Perform an expert info scan using wireshark."
parameters:
pcap_file:
type: str
description: "The pcap file to analyze."
```
--------------------------------------------------------------------------------
/src/mcp_server.py:
--------------------------------------------------------------------------------
```python
from mcp.server.fastmcp import FastMCP
from pydantic import BaseModel, Field
import os
from src.kali_mcps.nmap.actions import basic_scan_action, intense_scan_action, stealth_scan_action, quick_scan_action, vulnerability_scan_action
from src.kali_mcps.nm.actions import basic_symbols_action, dynamic_symbols_action, demangle_symbols_action, numeric_sort_action, size_sort_action, undefined_symbols_action
from src.kali_mcps.objdump.actions import file_headers_action, disassemble_action, symbol_table_action, section_headers_action, full_contents_action
from src.kali_mcps.strings.actions import basic_strings_action, min_length_strings_action, offset_strings_action, encoding_strings_action
from src.kali_mcps.wireshark.actions import capture_live_action, analyze_pcap_action, extract_http_action, protocol_hierarchy_action, conversation_statistics_action, expert_info_action
from src.kali_mcps.traceroute.actions import traceroute_action
mcp = FastMCP("kali-tools")
IS_SAFE = os.environ.get("IS_SAFE", "false").lower() == "true" # is safe mode, if true, the command will be executed in the sandbox
# nmap start
@mcp.tool()
def basic_scan(target: str):
"""Perform a basic network scan using nmap.
Args:
target (str): The target IP address or hostname to scan.
Returns:
str: The output results of the basic scan.
"""
return basic_scan_action(target)
@mcp.tool()
def intense_scan(target: str):
"""Perform an intense network scan using nmap.
Args:
target (str): The target IP address or hostname to scan.
Returns:
str: The output results of the intense scan.
"""
return intense_scan_action(target)
@mcp.tool()
def stealth_scan(target: str):
"""Perform a stealth network scan using nmap.
Args:
target (str): The target IP address or hostname to scan.
Returns:
str: The output results of the stealth scan.
"""
return stealth_scan_action(target)
@mcp.tool()
def quick_scan(target: str):
"""Perform a quick network scan using nmap.
Args:
target (str): The target IP address or hostname to scan.
Returns:
str: The output results of the quick scan.
"""
return quick_scan_action(target)
@mcp.tool()
def vulnerability_scan(target: str):
"""Perform a vulnerability scan using nmap.
Args:
target (str): The target IP address or hostname to scan.
Returns:
str: The output results of the vulnerability scan.
"""
return vulnerability_scan_action(target)
# nmap end
# nm start
@mcp.tool()
def basic_symbols(target: str):
"""Perform a basic symbol listing using nm.
Args:
target (str): The target file or executable to analyze.
Returns:
str: The output results of the basic symbol listing.
"""
return basic_symbols_action(target)
@mcp.tool()
def dynamic_symbols(target: str):
"""Perform a dynamic symbol listing using nm.
Args:
target (str): The target file or executable to analyze.
Returns:
str: The output results of the dynamic symbol listing.
"""
return dynamic_symbols_action(target)
@mcp.tool()
def demangle_symbols(target: str):
"""Perform a demangling of symbols using nm.
Args:
target (str): The target file or executable to analyze.
Returns:
str: The output results of the demangling of symbols.
"""
return demangle_symbols_action(target)
@mcp.tool()
def numeric_sort(target: str):
"""Perform a numeric sort of symbols using nm.
Args:
target (str): The target file or executable to analyze.
Returns:
str: The output results of the numeric sort of symbols.
"""
return numeric_sort_action(target)
@mcp.tool()
def size_sort(target: str):
"""Perform a size sort of symbols using nm.
Args:
target (str): The target file or executable to analyze.
Returns:
str: The output results of the size sort of symbols.
"""
return size_sort_action(target)
@mcp.tool()
def undefined_symbols(target: str):
"""Perform an undefined symbol listing using nm.
Args:
target (str): The target file or executable to analyze.
Returns:
str: The output results of the undefined symbol listing.
"""
return undefined_symbols_action(target)
# nm end
# objdump start
@mcp.tool()
def file_headers(target: str):
"""Perform a file header listing using objdump.
Args:
target (str): The target file or executable to analyze.
Returns:
str: The output results of the file header listing.
"""
return file_headers_action(target)
@mcp.tool()
def disassemble(target: str):
"""Perform a disassembly of the target file using objdump.
Args:
target (str): The target file or executable to analyze.
Returns:
str: The output results of the disassembly of the target file.
"""
return disassemble_action(target)
@mcp.tool()
def symbol_table(target: str):
"""Perform a symbol table listing using objdump.
Args:
target (str): The target file or executable to analyze.
Returns:
str: The output results of the symbol table listing.
"""
return symbol_table_action(target)
@mcp.tool()
def section_headers(target: str):
"""Perform a section header listing using objdump.
Args:
target (str): The target file or executable to analyze.
Returns:
str: The output results of the section header listing.
"""
return section_headers_action(target)
@mcp.tool()
def full_contents(target: str):
"""Perform a full contents listing using objdump.
Args:
target (str): The target file or executable to analyze.
Returns:
str: The output results of the full contents listing.
"""
return full_contents_action(target)
# objdump end
# strings start
@mcp.tool()
def basic_strings(target: str):
"""Perform a basic string listing using strings.
Args:
target (str): The target file or executable to analyze.
Returns:
str: The output results of the basic string listing.
"""
return basic_strings_action(target)
@mcp.tool()
def min_length_strings(target: str):
"""Perform a minimum length string listing using strings.
Args:
target (str): The target file or executable to analyze.
Returns:
str: The output results of the minimum length string listing.
"""
return min_length_strings_action(target)
@mcp.tool()
def offset_strings(target: str):
"""Perform an offset string listing using strings.
Args:
target (str): The target file or executable to analyze.
Returns:
str: The output results of the offset string listing.
"""
return offset_strings_action(target)
@mcp.tool()
def encoding_strings(target: str):
"""Perform an encoding string listing using strings.
Args:
target (str): The target file or executable to analyze.
Returns:
str: The output results of the encoding string listing.
"""
return encoding_strings_action(target)
# strings end
# tshark start
@mcp.tool()
def capture_live(interface: str, duration: int = 30, filter: str = ""):
"""Perform a live capture of network traffic using tshark.
Args:
interface (str): The network interface to capture from.
duration (int): The duration of the capture in seconds.
filter (str): The filter to apply to the capture.
Returns:
str: The output results of the live capture of network traffic.
"""
return capture_live_action(interface, duration, filter)
@mcp.tool()
def analyze_pcap(pcap_file: str, display_filter: str = ""):
"""Perform an analysis of a pcap file using tshark.
Args:
pcap_file (str): The path to the pcap file to analyze.
display_filter (str): The filter to apply to the analysis.
Returns:
str: The output results of the analysis of the pcap file.
"""
return analyze_pcap_action(pcap_file, display_filter)
@mcp.tool()
def extract_http(pcap_file: str):
"""Perform an HTTP extraction from a pcap file using tshark.
Args:
pcap_file (str): The path to the pcap file to extract HTTP from.
Returns:
str: The output results of the HTTP extraction from the pcap file.
"""
return extract_http_action(pcap_file)
@mcp.tool()
def protocol_hierarchy(pcap_file: str):
"""Perform a protocol hierarchy listing using tshark.
Args:
pcap_file (str): The path to the pcap file to analyze.
Returns:
str: The output results of the protocol hierarchy listing.
"""
return protocol_hierarchy_action(pcap_file)
@mcp.tool()
def conversation_statistics(pcap_file: str):
"""Perform a conversation statistics listing using tshark.
Args:
pcap_file (str): The path to the pcap file to analyze.
Returns:
str: The output results of the conversation statistics listing.
"""
return conversation_statistics_action(pcap_file)
@mcp.tool()
def expert_info(pcap_file: str):
"""Perform an expert information listing using tshark.
Args:
pcap_file (str): The path to the pcap file to analyze.
Returns:
str: The output results of the expert information listing.
"""
return expert_info_action(pcap_file)
# tshark end
# traceroute start
@mcp.tool()
def traceroute(target: str):
"""
Perform a traceroute to the target.
Args:
target (str): The target IP address or hostname to traceroute to.
Returns:
str: The output results of the traceroute.
"""
return traceroute_action(target)
# traceroute end
# run server, using stdio transport
if __name__ == "__main__":
print("Starting server is running")
mcp.run(transport="stdio")
```