#
tokens: 11202/50000 19/19 files
lines: off (toggle) GitHub
raw markdown copy
# Directory Structure

```
├── .gitignore
├── app.py
├── config.yaml
├── Dockerfile
├── LICENSE
├── README.md
├── requirements.txt
└── src
    ├── kali_mcps
    │   ├── __init__.py
    │   ├── base
    │   │   ├── __init__.py
    │   │   └── kali_command.py
    │   ├── manual.yaml
    │   ├── nm
    │   │   └── actions.py
    │   ├── nmap
    │   │   ├── __init__.py
    │   │   └── actions.py
    │   ├── objdump
    │   │   └── actions.py
    │   ├── strings
    │   │   └── actions.py
    │   ├── traceroute
    │   │   └── actions.py
    │   └── wireshark
    │       └── actions.py
    ├── mcp_server.py
    └── sandbox.py
```

# Files

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
#  Usually these files are written by a python script from a template
#  before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
.pybuilder/
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
#   For a library or package, you might want to ignore these files since the code is
#   intended to run in multiple environments; otherwise, check them in:
# .python-version

# pipenv
#   According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
#   However, in case of collaboration, if having platform-specific dependencies or dependencies
#   having no cross-platform support, pipenv may install dependencies that don't work, or not
#   install all needed dependencies.
#Pipfile.lock

# UV
#   Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
#   This is especially recommended for binary packages to ensure reproducibility, and is more
#   commonly ignored for libraries.
#uv.lock

# poetry
#   Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
#   This is especially recommended for binary packages to ensure reproducibility, and is more
#   commonly ignored for libraries.
#   https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock

# pdm
#   Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
#   pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
#   in version control.
#   https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/

# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

# pytype static type analyzer
.pytype/

# Cython debug symbols
cython_debug/

# PyCharm
#  JetBrains specific template is maintained in a separate JetBrains.gitignore that can
#  be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
#  and can be added to the global gitignore or merged into this file.  For a more nuclear
#  option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

# Ruff stuff:
.ruff_cache/

# PyPI configuration file
.pypirc
sandbox_expire/
logs/
tests/

```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# awsome-kali-MCPServers

## Overview
Welcome to awsome-kali-MCPServers! This repository is a collection of Model Context Protocol (MCP) servers designed specifically for Kali Linux environments. The goal is to enhance reverse engineering, security testing, and automation workflows by integrating powerful tools and flexible features. Whether you're a security researcher or a developer, this project aims to streamline your tasks with Kali Linux.

## Quick Start
Follow these steps to quickly get started with `kali-mcps`:
1. **Build the Docker Image**
First, build the Docker image, temporarily named kali-mcps. Run the following command in the project root directory:
```bash
docker build -t kali-mcps:latest .
```
2. **Launch an MCP Client**
Ensure you have an MCP client installed, such as claude desktop, cline, goose, or roo code. Open your chosen MCP client.
3. **Configure the MCP Client**
In your MCP client, create a configuration file (e.g., config.json) with the following content:
```json
{
  "mcpServers": {
    "kali-docker": {
      "command": "docker",
      "args": ["run", "-i", "kali-mcps:latest"]
    }
  }
}
```
- `"kali-docker"` is the server name, which you can customize.
- `"command": "docker"` specifies that Docker will be used to run the container.
- `"args"` defines the Docker run parameters: `-i` enables interactive mode, and `kali-mcps:latest` is the image you just built.

4. **Use Kali Tools**
Once configured, connect to the kali-mcps container via the MCP client and start using the built-in Kali tools (e.g., Nmap, nm, objdump, strings, tshark) for your tasks. Examples include:
- Run `basic_scan` for basic network scanning.
- Run `disassemble` to disassemble a target file.
- Run `capture_live` to capture real-time network traffic.

<p align="center">
  <img width="482" alt="image" src="https://github.com/user-attachments/assets/0e9fff0a-059d-424b-bb36-450a1d11adf9" />
</p>

## What to Expect
Network Analysis: Tools for sniffing and analyzing traffic.
Binary Understanding: Support for reverse engineering and function analysis.
Automation: Scripts and servers to simplify repetitive tasks.

## New Features
Since the last update, we have added the following features, integrating a series of tools based on the FastMCP framework:

### 1. Network Scanning (Nmap)
- `basic_scan`: Basic network scanning.
- `intense_scan`: In-depth network scanning.
- `stealth_scan`: Stealth network scanning.
- `quick_scan`: Quick network scanning.
- `vulnerability_scan`: Vulnerability scanning.

### 2. Symbol Analysis (nm)
- `basic_symbols`: Lists basic symbols.
- `dynamic_symbols`: Lists dynamic symbols.
- `demangle_symbols`: Decodes symbols.
- `numeric_sort`: Sorts symbols numerically.
- `size_sort`: Sorts symbols by size.
- `undefined_symbols`: Lists undefined symbols.

### 3. Binary Analysis (objdump)
- `file_headers`: Lists file headers.
- `disassemble`: Disassembles the target file.
- `symbol_table`: Lists the symbol table.
- `section_headers`: Lists section headers.
- `full_contents`: Lists full contents.

### 4. String Extraction (strings)
- `basic_strings`: Basic string extraction.
- `min_length_strings`: Extracts strings with a specified minimum length.
- `offset_strings`: Extracts strings with offsets.
- `encoding_strings`: Extracts strings based on encoding.

### 5. Network Traffic Analysis (Wireshark/tshark)
- `capture_live`: Captures network traffic in real-time.
- `analyze_pcap`: Analyzes pcap files.
- `extract_http`: Extracts HTTP data.
- `protocol_hierarchy`: Lists protocol hierarchy.
- `conversation_statistics`: Provides conversation statistics.
- `expert_info`: Analyzes expert information.
### 6. Sandbox Support (Docker)
A new sandbox feature has been added, enabling secure command execution in an isolated container environment:

Runs commands using Docker containers, with the default image being ubuntu-systemd:22.04.
Configurable memory limit (default: 2GB), CPU limit (default: 1 core), network mode, and timeout duration.
Supports bidirectional file copying between the host and the container.
Automatically cleans up container resources.


## TODO
- [ ] **Docker Sandbox Support**: Add containerized environments for safe testing and execution.
- [ ] **Network Tools Integration**: Support for tools like Nmap and Wireshark for advanced network analysis.
- [ ] **Reverse Engineering Tools**: Integrate Ghidra and Radare2 for enhanced binary analysis.
- [ ] **Agent Support**: Enable agent-based functionality for distributed tasks or remote operations.
 
## Current Status
This project is still in its early stages. I’m working on preparing the content, including server configurations, tool integrations, and documentation. Nothing is fully ready yet, but stay tuned—exciting things are coming soon!

## Stay Updated
Feel free to star or watch this repository to get updates as I add more features and files. Contributions and suggestions are welcome once the groundwork is laid out.

```

--------------------------------------------------------------------------------
/src/kali_mcps/base/__init__.py:
--------------------------------------------------------------------------------

```python
"""
Base package for Kali commands
""" 
```

--------------------------------------------------------------------------------
/src/kali_mcps/nmap/__init__.py:
--------------------------------------------------------------------------------

```python
"""
Nmap package for network scanning
""" 
```

--------------------------------------------------------------------------------
/config.yaml:
--------------------------------------------------------------------------------

```yaml
kalilinux:
  name: kali-tools
  image: kalimcps:latest # docker built -t kalimcps:latest .


```

--------------------------------------------------------------------------------
/src/kali_mcps/__init__.py:
--------------------------------------------------------------------------------

```python
"""
Kali MCP Servers package
"""
from src.kali_mcps.base.kali_command import CommandRunner
from src.kali_mcps.objdump.actions import (
    file_headers_action,
    disassemble_action,
    symbol_table_action,
    section_headers_action,
)

__all__ = [
    "CommandRunner",
    "file_headers_action",
    "disassemble_action",
    "symbol_table_action",
    "section_headers_action",
]
```

--------------------------------------------------------------------------------
/src/kali_mcps/traceroute/actions.py:
--------------------------------------------------------------------------------

```python
import asyncio
from src.kali_mcps.base.kali_command import CommandRunner

class TracerouteCommand(CommandRunner):
    def __init__(self):
        super().__init__("traceroute", network_enabled=True, memory_limit="1g", timeout=120)

def traceroute_action(target: str):
    """
    Traceroute to the target
    """
    cmd = TracerouteCommand()
    command = ["traceroute", target]
    return cmd.execute(command)


if __name__ == "__main__":
    print(traceroute_action("8.8.8.8"))
```

--------------------------------------------------------------------------------
/Dockerfile:
--------------------------------------------------------------------------------

```dockerfile
FROM python:3.12-slim-bookworm

WORKDIR /app

ENV DEBIAN_FRONTEND=noninteractive

RUN apt-get update && \
    apt-get install -y \
        iputils-ping \
        net-tools \
        dnsutils \
        nmap \
        binutils \
        traceroute \
        tshark && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*

COPY requirements.txt .

RUN pip install --no-cache-dir -r requirements.txt 


COPY . .

ENV PYTHONPATH=/app
ENV IS_SAFE=false

# set run cmd
ENTRYPOINT ["python", "app.py"]
```

--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------

```
annotated-types==0.7.0
anyio==4.9.0
certifi==2025.1.31
charset-normalizer==3.4.1
click==8.1.8
docker==7.1.0
h11==0.14.0
httpcore==1.0.7
httpx==0.28.1
httpx-sse==0.4.0
idna==3.10
iniconfig==2.1.0
loguru==0.7.3
mcp==1.6.0
packaging==24.2
pluggy==1.5.0
pydantic==2.11.1
pydantic-settings==2.8.1
pydantic_core==2.33.0
pytest==8.3.5
pytest-asyncio==0.26.0
python-dotenv==1.1.0
requests==2.32.3
sniffio==1.3.1
sse-starlette==2.2.1
starlette==0.46.1
typing-inspection==0.4.0
typing_extensions==4.13.0
urllib3==2.3.0
uvicorn==0.34.0

```

--------------------------------------------------------------------------------
/app.py:
--------------------------------------------------------------------------------

```python
from src.mcp_server import mcp 
import argparse  # 添加 argparse 模块
import docker
from mcp.server.models import InitializationOptions


if __name__ == "__main__":
    # create arg parser
    parser = argparse.ArgumentParser(description='MCP Server with Kali image configuration')
    parser.add_argument('--kali-image', 
                       type=str,
                       required=False,
                       help='Specify the Kali Linux image name to use')
    
    args = parser.parse_args()
    if args.kali_image:
        image_name = args.kali_image
        # use docker lib to check if the image exists
        client = docker.from_env()
        try:
            client.images.get(image_name)
            print(f"Kali image {image_name} found")
            client.close()
        except docker.errors.ImageNotFound:
            print(f"Kali image {image_name} not found")
            exit(1)
        print(f"Using Kali image: {image_name}")
    
    print("Starting server is running")
    
    # pass image name to mcp
    mcp.run(transport="stdio")
```

--------------------------------------------------------------------------------
/src/kali_mcps/objdump/actions.py:
--------------------------------------------------------------------------------

```python
import asyncio
from src.kali_mcps.base.kali_command import CommandRunner

class ObjdumpCommand(CommandRunner):
    def __init__(self):
        super().__init__("objdump", network_enabled=False, memory_limit="1g", timeout=120)

def file_headers_action(target: str) -> tuple[str, str]:
    """
    Display file headers
    For example: objdump -f /path/to/file
    """
    cmd = ObjdumpCommand()
    command = ["objdump", "-f", target]
    return cmd.execute(command)

def disassemble_action(target: str, section: str = ".text") -> tuple[str, str]:
    """
    Disassemble section (default: .text)
    For example: objdump -d -j .text /path/to/file
    """
    cmd = ObjdumpCommand()
    command = ["objdump", "-d", "-j", section, target]
    return cmd.execute(command)

def symbol_table_action(target: str) -> tuple[str, str]:
    """
    Display symbol table
    For example: objdump -t /path/to/file
    """
    cmd = ObjdumpCommand()
    command = ["objdump", "-t", target]
    return cmd.execute(command)

def section_headers_action(target: str) -> tuple[str, str]:
    """
    Display all section headers
    For example: objdump -h /path/to/file
    """
    cmd = ObjdumpCommand()
    command = ["objdump", "-h", target]
    return cmd.execute(command)

def full_contents_action(target: str) -> tuple[str, str]:
    """
    Display all information including headers and disassembly
    For example: objdump -x /path/to/file
    """
    cmd = ObjdumpCommand()
    command = ["objdump", "-x", target]
    return cmd.execute(command)

if __name__ == "__main__":
    # Test example
    target_file = "/bin/ls"
    print(file_headers_action(target_file))

```

--------------------------------------------------------------------------------
/src/kali_mcps/nmap/actions.py:
--------------------------------------------------------------------------------

```python
import asyncio
from src.kali_mcps.base.kali_command import CommandRunner

class NmapCommand(CommandRunner):
    def __init__(self):
        super().__init__("nmap", 
                        network_enabled=True,  # nmap需要网络访问
                        memory_limit="2g",     # 需要更多内存
                        timeout=300)           # 需要更长的超时时间

def basic_scan_action(target: str) -> tuple[str, str]:
    """
    Basic scan
    For example: nmap 192.168.1.1
    """
    cmd = NmapCommand()
    command = ["nmap", target]
    return cmd.execute(command)

def intense_scan_action(target: str) -> tuple[str, str]:
    """
    Intense scan (-T4 -A)
    Includes: OS detection, version detection, script scanning, and traceroute
    """
    cmd = NmapCommand()
    command = ["nmap", "-T4", "-A", target]
    return cmd.execute(command)

def stealth_scan_action(target: str) -> tuple[str, str]:
    """
    SYN scan (-sS)
    Half-open scan, more stealthy
    Requires root privileges
    """
    cmd = NmapCommand()
    command = ["nmap", "-sS", target]
    return cmd.execute(command)

def quick_scan_action(target: str) -> tuple[str, str]:
    """
    Quick scan (-T4 -F)
    Only scans the most common ports
    """
    cmd = NmapCommand()
    command = ["nmap", "-T4", "-F", target]
    return cmd.execute(command)

def vulnerability_scan_action(target: str) -> tuple[str, str]:
    """
    Vulnerability scan (-sV --script vuln)
    Uses vulnerability detection scripts
    """
    cmd = NmapCommand()
    command = ["nmap", "-sV", "--script", "vuln", target]
    return cmd.execute(command)

if __name__ == "__main__":
    # Test example
    print(quick_scan_action("10.1.1.106"))

```

--------------------------------------------------------------------------------
/src/kali_mcps/nm/actions.py:
--------------------------------------------------------------------------------

```python
import asyncio
from src.kali_mcps.base.kali_command import CommandRunner

class NmCommand(CommandRunner):
    def __init__(self):
        super().__init__("nm", network_enabled=False, memory_limit="1g", timeout=120)

def basic_symbols_action(target: str) -> tuple[str, str]:
    """
    Basic symbol listing
    For example: nm /path/to/file
    """ 
    cmd = NmCommand()
    command = ["nm", target]
    return cmd.execute(command)

def dynamic_symbols_action(target: str) -> tuple[str, str]:
    """
    Display dynamic symbols
    For example: nm -D /path/to/file
    """
    cmd = NmCommand()
    command = ["nm", "-D", target]
    return cmd.execute(command)

def demangle_symbols_action(target: str) -> tuple[str, str]:
    """
    Demangle C++ symbols
    For example: nm -C /path/to/file
    """
    cmd = NmCommand()
    command = ["nm", "-C", target]
    return cmd.execute(command)

def numeric_sort_action(target: str) -> tuple[str, str]:
    """
    Sort symbols numerically by address
    For example: nm -n /path/to/file
    """
    cmd = NmCommand()
    command = ["nm", "-n", target]
    return cmd.execute(command)

def size_sort_action(target: str) -> tuple[str, str]:
    """
    Sort symbols by size
    For example: nm -S /path/to/file
    """
    cmd = NmCommand()
    command = ["nm", "-S", target]
    return cmd.execute(command)

def undefined_symbols_action(target: str) -> tuple[str, str]:
    """
    Display only undefined symbols
    For example: nm -u /path/to/file
    """
    cmd = NmCommand()
    command = ["nm", "-u", target]
    return cmd.execute(command)

if __name__ == "__main__":
    # Test example
    target_file = "/bin/ls"
    print(basic_symbols_action(target_file))

```

--------------------------------------------------------------------------------
/src/kali_mcps/strings/actions.py:
--------------------------------------------------------------------------------

```python
import asyncio
from src.kali_mcps.base.kali_command import CommandRunner

class StringsCommand(CommandRunner):
    def __init__(self):
        super().__init__("strings", network_enabled=False, memory_limit="1g", timeout=120)

def basic_strings_action(target: str, input_file: bytes = None) -> tuple[str, str]:
    """
    Basic strings analysis
    For example: strings /path/to/file
    """
    cmd = StringsCommand()
    command = ["strings", target]
    return cmd.execute(command, input_files={"input_file": "/tmp/input_file"})

def min_length_strings_action(target: str, length: int = 6) -> tuple[str, str]:
    """
    Strings analysis with specified minimum length
    For example: strings -n 6 /path/to/file
    """
    cmd = StringsCommand()
    command = ["strings", "-n", str(length), target]
    return cmd.execute(command)

def offset_strings_action(target: str, format: str = "x") -> tuple[str, str]:
    """
    Strings analysis showing string offsets
    format can be: 'd' (decimal), 'o' (octal), 'x' (hexadecimal)
    For example: strings -t x /path/to/file
    """
    cmd = StringsCommand()
    command = ["strings", "-t", format, target]
    return cmd.execute(command)

def encoding_strings_action(target: str, encoding: str = "S") -> tuple[str, str]:
    """
    Strings analysis with specified character encoding
    encoding can be: 
    - 's' = 7-bit string
    - 'S' = 8-bit string
    - 'b' = 16-bit big-endian
    - 'l' = 16-bit little-endian
    For example: strings -e S /path/to/file
    """
    cmd = StringsCommand()
    command = ["strings", "-e", encoding, target]
    return cmd.execute(command)

if __name__ == "__main__":
    # Test example
    target_file = "/bin/ls"
    input_file = bytes.fromhex("68656c6c6f20776f726c64")
    print(basic_strings_action(target_file, {"input_file": input_file , "output_file": "/tmp/output_file"}))

```

--------------------------------------------------------------------------------
/src/kali_mcps/wireshark/actions.py:
--------------------------------------------------------------------------------

```python
import asyncio
from src.kali_mcps.base.kali_command import CommandRunner

class TsharkCommand(CommandRunner):
    def __init__(self):
        super().__init__("tshark", 
                        network_enabled=True,  # 需要网络访问
                        memory_limit="2g",     # 需要更多内存
                        timeout=300)           # 需要更长的超时时间

def capture_live_action(interface: str, duration: int = 30, filter: str = "") -> tuple[str, str]:
    """
    Capture live traffic from network interface
    For example: tshark -i eth0 -a duration:30 -f "port 80"
    """
    cmd = TsharkCommand()
    command = ["tshark", "-i", interface, "-a", f"duration:{duration}"]
    if filter:
        command.extend(["-f", filter])
    return cmd.execute(command)

def analyze_pcap_action(pcap_file: str, display_filter: str = "") -> tuple[str, str]:
    """
    Analyze existing pcap file
    For example: tshark -r file.pcap -Y "http"
    """
    cmd = TsharkCommand()
    command = ["tshark", "-r", pcap_file]
    if display_filter:
        command.extend(["-Y", display_filter])
    return cmd.execute(command)

def extract_http_action(pcap_file: str) -> tuple[str, str]:
    """
    Extract HTTP objects from pcap file
    For example: tshark -r file.pcap -Y "http" -T fields -e http.request.method -e http.request.uri
    """
    cmd = TsharkCommand()
    command = [
        "tshark", "-r", pcap_file,
        "-Y", "http",
        "-T", "fields",
        "-e", "http.request.method",
        "-e", "http.request.uri"
    ]
    return cmd.execute(command)

def protocol_hierarchy_action(pcap_file: str) -> tuple[str, str]:
    """
    Show protocol hierarchy statistics
    For example: tshark -r file.pcap -q -z io,phs
    """
    cmd = TsharkCommand()
    command = ["tshark", "-r", pcap_file, "-q", "-z", "io,phs"]
    return cmd.execute(command)

def conversation_statistics_action(pcap_file: str) -> tuple[str, str]:
    """
    Show conversation statistics
    For example: tshark -r file.pcap -q -z conv,ip
    """
    cmd = TsharkCommand()
    command = ["tshark", "-r", pcap_file, "-q", "-z", "conv,ip"]
    return cmd.execute(command)

def expert_info_action(pcap_file: str) -> tuple[str, str]:
    """
    Show expert information (errors, warnings, notes)
    For example: tshark -r file.pcap -q -z expert
    """
    cmd = TsharkCommand()
    command = ["tshark", "-r", pcap_file, "-q", "-z", "expert"]
    return cmd.execute(command)

if __name__ == "__main__":
    # Test example
    pcap_file = "capture.pcap"
    print(protocol_hierarchy_action(pcap_file))

```

--------------------------------------------------------------------------------
/src/kali_mcps/base/kali_command.py:
--------------------------------------------------------------------------------

```python
import subprocess
import os
import asyncio
from src.sandbox import create_sandbox_client, SandboxSettings, SandboxTimeoutError, SandboxError

class CommandRunner:
    """Base class for executing Kali commands"""
    
    def __init__(self, command_name: str, network_enabled: bool = False, 
                 memory_limit: str = "1g", timeout: int = 120):
        """
        Initialize CommandRunner
        Args:
            command_name: Name of the command (e.g. 'objdump', 'nm', etc.)
            network_enabled: Whether network access is needed
            memory_limit: Memory limit for sandbox
            timeout: Timeout in seconds
        """
        self.command_name = command_name
        self.network_enabled = network_enabled
        self.memory_limit = memory_limit
        self.timeout = timeout
        self.IS_SAFE = os.environ.get("IS_SAFE", "false").lower() == "true"

    def run_command(self, command: list) -> tuple[str, str]:
        """Execute command and return output results"""
        try:
            process = subprocess.Popen(
                command,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                universal_newlines=True
            )
            stdout, stderr = process.communicate()
            return stdout, stderr
        except Exception as e:
            return "", str(e)

    async def run_with_sandbox(self, command: list, input_files: dict = None) -> tuple[str, str]:
        """
        Execute command in Kali sandbox
        Args:
            command: Command to execute
            input_files: Dict of {local_path: container_path} for files to copy into container
        """
        kali_config = SandboxSettings(
            image="kalilinux/kali-rolling",
            memory_limit=self.memory_limit,
            cpu_limit=1.0,
            network_enabled=self.network_enabled,
            timeout=self.timeout
        )
        client = create_sandbox_client()
        try:
            await client.create(config=kali_config)
            
            # 如果有输入文件,先复制到容器中
            if input_files:
                for local_path, container_path in input_files.items():
                    await client.copy_to_container(local_path, container_path)
            
            # 执行命令
            cmd_str = " ".join(command)
            stdout = await client.run_command(cmd_str)
            return stdout, ""
        except Exception as e:
            return "", str(e)
        finally:
            await client.cleanup()

    async def safe_execute_kali_command(self, command: list, input_files: dict = None) -> tuple[str, str]:
        """
        Safely execute Kali command
        Args:
            command: Command to execute
            input_files: Dict of {local_path: container_path} for files to copy into container
        """
        try:
            result = await self.run_with_sandbox(command, input_files)
            return result
        except SandboxTimeoutError:
            return "", "Command execution timed out"
        except SandboxError as e:
            return "", f"Sandbox execution error: {str(e)}"
        except Exception as e:
            return "", f"Unknown error: {str(e)}"

    def execute(self, command: list, input_files: dict = None) -> tuple[str, str]:
        """
        Execute command with safety check
        Args:
            command: Command to execute
            input_files: Dict of {local_path: container_path} for files to copy into container
        """
        # if self.IS_SAFE:
        #     return await self.safe_execute_kali_command(command, input_files)
        # else:
        return self.run_command(command)  
```

--------------------------------------------------------------------------------
/src/sandbox.py:
--------------------------------------------------------------------------------

```python
import asyncio
import docker
import tarfile
import os
import io
from typing import Optional, Dict, Any
from dataclasses import dataclass

class SandboxSettings:
    """sandbox configuration settings"""
    def __init__(
        self,
        image: str = "kalimcps:latest",
        memory_limit: str = "2g",
        cpu_limit: float = 1.0,
        network_enabled: bool = True,
        network_mode: str = "bridge",
        timeout: int = 300
    ):
        self.image = image
        self.memory_limit = memory_limit
        self.cpu_limit = cpu_limit
        self.network_enabled = network_enabled
        self.network_mode = network_mode
        self.timeout = timeout

class SandboxError(Exception):
    """Base exception for sandbox-related errors."""


class SandboxTimeoutError(SandboxError):
    """Exception raised when a sandbox operation times out."""


class SandboxResourceError(SandboxError):
    """Exception raised for resource-related errors."""


class SandboxClient:
    """Sandbox client, providing only container creation and cleanup functionality"""
    
    def __init__(self):
        self.client = docker.from_env()
        self.container = None

    async def create(self, config: SandboxSettings) -> None:
        """Create and start the container"""
        try:
            # prepare container configuration
            container_config = {
                "image": config.image,
                "detach": True,
                "mem_limit": config.memory_limit,
                "nano_cpus": int(config.cpu_limit * 1e9),
                "network_mode": config.network_mode if config.network_enabled else "none",
            }
            
            # create and start container
            self.container = self.client.containers.run(**container_config)
            print(f"Container created: {self.container.id}")
        except Exception as e:
            raise SandboxError(f"Failed to create container: {str(e)}")
    
    async def run_command(self, command: str) -> str:
        """Execute a command in the container"""
        if not self.container:
            raise SandboxError("Container not created")
        
        try:
            exec_result = self.container.exec_run(command, tty=True)
            return exec_result.output.decode('utf-8')
        except Exception as e:
            raise SandboxError(f"Failed to execute command: {str(e)}")

    async def copy_to_container(self, source_path: str, container_path: str) -> None:
        """
        Copy a file from host to container
        Args:
            source_path: Path to the file on host
            container_path: Path where to put the file in container
        """
        if not self.container:
            raise SandboxError("Container not created")
        
        try:
            # create a memory-based tar file
            tar_stream = io.BytesIO()
            with tarfile.open(fileobj=tar_stream, mode='w') as tar:
                # add file to tar
                tar.add(source_path, arcname=os.path.basename(container_path))
            
            tar_stream.seek(0)
            # copy to container
            self.container.put_archive(
                path=os.path.dirname(container_path),
                data=tar_stream
            )
            print(f"File copied to container: {container_path}")
        except Exception as e:
            raise SandboxError(f"Failed to copy file to container: {str(e)}")

    async def copy_from_container(self, container_path: str, dest_path: str) -> None:
        """
        Copy a file from container to host
        Args:
            container_path: Path to the file in container
            dest_path: Path where to put the file on host
        """
        if not self.container:
            raise SandboxError("Container not created")
        
        try:
            # get the tar stream of the file from container
            bits, stat = self.container.get_archive(container_path)
            
            # create the destination directory (if it doesn't exist)
            os.makedirs(os.path.dirname(dest_path), exist_ok=True)
            
            # write the tar stream to a temporary file
            with open(dest_path, 'wb') as f:
                for chunk in bits:
                    f.write(chunk)
            print(f"File copied from container: {dest_path}")
        except Exception as e:
            raise SandboxError(f"Failed to copy file from container: {str(e)}")
    
    async def cleanup(self) -> None:
        """Clean up and remove the container"""
        if self.container:
            try:
                self.container.stop()
                self.container.remove()
                print(f"Container cleaned up: {self.container.id}")
                self.container = None
            except Exception as e:
                print(f"Error cleaning up container: {str(e)}")

def create_sandbox_client() -> SandboxClient:
    """Create a factory function for the sandbox client"""
    return SandboxClient()



# example
async def run_in_sandbox(command: str) -> str:
    """Run a command in the sandbox"""
    config = SandboxSettings()
    client = create_sandbox_client()
    
    try:
        await client.create(config=config)
        result = await client.run_command(command)
        return result
    except SandboxTimeoutError:
        return "Command execution timed out"
    except SandboxError as e:
        return f"Sandbox execution error: {str(e)}"
    except Exception as e:
        return f"Unknown error: {str(e)}"
    finally:
        await client.cleanup()

# if you run this script directly
if __name__ == "__main__":
    async def main():
        print("Starting sandbox test...")

        result = await run_in_sandbox("echo 'Sandbox test successful'")
        print(result)
    
    asyncio.run(main()) 
```

--------------------------------------------------------------------------------
/src/kali_mcps/manual.yaml:
--------------------------------------------------------------------------------

```yaml

mcp:
  tools:
    - nmap:
        functions:
          - basic_scan
            description: "Perform a basic network scan using nmap."
            parameters:
              target:
                type: str
                description: "The target IP address or hostname to scan."
          - intense_scan
            description: "Perform an intense network scan using nmap."
            parameters:
              target:
                type: str
                description: "The target IP address or hostname to scan."
          - stealth_scan
            description: "Perform a stealth network scan using nmap."
            parameters:
              target:
                type: str
                description: "The target IP address or hostname to scan."
          - quick_scan
            description: "Perform a quick network scan using nmap."
            parameters:
              target:
                type: str
                description: "The target IP address or hostname to scan."
          - vulnerability_scan
            description: "Perform a vulnerability scan using nmap."
            parameters:
              target:
                type: str
                description: "The target IP address or hostname to scan."
                
    - strings:
        functions:
          - basic_strings
            description: "Perform a basic strings scan using strings."
            parameters:
              target:
                type: str
                description: "The target file to scan."
          - min_length_strings
            description: "Perform a min length strings scan using strings."
            parameters:
              target:
                type: str
                description: "The target file to scan."
          - offset_strings
            description: "Perform an offset strings scan using strings."
            parameters:
              target:
                type: str
                description: "The target file to scan."
          - encoding_strings
            description: "Perform an encoding strings scan using strings."
            parameters:
              target:
                type: str
                description: "The target file to scan."
                
    - objdump:
        functions:
          - file_headers
            description: "Perform a file headers scan using objdump."
            parameters:
              target:
                type: str
                description: "The target file to scan."
          - disassemble
            description: "Perform a disassemble scan using objdump."
            parameters:
              target:
                type: str
                description: "The target file to scan."
          - symbol_table
            description: "Perform a symbol table scan using objdump."
            parameters:
              target:
                type: str
                description: "The target file to scan."
          - section_headers
            description: "Perform a section headers scan using objdump."
            parameters:
              target:
                type: str
                description: "The target file to scan."
          - full_contents
            description: "Perform a full contents scan using objdump."
            parameters:
              target:
                type: str
                description: "The target file to scan."
                
    - nm:
        functions:
          - basic_symbols
            description: "Perform a basic symbols scan using nm."
            parameters:
              target:
                type: str
                description: "The target file to scan."
          - dynamic_symbols
            description: "Perform a dynamic symbols scan using nm."
            parameters:
              target:
                type: str
                description: "The target file to scan." 
          - demangle_symbols
            description: "Perform a demangle symbols scan using nm."
            parameters:
              target:
                type: str
                description: "The target file to scan."   
          - numeric_sort
            description: "Perform a numeric sort scan using nm."
            parameters:
              target:
                type: str
                description: "The target file to scan." 
          - size_sort
            description: "Perform a size sort scan using nm."
            parameters:
              target:
                type: str
                description: "The target file to scan."   
          - undefined_symbols
            description: "Perform an undefined symbols scan using nm."
            parameters:
              target:
                type: str
                description: "The target file to scan."     
                
    - wireshark:
        functions:
          - capture_live
            description: "Perform a capture live scan using wireshark."
            parameters: 
              interface:
                type: str
                description: "The network interface to capture live traffic on."
              duration:
                type: int
                description: "The duration of the capture in seconds."  
          - analyze_pcap
            description: "Perform an analyze pcap scan using wireshark."
            parameters:
              pcap_file:
                type: str
                description: "The pcap file to analyze."  
          - extract_http
            description: "Perform an extract http scan using wireshark."
            parameters:
              pcap_file:
                type: str
                description: "The pcap file to analyze."  
          - protocol_hierarchy
            description: "Perform a protocol hierarchy scan using wireshark."
            parameters:
              pcap_file:
                type: str
                description: "The pcap file to analyze."    
          - conversation_statistics
            description: "Perform a conversation statistics scan using wireshark."
            parameters:
              pcap_file:
                type: str
                description: "The pcap file to analyze."      
          - expert_info
            description: "Perform an expert info scan using wireshark."
            parameters:
              pcap_file:
                type: str
                description: "The pcap file to analyze."  
```

--------------------------------------------------------------------------------
/src/mcp_server.py:
--------------------------------------------------------------------------------

```python
from mcp.server.fastmcp import FastMCP
from pydantic import BaseModel, Field
import os
from src.kali_mcps.nmap.actions import basic_scan_action, intense_scan_action, stealth_scan_action, quick_scan_action, vulnerability_scan_action
from src.kali_mcps.nm.actions import basic_symbols_action, dynamic_symbols_action, demangle_symbols_action, numeric_sort_action, size_sort_action, undefined_symbols_action
from src.kali_mcps.objdump.actions import file_headers_action, disassemble_action, symbol_table_action, section_headers_action, full_contents_action
from src.kali_mcps.strings.actions import basic_strings_action, min_length_strings_action, offset_strings_action, encoding_strings_action
from src.kali_mcps.wireshark.actions import capture_live_action, analyze_pcap_action, extract_http_action, protocol_hierarchy_action, conversation_statistics_action, expert_info_action
from src.kali_mcps.traceroute.actions import traceroute_action
mcp = FastMCP("kali-tools")


IS_SAFE = os.environ.get("IS_SAFE", "false").lower() == "true"  # is safe mode, if true, the command will be executed in the sandbox

# nmap start
@mcp.tool()
def basic_scan(target: str):
    """Perform a basic network scan using nmap.

    Args:
        target (str): The target IP address or hostname to scan.

    Returns:
        str: The output results of the basic scan.
    """
    return basic_scan_action(target)

@mcp.tool()
def intense_scan(target: str):
    """Perform an intense network scan using nmap.

    Args:
        target (str): The target IP address or hostname to scan.

    Returns:
        str: The output results of the intense scan.
    """
    return intense_scan_action(target)

@mcp.tool()
def stealth_scan(target: str):
    """Perform a stealth network scan using nmap.

    Args:
        target (str): The target IP address or hostname to scan.

    Returns:
        str: The output results of the stealth scan.
    """
    return stealth_scan_action(target)

@mcp.tool()
def quick_scan(target: str):
    """Perform a quick network scan using nmap.

    Args:
        target (str): The target IP address or hostname to scan.

    Returns:
        str: The output results of the quick scan.
    """
    return quick_scan_action(target)

@mcp.tool()
def vulnerability_scan(target: str):
    """Perform a vulnerability scan using nmap.

    Args:
        target (str): The target IP address or hostname to scan.

    Returns:
        str: The output results of the vulnerability scan.
    """
    return vulnerability_scan_action(target)
# nmap end

# nm start
@mcp.tool()
def basic_symbols(target: str):
    """Perform a basic symbol listing using nm.

    Args:
        target (str): The target file or executable to analyze.

    Returns:
        str: The output results of the basic symbol listing.
    """
    return basic_symbols_action(target)

@mcp.tool()
def dynamic_symbols(target: str):
    """Perform a dynamic symbol listing using nm.

    Args:
        target (str): The target file or executable to analyze.

    Returns:
        str: The output results of the dynamic symbol listing.
    """
    return dynamic_symbols_action(target)

@mcp.tool()
def demangle_symbols(target: str):
    """Perform a demangling of symbols using nm.

    Args:
        target (str): The target file or executable to analyze.

    Returns:
        str: The output results of the demangling of symbols.
    """
    return demangle_symbols_action(target)

@mcp.tool()
def numeric_sort(target: str):
    """Perform a numeric sort of symbols using nm.

    Args:
        target (str): The target file or executable to analyze.

    Returns:
        str: The output results of the numeric sort of symbols.
    """
    return numeric_sort_action(target)

@mcp.tool()
def size_sort(target: str):
    """Perform a size sort of symbols using nm.

    Args:
        target (str): The target file or executable to analyze.

    Returns:
        str: The output results of the size sort of symbols.
    """
    return size_sort_action(target)

@mcp.tool()
def undefined_symbols(target: str):
    """Perform an undefined symbol listing using nm.

    Args:
        target (str): The target file or executable to analyze.

    Returns:
        str: The output results of the undefined symbol listing.
    """
    return undefined_symbols_action(target)
# nm end

# objdump start
@mcp.tool()
def file_headers(target: str):
    """Perform a file header listing using objdump.

    Args:
        target (str): The target file or executable to analyze.

    Returns:
        str: The output results of the file header listing.
    """
    return file_headers_action(target)

@mcp.tool()
def disassemble(target: str):
    """Perform a disassembly of the target file using objdump.

    Args:
        target (str): The target file or executable to analyze.

    Returns:
        str: The output results of the disassembly of the target file.
    """
    return disassemble_action(target)

@mcp.tool()
def symbol_table(target: str):
    """Perform a symbol table listing using objdump.

    Args:
        target (str): The target file or executable to analyze.

    Returns:
        str: The output results of the symbol table listing.
    """
    return symbol_table_action(target)

@mcp.tool()
def section_headers(target: str):
    """Perform a section header listing using objdump.

    Args:
        target (str): The target file or executable to analyze.

    Returns:
        str: The output results of the section header listing.
    """
    return section_headers_action(target)

@mcp.tool()
def full_contents(target: str):
    """Perform a full contents listing using objdump.

    Args:
        target (str): The target file or executable to analyze.

    Returns:
        str: The output results of the full contents listing.
    """
    return full_contents_action(target)
# objdump end

# strings start
@mcp.tool()
def basic_strings(target: str):
    """Perform a basic string listing using strings.

    Args:
        target (str): The target file or executable to analyze.

    Returns:
        str: The output results of the basic string listing.
    """
    return basic_strings_action(target)

@mcp.tool()
def min_length_strings(target: str):
    """Perform a minimum length string listing using strings.

    Args:
        target (str): The target file or executable to analyze.

    Returns:
        str: The output results of the minimum length string listing.
    """
    return min_length_strings_action(target)

@mcp.tool()
def offset_strings(target: str):
    """Perform an offset string listing using strings.

    Args:
        target (str): The target file or executable to analyze.

    Returns:
        str: The output results of the offset string listing.
    """
    return offset_strings_action(target)

@mcp.tool()
def encoding_strings(target: str):
    """Perform an encoding string listing using strings.

    Args:
        target (str): The target file or executable to analyze.

    Returns:
        str: The output results of the encoding string listing.
    """
    return encoding_strings_action(target)
# strings end

# tshark start
@mcp.tool()
def capture_live(interface: str, duration: int = 30, filter: str = ""):
    """Perform a live capture of network traffic using tshark.

    Args:
        interface (str): The network interface to capture from.
        duration (int): The duration of the capture in seconds.
        filter (str): The filter to apply to the capture.

    Returns:
        str: The output results of the live capture of network traffic.
    """
    return capture_live_action(interface, duration, filter)


@mcp.tool()
def analyze_pcap(pcap_file: str, display_filter: str = ""):
    """Perform an analysis of a pcap file using tshark.

    Args:
        pcap_file (str): The path to the pcap file to analyze.
        display_filter (str): The filter to apply to the analysis.

    Returns:
        str: The output results of the analysis of the pcap file.
    """
    return analyze_pcap_action(pcap_file, display_filter)

@mcp.tool()
def extract_http(pcap_file: str):
    """Perform an HTTP extraction from a pcap file using tshark.

    Args:
        pcap_file (str): The path to the pcap file to extract HTTP from.    

    Returns:
        str: The output results of the HTTP extraction from the pcap file.
    """
    return extract_http_action(pcap_file)  

@mcp.tool()
def protocol_hierarchy(pcap_file: str):
    """Perform a protocol hierarchy listing using tshark.

    Args:
        pcap_file (str): The path to the pcap file to analyze.  

    Returns:
        str: The output results of the protocol hierarchy listing.
    """
    return protocol_hierarchy_action(pcap_file)    

@mcp.tool()
def conversation_statistics(pcap_file: str):
    """Perform a conversation statistics listing using tshark.

    Args:
        pcap_file (str): The path to the pcap file to analyze.  

    Returns:
        str: The output results of the conversation statistics listing.
    """
    return conversation_statistics_action(pcap_file)   

@mcp.tool() 
def expert_info(pcap_file: str):
    """Perform an expert information listing using tshark.

    Args:
        pcap_file (str): The path to the pcap file to analyze.  

    Returns:
        str: The output results of the expert information listing.
    """
    return expert_info_action(pcap_file)   
# tshark end

# traceroute start
@mcp.tool()
def traceroute(target: str):
    """
    Perform a traceroute to the target.

    Args:
        target (str): The target IP address or hostname to traceroute to.

    Returns:
        str: The output results of the traceroute.  
    """
    return traceroute_action(target)
# traceroute end

# run server, using stdio transport
if __name__ == "__main__":
    print("Starting server is running")
    mcp.run(transport="stdio")
```