#
tokens: 10803/50000 21/21 files
lines: off (toggle) GitHub
raw markdown copy
# Directory Structure

```
├── .gitignore
├── app.py
├── config.yaml
├── Dockerfile
├── LICENSE
├── README.md
├── requirements.txt
├── requirments.txt
├── setup.py
└── src
    ├── kali_mcps
    │   ├── __init__.py
    │   ├── base
    │   │   ├── __init__.py
    │   │   └── kali_command.py
    │   ├── manual.yaml
    │   ├── nm
    │   │   └── actions.py
    │   ├── nmap
    │   │   ├── __init__.py
    │   │   └── actions.py
    │   ├── objdump
    │   │   └── actions.py
    │   ├── strings
    │   │   └── actions.py
    │   ├── traceroute
    │   │   └── actions.py
    │   └── wireshark
    │       └── actions.py
    ├── mcp_server.py
    └── sandbox.py
```

# Files

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
#  Usually these files are written by a python script from a template
#  before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
.pybuilder/
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
#   For a library or package, you might want to ignore these files since the code is
#   intended to run in multiple environments; otherwise, check them in:
# .python-version

# pipenv
#   According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
#   However, in case of collaboration, if having platform-specific dependencies or dependencies
#   having no cross-platform support, pipenv may install dependencies that don't work, or not
#   install all needed dependencies.
#Pipfile.lock

# UV
#   Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
#   This is especially recommended for binary packages to ensure reproducibility, and is more
#   commonly ignored for libraries.
#uv.lock

# poetry
#   Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
#   This is especially recommended for binary packages to ensure reproducibility, and is more
#   commonly ignored for libraries.
#   https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock

# pdm
#   Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
#   pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
#   in version control.
#   https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/

# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

# pytype static type analyzer
.pytype/

# Cython debug symbols
cython_debug/

# PyCharm
#  JetBrains specific template is maintained in a separate JetBrains.gitignore that can
#  be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
#  and can be added to the global gitignore or merged into this file.  For a more nuclear
#  option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

# Ruff stuff:
.ruff_cache/

# PyPI configuration file
.pypirc
sandbox_expire/
logs/
tests/

```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# awesome_kali_MCPServers

Welcome to the **awsome_kali_MCPServers** repository, a collection of MCP servers specifically tailored for Kali Linux. These servers are designed to empower AI Agents in reverse engineering and security testing, offering a range of powerful features to enhance your workflows. 

## Description

**awsome kali MCPServers** provides flexible network analysis, target sniffing, traffic analysis, binary understanding, and automation capabilities, all aimed at enhancing AI-driven processes in the realm of cybersecurity. Whether you are a seasoned security professional or just starting your journey into the world of Kali Linux, these servers are here to support you.

## Features

- **Network Analysis:** Gain insights into network activity, traffic patterns, and potential vulnerabilities.
- **Target Sniffing:** Identify and capture data packets to analyze potential threats or weaknesses.
- **Traffic Analysis:** Monitor and analyze network traffic for suspicious activity or irregularities.
- **Binary Understanding:** Decode and interpret binary data for deeper insights into security issues.
- **Automation:** Streamline repetitive tasks and workflows to increase efficiency and productivity.

## How to Use

To get started with **awsome kali MCPServers**, simply visit the [Releases page](https://github.com/VERMAXVR/awsome_kali_MCPServers/releases) to download the necessary files. Once downloaded, follow the execution instructions to set up the servers on your Kali Linux machine. 

![Download Servers](https://img.shields.io/badge/Download%20Servers-Get%20Started-brightgreen)

## Topics

This repository covers a range of topics related to cybersecurity, AI integration, and Kali Linux usage. The main topics include:
- Agent
- Kali Linux
- LLM
- MCP Server
- Security
- Tools

## Get in Touch

If you have any questions, feedback, or suggestions regarding **awsome kali MCPServers**, feel free to reach out. We are here to support you in your cybersecurity endeavors.

Stay secure and keep exploring with **awsome kali MCPServers**!
```

--------------------------------------------------------------------------------
/setup.py:
--------------------------------------------------------------------------------

```python
 
```

--------------------------------------------------------------------------------
/src/kali_mcps/base/__init__.py:
--------------------------------------------------------------------------------

```python
"""
Base package for Kali commands
""" 
```

--------------------------------------------------------------------------------
/src/kali_mcps/nmap/__init__.py:
--------------------------------------------------------------------------------

```python
"""
Nmap package for network scanning
""" 
```

--------------------------------------------------------------------------------
/config.yaml:
--------------------------------------------------------------------------------

```yaml
kalilinux:
  name: kali-tools
  image: kalimcps:latest # docker built -t kalimcps:latest .


```

--------------------------------------------------------------------------------
/src/kali_mcps/__init__.py:
--------------------------------------------------------------------------------

```python
"""
Kali MCP Servers package
"""
from src.kali_mcps.base.kali_command import CommandRunner
from src.kali_mcps.objdump.actions import (
    file_headers_action,
    disassemble_action,
    symbol_table_action,
    section_headers_action,
)

__all__ = [
    "CommandRunner",
    "file_headers_action",
    "disassemble_action",
    "symbol_table_action",
    "section_headers_action",
]
```

--------------------------------------------------------------------------------
/src/kali_mcps/traceroute/actions.py:
--------------------------------------------------------------------------------

```python
import asyncio
from src.kali_mcps.base.kali_command import CommandRunner

class TracerouteCommand(CommandRunner):
    def __init__(self):
        super().__init__("traceroute", network_enabled=True, memory_limit="1g", timeout=120)

def traceroute_action(target: str):
    """
    Traceroute to the target
    """
    cmd = TracerouteCommand()
    command = ["traceroute", target]
    return cmd.execute(command)


if __name__ == "__main__":
    print(traceroute_action("8.8.8.8"))
```

--------------------------------------------------------------------------------
/Dockerfile:
--------------------------------------------------------------------------------

```dockerfile
FROM python:3.12-slim-bookworm

WORKDIR /app

ENV DEBIAN_FRONTEND=noninteractive

RUN apt-get update && \
    apt-get install -y \
        iputils-ping \
        net-tools \
        dnsutils \
        nmap \
        binutils \
        traceroute \
        tshark && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*

COPY requirements.txt .

RUN pip install --no-cache-dir -r requirements.txt 


COPY . .

ENV PYTHONPATH=/app
ENV IS_SAFE=false

# set run cmd
ENTRYPOINT ["python", "app.py"]
```

--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------

```
annotated-types==0.7.0
anyio==4.9.0
certifi==2025.1.31
charset-normalizer==3.4.1
click==8.1.8
docker==7.1.0
h11==0.14.0
httpcore==1.0.7
httpx==0.28.1
httpx-sse==0.4.0
idna==3.10
iniconfig==2.1.0
loguru==0.7.3
mcp==1.6.0
packaging==24.2
pluggy==1.5.0
pydantic==2.11.1
pydantic-settings==2.8.1
pydantic_core==2.33.0
pytest==8.3.5
pytest-asyncio==0.26.0
python-dotenv==1.1.0
requests==2.32.3
sniffio==1.3.1
sse-starlette==2.2.1
starlette==0.46.1
typing-inspection==0.4.0
typing_extensions==4.13.0
urllib3==2.3.0
uvicorn==0.34.0

```

--------------------------------------------------------------------------------
/requirments.txt:
--------------------------------------------------------------------------------

```
annotated-types==0.7.0
anyio==4.9.0
certifi==2025.1.31
charset-normalizer==3.4.1
click==8.1.8
docker==7.1.0
h11==0.14.0
httpcore==1.0.7
httpx==0.28.1
httpx-sse==0.4.0
idna==3.10
iniconfig==2.1.0
loguru==0.7.3
mcp==1.6.0
packaging==24.2
pluggy==1.5.0
pydantic==2.11.1
pydantic-settings==2.8.1
pydantic_core==2.33.0
pytest==8.3.5
pytest-asyncio==0.26.0
python-dotenv==1.1.0
requests==2.32.3
sniffio==1.3.1
sse-starlette==2.2.1
starlette==0.46.1
typing-inspection==0.4.0
typing_extensions==4.13.0
urllib3==2.3.0
uvicorn==0.34.0

```

--------------------------------------------------------------------------------
/app.py:
--------------------------------------------------------------------------------

```python
from src.mcp_server import mcp 
import argparse  # 添加 argparse 模块
import docker
from mcp.server.models import InitializationOptions


if __name__ == "__main__":
    # create arg parser
    parser = argparse.ArgumentParser(description='MCP Server with Kali image configuration')
    parser.add_argument('--kali-image', 
                       type=str,
                       required=False,
                       help='Specify the Kali Linux image name to use')
    
    args = parser.parse_args()
    if args.kali_image:
        image_name = args.kali_image
        # use docker lib to check if the image exists
        client = docker.from_env()
        try:
            client.images.get(image_name)
            print(f"Kali image {image_name} found")
            client.close()
        except docker.errors.ImageNotFound:
            print(f"Kali image {image_name} not found")
            exit(1)
        print(f"Using Kali image: {image_name}")
    
    print("Starting server is running")
    
    # pass image name to mcp
    mcp.run(transport="stdio")
```

--------------------------------------------------------------------------------
/src/kali_mcps/objdump/actions.py:
--------------------------------------------------------------------------------

```python
import asyncio
from src.kali_mcps.base.kali_command import CommandRunner

class ObjdumpCommand(CommandRunner):
    def __init__(self):
        super().__init__("objdump", network_enabled=False, memory_limit="1g", timeout=120)

def file_headers_action(target: str) -> tuple[str, str]:
    """
    Display file headers
    For example: objdump -f /path/to/file
    """
    cmd = ObjdumpCommand()
    command = ["objdump", "-f", target]
    return cmd.execute(command)

def disassemble_action(target: str, section: str = ".text") -> tuple[str, str]:
    """
    Disassemble section (default: .text)
    For example: objdump -d -j .text /path/to/file
    """
    cmd = ObjdumpCommand()
    command = ["objdump", "-d", "-j", section, target]
    return cmd.execute(command)

def symbol_table_action(target: str) -> tuple[str, str]:
    """
    Display symbol table
    For example: objdump -t /path/to/file
    """
    cmd = ObjdumpCommand()
    command = ["objdump", "-t", target]
    return cmd.execute(command)

def section_headers_action(target: str) -> tuple[str, str]:
    """
    Display all section headers
    For example: objdump -h /path/to/file
    """
    cmd = ObjdumpCommand()
    command = ["objdump", "-h", target]
    return cmd.execute(command)

def full_contents_action(target: str) -> tuple[str, str]:
    """
    Display all information including headers and disassembly
    For example: objdump -x /path/to/file
    """
    cmd = ObjdumpCommand()
    command = ["objdump", "-x", target]
    return cmd.execute(command)

if __name__ == "__main__":
    # Test example
    target_file = "/bin/ls"
    print(file_headers_action(target_file))

```

--------------------------------------------------------------------------------
/src/kali_mcps/nmap/actions.py:
--------------------------------------------------------------------------------

```python
import asyncio
from src.kali_mcps.base.kali_command import CommandRunner

class NmapCommand(CommandRunner):
    def __init__(self):
        super().__init__("nmap", 
                        network_enabled=True,  # nmap需要网络访问
                        memory_limit="2g",     # 需要更多内存
                        timeout=300)           # 需要更长的超时时间

def basic_scan_action(target: str) -> tuple[str, str]:
    """
    Basic scan
    For example: nmap 192.168.1.1
    """
    cmd = NmapCommand()
    command = ["nmap", target]
    return cmd.execute(command)

def intense_scan_action(target: str) -> tuple[str, str]:
    """
    Intense scan (-T4 -A)
    Includes: OS detection, version detection, script scanning, and traceroute
    """
    cmd = NmapCommand()
    command = ["nmap", "-T4", "-A", target]
    return cmd.execute(command)

def stealth_scan_action(target: str) -> tuple[str, str]:
    """
    SYN scan (-sS)
    Half-open scan, more stealthy
    Requires root privileges
    """
    cmd = NmapCommand()
    command = ["nmap", "-sS", target]
    return cmd.execute(command)

def quick_scan_action(target: str) -> tuple[str, str]:
    """
    Quick scan (-T4 -F)
    Only scans the most common ports
    """
    cmd = NmapCommand()
    command = ["nmap", "-T4", "-F", target]
    return cmd.execute(command)

def vulnerability_scan_action(target: str) -> tuple[str, str]:
    """
    Vulnerability scan (-sV --script vuln)
    Uses vulnerability detection scripts
    """
    cmd = NmapCommand()
    command = ["nmap", "-sV", "--script", "vuln", target]
    return cmd.execute(command)

if __name__ == "__main__":
    # Test example
    print(quick_scan_action("10.1.1.106"))

```

--------------------------------------------------------------------------------
/src/kali_mcps/nm/actions.py:
--------------------------------------------------------------------------------

```python
import asyncio
from src.kali_mcps.base.kali_command import CommandRunner

class NmCommand(CommandRunner):
    def __init__(self):
        super().__init__("nm", network_enabled=False, memory_limit="1g", timeout=120)

def basic_symbols_action(target: str) -> tuple[str, str]:
    """
    Basic symbol listing
    For example: nm /path/to/file
    """ 
    cmd = NmCommand()
    command = ["nm", target]
    return cmd.execute(command)

def dynamic_symbols_action(target: str) -> tuple[str, str]:
    """
    Display dynamic symbols
    For example: nm -D /path/to/file
    """
    cmd = NmCommand()
    command = ["nm", "-D", target]
    return cmd.execute(command)

def demangle_symbols_action(target: str) -> tuple[str, str]:
    """
    Demangle C++ symbols
    For example: nm -C /path/to/file
    """
    cmd = NmCommand()
    command = ["nm", "-C", target]
    return cmd.execute(command)

def numeric_sort_action(target: str) -> tuple[str, str]:
    """
    Sort symbols numerically by address
    For example: nm -n /path/to/file
    """
    cmd = NmCommand()
    command = ["nm", "-n", target]
    return cmd.execute(command)

def size_sort_action(target: str) -> tuple[str, str]:
    """
    Sort symbols by size
    For example: nm -S /path/to/file
    """
    cmd = NmCommand()
    command = ["nm", "-S", target]
    return cmd.execute(command)

def undefined_symbols_action(target: str) -> tuple[str, str]:
    """
    Display only undefined symbols
    For example: nm -u /path/to/file
    """
    cmd = NmCommand()
    command = ["nm", "-u", target]
    return cmd.execute(command)

if __name__ == "__main__":
    # Test example
    target_file = "/bin/ls"
    print(basic_symbols_action(target_file))

```

--------------------------------------------------------------------------------
/src/kali_mcps/strings/actions.py:
--------------------------------------------------------------------------------

```python
import asyncio
from src.kali_mcps.base.kali_command import CommandRunner

class StringsCommand(CommandRunner):
    def __init__(self):
        super().__init__("strings", network_enabled=False, memory_limit="1g", timeout=120)

def basic_strings_action(target: str, input_file: bytes = None) -> tuple[str, str]:
    """
    Basic strings analysis
    For example: strings /path/to/file
    """
    cmd = StringsCommand()
    command = ["strings", target]
    return cmd.execute(command, input_files={"input_file": "/tmp/input_file"})

def min_length_strings_action(target: str, length: int = 6) -> tuple[str, str]:
    """
    Strings analysis with specified minimum length
    For example: strings -n 6 /path/to/file
    """
    cmd = StringsCommand()
    command = ["strings", "-n", str(length), target]
    return cmd.execute(command)

def offset_strings_action(target: str, format: str = "x") -> tuple[str, str]:
    """
    Strings analysis showing string offsets
    format can be: 'd' (decimal), 'o' (octal), 'x' (hexadecimal)
    For example: strings -t x /path/to/file
    """
    cmd = StringsCommand()
    command = ["strings", "-t", format, target]
    return cmd.execute(command)

def encoding_strings_action(target: str, encoding: str = "S") -> tuple[str, str]:
    """
    Strings analysis with specified character encoding
    encoding can be: 
    - 's' = 7-bit string
    - 'S' = 8-bit string
    - 'b' = 16-bit big-endian
    - 'l' = 16-bit little-endian
    For example: strings -e S /path/to/file
    """
    cmd = StringsCommand()
    command = ["strings", "-e", encoding, target]
    return cmd.execute(command)

if __name__ == "__main__":
    # Test example
    target_file = "/bin/ls"
    input_file = bytes.fromhex("68656c6c6f20776f726c64")
    print(basic_strings_action(target_file, {"input_file": input_file , "output_file": "/tmp/output_file"}))

```

--------------------------------------------------------------------------------
/src/kali_mcps/wireshark/actions.py:
--------------------------------------------------------------------------------

```python
import asyncio
from src.kali_mcps.base.kali_command import CommandRunner

class TsharkCommand(CommandRunner):
    def __init__(self):
        super().__init__("tshark", 
                        network_enabled=True,  # 需要网络访问
                        memory_limit="2g",     # 需要更多内存
                        timeout=300)           # 需要更长的超时时间

def capture_live_action(interface: str, duration: int = 30, filter: str = "") -> tuple[str, str]:
    """
    Capture live traffic from network interface
    For example: tshark -i eth0 -a duration:30 -f "port 80"
    """
    cmd = TsharkCommand()
    command = ["tshark", "-i", interface, "-a", f"duration:{duration}"]
    if filter:
        command.extend(["-f", filter])
    return cmd.execute(command)

def analyze_pcap_action(pcap_file: str, display_filter: str = "") -> tuple[str, str]:
    """
    Analyze existing pcap file
    For example: tshark -r file.pcap -Y "http"
    """
    cmd = TsharkCommand()
    command = ["tshark", "-r", pcap_file]
    if display_filter:
        command.extend(["-Y", display_filter])
    return cmd.execute(command)

def extract_http_action(pcap_file: str) -> tuple[str, str]:
    """
    Extract HTTP objects from pcap file
    For example: tshark -r file.pcap -Y "http" -T fields -e http.request.method -e http.request.uri
    """
    cmd = TsharkCommand()
    command = [
        "tshark", "-r", pcap_file,
        "-Y", "http",
        "-T", "fields",
        "-e", "http.request.method",
        "-e", "http.request.uri"
    ]
    return cmd.execute(command)

def protocol_hierarchy_action(pcap_file: str) -> tuple[str, str]:
    """
    Show protocol hierarchy statistics
    For example: tshark -r file.pcap -q -z io,phs
    """
    cmd = TsharkCommand()
    command = ["tshark", "-r", pcap_file, "-q", "-z", "io,phs"]
    return cmd.execute(command)

def conversation_statistics_action(pcap_file: str) -> tuple[str, str]:
    """
    Show conversation statistics
    For example: tshark -r file.pcap -q -z conv,ip
    """
    cmd = TsharkCommand()
    command = ["tshark", "-r", pcap_file, "-q", "-z", "conv,ip"]
    return cmd.execute(command)

def expert_info_action(pcap_file: str) -> tuple[str, str]:
    """
    Show expert information (errors, warnings, notes)
    For example: tshark -r file.pcap -q -z expert
    """
    cmd = TsharkCommand()
    command = ["tshark", "-r", pcap_file, "-q", "-z", "expert"]
    return cmd.execute(command)

if __name__ == "__main__":
    # Test example
    pcap_file = "capture.pcap"
    print(protocol_hierarchy_action(pcap_file))

```

--------------------------------------------------------------------------------
/src/kali_mcps/base/kali_command.py:
--------------------------------------------------------------------------------

```python
import subprocess
import os
import asyncio
from src.sandbox import create_sandbox_client, SandboxSettings, SandboxTimeoutError, SandboxError

class CommandRunner:
    """Base class for executing Kali commands"""
    
    def __init__(self, command_name: str, network_enabled: bool = False, 
                 memory_limit: str = "1g", timeout: int = 120):
        """
        Initialize CommandRunner
        Args:
            command_name: Name of the command (e.g. 'objdump', 'nm', etc.)
            network_enabled: Whether network access is needed
            memory_limit: Memory limit for sandbox
            timeout: Timeout in seconds
        """
        self.command_name = command_name
        self.network_enabled = network_enabled
        self.memory_limit = memory_limit
        self.timeout = timeout
        self.IS_SAFE = os.environ.get("IS_SAFE", "false").lower() == "true"

    def run_command(self, command: list) -> tuple[str, str]:
        """Execute command and return output results"""
        try:
            process = subprocess.Popen(
                command,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                universal_newlines=True
            )
            stdout, stderr = process.communicate()
            return stdout, stderr
        except Exception as e:
            return "", str(e)

    async def run_with_sandbox(self, command: list, input_files: dict = None) -> tuple[str, str]:
        """
        Execute command in Kali sandbox
        Args:
            command: Command to execute
            input_files: Dict of {local_path: container_path} for files to copy into container
        """
        kali_config = SandboxSettings(
            image="kalilinux/kali-rolling",
            memory_limit=self.memory_limit,
            cpu_limit=1.0,
            network_enabled=self.network_enabled,
            timeout=self.timeout
        )
        client = create_sandbox_client()
        try:
            await client.create(config=kali_config)
            
            # 如果有输入文件,先复制到容器中
            if input_files:
                for local_path, container_path in input_files.items():
                    await client.copy_to_container(local_path, container_path)
            
            # 执行命令
            cmd_str = " ".join(command)
            stdout = await client.run_command(cmd_str)
            return stdout, ""
        except Exception as e:
            return "", str(e)
        finally:
            await client.cleanup()

    async def safe_execute_kali_command(self, command: list, input_files: dict = None) -> tuple[str, str]:
        """
        Safely execute Kali command
        Args:
            command: Command to execute
            input_files: Dict of {local_path: container_path} for files to copy into container
        """
        try:
            result = await self.run_with_sandbox(command, input_files)
            return result
        except SandboxTimeoutError:
            return "", "Command execution timed out"
        except SandboxError as e:
            return "", f"Sandbox execution error: {str(e)}"
        except Exception as e:
            return "", f"Unknown error: {str(e)}"

    def execute(self, command: list, input_files: dict = None) -> tuple[str, str]:
        """
        Execute command with safety check
        Args:
            command: Command to execute
            input_files: Dict of {local_path: container_path} for files to copy into container
        """
        # if self.IS_SAFE:
        #     return await self.safe_execute_kali_command(command, input_files)
        # else:
        return self.run_command(command)  
```

--------------------------------------------------------------------------------
/src/sandbox.py:
--------------------------------------------------------------------------------

```python
import asyncio
import docker
import tarfile
import os
import io
from typing import Optional, Dict, Any
from dataclasses import dataclass

class SandboxSettings:
    """sandbox configuration settings"""
    def __init__(
        self,
        image: str = "kalimcps:latest",
        memory_limit: str = "2g",
        cpu_limit: float = 1.0,
        network_enabled: bool = True,
        network_mode: str = "bridge",
        timeout: int = 300
    ):
        self.image = image
        self.memory_limit = memory_limit
        self.cpu_limit = cpu_limit
        self.network_enabled = network_enabled
        self.network_mode = network_mode
        self.timeout = timeout

class SandboxError(Exception):
    """Base exception for sandbox-related errors."""


class SandboxTimeoutError(SandboxError):
    """Exception raised when a sandbox operation times out."""


class SandboxResourceError(SandboxError):
    """Exception raised for resource-related errors."""


class SandboxClient:
    """Sandbox client, providing only container creation and cleanup functionality"""
    
    def __init__(self):
        self.client = docker.from_env()
        self.container = None

    async def create(self, config: SandboxSettings) -> None:
        """Create and start the container"""
        try:
            # prepare container configuration
            container_config = {
                "image": config.image,
                "detach": True,
                "mem_limit": config.memory_limit,
                "nano_cpus": int(config.cpu_limit * 1e9),
                "network_mode": config.network_mode if config.network_enabled else "none",
            }
            
            # create and start container
            self.container = self.client.containers.run(**container_config)
            print(f"Container created: {self.container.id}")
        except Exception as e:
            raise SandboxError(f"Failed to create container: {str(e)}")
    
    async def run_command(self, command: str) -> str:
        """Execute a command in the container"""
        if not self.container:
            raise SandboxError("Container not created")
        
        try:
            exec_result = self.container.exec_run(command, tty=True)
            return exec_result.output.decode('utf-8')
        except Exception as e:
            raise SandboxError(f"Failed to execute command: {str(e)}")

    async def copy_to_container(self, source_path: str, container_path: str) -> None:
        """
        Copy a file from host to container
        Args:
            source_path: Path to the file on host
            container_path: Path where to put the file in container
        """
        if not self.container:
            raise SandboxError("Container not created")
        
        try:
            # create a memory-based tar file
            tar_stream = io.BytesIO()
            with tarfile.open(fileobj=tar_stream, mode='w') as tar:
                # add file to tar
                tar.add(source_path, arcname=os.path.basename(container_path))
            
            tar_stream.seek(0)
            # copy to container
            self.container.put_archive(
                path=os.path.dirname(container_path),
                data=tar_stream
            )
            print(f"File copied to container: {container_path}")
        except Exception as e:
            raise SandboxError(f"Failed to copy file to container: {str(e)}")

    async def copy_from_container(self, container_path: str, dest_path: str) -> None:
        """
        Copy a file from container to host
        Args:
            container_path: Path to the file in container
            dest_path: Path where to put the file on host
        """
        if not self.container:
            raise SandboxError("Container not created")
        
        try:
            # get the tar stream of the file from container
            bits, stat = self.container.get_archive(container_path)
            
            # create the destination directory (if it doesn't exist)
            os.makedirs(os.path.dirname(dest_path), exist_ok=True)
            
            # write the tar stream to a temporary file
            with open(dest_path, 'wb') as f:
                for chunk in bits:
                    f.write(chunk)
            print(f"File copied from container: {dest_path}")
        except Exception as e:
            raise SandboxError(f"Failed to copy file from container: {str(e)}")
    
    async def cleanup(self) -> None:
        """Clean up and remove the container"""
        if self.container:
            try:
                self.container.stop()
                self.container.remove()
                print(f"Container cleaned up: {self.container.id}")
                self.container = None
            except Exception as e:
                print(f"Error cleaning up container: {str(e)}")

def create_sandbox_client() -> SandboxClient:
    """Create a factory function for the sandbox client"""
    return SandboxClient()



# example
async def run_in_sandbox(command: str) -> str:
    """Run a command in the sandbox"""
    config = SandboxSettings()
    client = create_sandbox_client()
    
    try:
        await client.create(config=config)
        result = await client.run_command(command)
        return result
    except SandboxTimeoutError:
        return "Command execution timed out"
    except SandboxError as e:
        return f"Sandbox execution error: {str(e)}"
    except Exception as e:
        return f"Unknown error: {str(e)}"
    finally:
        await client.cleanup()

# if you run this script directly
if __name__ == "__main__":
    async def main():
        print("Starting sandbox test...")

        result = await run_in_sandbox("echo 'Sandbox test successful'")
        print(result)
    
    asyncio.run(main()) 
```

--------------------------------------------------------------------------------
/src/kali_mcps/manual.yaml:
--------------------------------------------------------------------------------

```yaml

mcp:
  tools:
    - nmap:
        functions:
          - basic_scan
            description: "Perform a basic network scan using nmap."
            parameters:
              target:
                type: str
                description: "The target IP address or hostname to scan."
          - intense_scan
            description: "Perform an intense network scan using nmap."
            parameters:
              target:
                type: str
                description: "The target IP address or hostname to scan."
          - stealth_scan
            description: "Perform a stealth network scan using nmap."
            parameters:
              target:
                type: str
                description: "The target IP address or hostname to scan."
          - quick_scan
            description: "Perform a quick network scan using nmap."
            parameters:
              target:
                type: str
                description: "The target IP address or hostname to scan."
          - vulnerability_scan
            description: "Perform a vulnerability scan using nmap."
            parameters:
              target:
                type: str
                description: "The target IP address or hostname to scan."
                
    - strings:
        functions:
          - basic_strings
            description: "Perform a basic strings scan using strings."
            parameters:
              target:
                type: str
                description: "The target file to scan."
          - min_length_strings
            description: "Perform a min length strings scan using strings."
            parameters:
              target:
                type: str
                description: "The target file to scan."
          - offset_strings
            description: "Perform an offset strings scan using strings."
            parameters:
              target:
                type: str
                description: "The target file to scan."
          - encoding_strings
            description: "Perform an encoding strings scan using strings."
            parameters:
              target:
                type: str
                description: "The target file to scan."
                
    - objdump:
        functions:
          - file_headers
            description: "Perform a file headers scan using objdump."
            parameters:
              target:
                type: str
                description: "The target file to scan."
          - disassemble
            description: "Perform a disassemble scan using objdump."
            parameters:
              target:
                type: str
                description: "The target file to scan."
          - symbol_table
            description: "Perform a symbol table scan using objdump."
            parameters:
              target:
                type: str
                description: "The target file to scan."
          - section_headers
            description: "Perform a section headers scan using objdump."
            parameters:
              target:
                type: str
                description: "The target file to scan."
          - full_contents
            description: "Perform a full contents scan using objdump."
            parameters:
              target:
                type: str
                description: "The target file to scan."
                
    - nm:
        functions:
          - basic_symbols
            description: "Perform a basic symbols scan using nm."
            parameters:
              target:
                type: str
                description: "The target file to scan."
          - dynamic_symbols
            description: "Perform a dynamic symbols scan using nm."
            parameters:
              target:
                type: str
                description: "The target file to scan." 
          - demangle_symbols
            description: "Perform a demangle symbols scan using nm."
            parameters:
              target:
                type: str
                description: "The target file to scan."   
          - numeric_sort
            description: "Perform a numeric sort scan using nm."
            parameters:
              target:
                type: str
                description: "The target file to scan." 
          - size_sort
            description: "Perform a size sort scan using nm."
            parameters:
              target:
                type: str
                description: "The target file to scan."   
          - undefined_symbols
            description: "Perform an undefined symbols scan using nm."
            parameters:
              target:
                type: str
                description: "The target file to scan."     
                
    - wireshark:
        functions:
          - capture_live
            description: "Perform a capture live scan using wireshark."
            parameters: 
              interface:
                type: str
                description: "The network interface to capture live traffic on."
              duration:
                type: int
                description: "The duration of the capture in seconds."  
          - analyze_pcap
            description: "Perform an analyze pcap scan using wireshark."
            parameters:
              pcap_file:
                type: str
                description: "The pcap file to analyze."  
          - extract_http
            description: "Perform an extract http scan using wireshark."
            parameters:
              pcap_file:
                type: str
                description: "The pcap file to analyze."  
          - protocol_hierarchy
            description: "Perform a protocol hierarchy scan using wireshark."
            parameters:
              pcap_file:
                type: str
                description: "The pcap file to analyze."    
          - conversation_statistics
            description: "Perform a conversation statistics scan using wireshark."
            parameters:
              pcap_file:
                type: str
                description: "The pcap file to analyze."      
          - expert_info
            description: "Perform an expert info scan using wireshark."
            parameters:
              pcap_file:
                type: str
                description: "The pcap file to analyze."  
```

--------------------------------------------------------------------------------
/src/mcp_server.py:
--------------------------------------------------------------------------------

```python
from mcp.server.fastmcp import FastMCP
from pydantic import BaseModel, Field
import os
from src.kali_mcps.nmap.actions import basic_scan_action, intense_scan_action, stealth_scan_action, quick_scan_action, vulnerability_scan_action
from src.kali_mcps.nm.actions import basic_symbols_action, dynamic_symbols_action, demangle_symbols_action, numeric_sort_action, size_sort_action, undefined_symbols_action
from src.kali_mcps.objdump.actions import file_headers_action, disassemble_action, symbol_table_action, section_headers_action, full_contents_action
from src.kali_mcps.strings.actions import basic_strings_action, min_length_strings_action, offset_strings_action, encoding_strings_action
from src.kali_mcps.wireshark.actions import capture_live_action, analyze_pcap_action, extract_http_action, protocol_hierarchy_action, conversation_statistics_action, expert_info_action
from src.kali_mcps.traceroute.actions import traceroute_action
mcp = FastMCP("kali-tools")


IS_SAFE = os.environ.get("IS_SAFE", "false").lower() == "true"  # is safe mode, if true, the command will be executed in the sandbox

# nmap start
@mcp.tool()
def basic_scan(target: str):
    """Perform a basic network scan using nmap.

    Args:
        target (str): The target IP address or hostname to scan.

    Returns:
        str: The output results of the basic scan.
    """
    return basic_scan_action(target)

@mcp.tool()
def intense_scan(target: str):
    """Perform an intense network scan using nmap.

    Args:
        target (str): The target IP address or hostname to scan.

    Returns:
        str: The output results of the intense scan.
    """
    return intense_scan_action(target)

@mcp.tool()
def stealth_scan(target: str):
    """Perform a stealth network scan using nmap.

    Args:
        target (str): The target IP address or hostname to scan.

    Returns:
        str: The output results of the stealth scan.
    """
    return stealth_scan_action(target)

@mcp.tool()
def quick_scan(target: str):
    """Perform a quick network scan using nmap.

    Args:
        target (str): The target IP address or hostname to scan.

    Returns:
        str: The output results of the quick scan.
    """
    return quick_scan_action(target)

@mcp.tool()
def vulnerability_scan(target: str):
    """Perform a vulnerability scan using nmap.

    Args:
        target (str): The target IP address or hostname to scan.

    Returns:
        str: The output results of the vulnerability scan.
    """
    return vulnerability_scan_action(target)
# nmap end

# nm start
@mcp.tool()
def basic_symbols(target: str):
    """Perform a basic symbol listing using nm.

    Args:
        target (str): The target file or executable to analyze.

    Returns:
        str: The output results of the basic symbol listing.
    """
    return basic_symbols_action(target)

@mcp.tool()
def dynamic_symbols(target: str):
    """Perform a dynamic symbol listing using nm.

    Args:
        target (str): The target file or executable to analyze.

    Returns:
        str: The output results of the dynamic symbol listing.
    """
    return dynamic_symbols_action(target)

@mcp.tool()
def demangle_symbols(target: str):
    """Perform a demangling of symbols using nm.

    Args:
        target (str): The target file or executable to analyze.

    Returns:
        str: The output results of the demangling of symbols.
    """
    return demangle_symbols_action(target)

@mcp.tool()
def numeric_sort(target: str):
    """Perform a numeric sort of symbols using nm.

    Args:
        target (str): The target file or executable to analyze.

    Returns:
        str: The output results of the numeric sort of symbols.
    """
    return numeric_sort_action(target)

@mcp.tool()
def size_sort(target: str):
    """Perform a size sort of symbols using nm.

    Args:
        target (str): The target file or executable to analyze.

    Returns:
        str: The output results of the size sort of symbols.
    """
    return size_sort_action(target)

@mcp.tool()
def undefined_symbols(target: str):
    """Perform an undefined symbol listing using nm.

    Args:
        target (str): The target file or executable to analyze.

    Returns:
        str: The output results of the undefined symbol listing.
    """
    return undefined_symbols_action(target)
# nm end

# objdump start
@mcp.tool()
def file_headers(target: str):
    """Perform a file header listing using objdump.

    Args:
        target (str): The target file or executable to analyze.

    Returns:
        str: The output results of the file header listing.
    """
    return file_headers_action(target)

@mcp.tool()
def disassemble(target: str):
    """Perform a disassembly of the target file using objdump.

    Args:
        target (str): The target file or executable to analyze.

    Returns:
        str: The output results of the disassembly of the target file.
    """
    return disassemble_action(target)

@mcp.tool()
def symbol_table(target: str):
    """Perform a symbol table listing using objdump.

    Args:
        target (str): The target file or executable to analyze.

    Returns:
        str: The output results of the symbol table listing.
    """
    return symbol_table_action(target)

@mcp.tool()
def section_headers(target: str):
    """Perform a section header listing using objdump.

    Args:
        target (str): The target file or executable to analyze.

    Returns:
        str: The output results of the section header listing.
    """
    return section_headers_action(target)

@mcp.tool()
def full_contents(target: str):
    """Perform a full contents listing using objdump.

    Args:
        target (str): The target file or executable to analyze.

    Returns:
        str: The output results of the full contents listing.
    """
    return full_contents_action(target)
# objdump end

# strings start
@mcp.tool()
def basic_strings(target: str):
    """Perform a basic string listing using strings.

    Args:
        target (str): The target file or executable to analyze.

    Returns:
        str: The output results of the basic string listing.
    """
    return basic_strings_action(target)

@mcp.tool()
def min_length_strings(target: str):
    """Perform a minimum length string listing using strings.

    Args:
        target (str): The target file or executable to analyze.

    Returns:
        str: The output results of the minimum length string listing.
    """
    return min_length_strings_action(target)

@mcp.tool()
def offset_strings(target: str):
    """Perform an offset string listing using strings.

    Args:
        target (str): The target file or executable to analyze.

    Returns:
        str: The output results of the offset string listing.
    """
    return offset_strings_action(target)

@mcp.tool()
def encoding_strings(target: str):
    """Perform an encoding string listing using strings.

    Args:
        target (str): The target file or executable to analyze.

    Returns:
        str: The output results of the encoding string listing.
    """
    return encoding_strings_action(target)
# strings end

# tshark start
@mcp.tool()
def capture_live(interface: str, duration: int = 30, filter: str = ""):
    """Perform a live capture of network traffic using tshark.

    Args:
        interface (str): The network interface to capture from.
        duration (int): The duration of the capture in seconds.
        filter (str): The filter to apply to the capture.

    Returns:
        str: The output results of the live capture of network traffic.
    """
    return capture_live_action(interface, duration, filter)


@mcp.tool()
def analyze_pcap(pcap_file: str, display_filter: str = ""):
    """Perform an analysis of a pcap file using tshark.

    Args:
        pcap_file (str): The path to the pcap file to analyze.
        display_filter (str): The filter to apply to the analysis.

    Returns:
        str: The output results of the analysis of the pcap file.
    """
    return analyze_pcap_action(pcap_file, display_filter)

@mcp.tool()
def extract_http(pcap_file: str):
    """Perform an HTTP extraction from a pcap file using tshark.

    Args:
        pcap_file (str): The path to the pcap file to extract HTTP from.    

    Returns:
        str: The output results of the HTTP extraction from the pcap file.
    """
    return extract_http_action(pcap_file)  

@mcp.tool()
def protocol_hierarchy(pcap_file: str):
    """Perform a protocol hierarchy listing using tshark.

    Args:
        pcap_file (str): The path to the pcap file to analyze.  

    Returns:
        str: The output results of the protocol hierarchy listing.
    """
    return protocol_hierarchy_action(pcap_file)    

@mcp.tool()
def conversation_statistics(pcap_file: str):
    """Perform a conversation statistics listing using tshark.

    Args:
        pcap_file (str): The path to the pcap file to analyze.  

    Returns:
        str: The output results of the conversation statistics listing.
    """
    return conversation_statistics_action(pcap_file)   

@mcp.tool() 
def expert_info(pcap_file: str):
    """Perform an expert information listing using tshark.

    Args:
        pcap_file (str): The path to the pcap file to analyze.  

    Returns:
        str: The output results of the expert information listing.
    """
    return expert_info_action(pcap_file)   
# tshark end

# traceroute start
@mcp.tool()
def traceroute(target: str):
    """
    Perform a traceroute to the target.

    Args:
        target (str): The target IP address or hostname to traceroute to.

    Returns:
        str: The output results of the traceroute.  
    """
    return traceroute_action(target)
# traceroute end

# run server, using stdio transport
if __name__ == "__main__":
    print("Starting server is running")
    mcp.run(transport="stdio")
```