This is page 1 of 2. Use http://codebase.md/ch1nhpd/pentest-tools-mcp-server?page={x} to view the full context.

# Directory Structure

```
├── config.json
├── docker-compose.yml
├── Dockerfile
├── pentest-tools-mcp-server.py
├── README.md
├── requirements.txt
└── wordlists
    └── xss-payloads.txt
```

# Files

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# Pentest Tools MCP Server

An MCP (Model Context Protocol) server that exposes penetration testing tools to MCP-compatible LLM clients such as Claude Desktop and Roo Code.

## Features

- Comprehensive pentesting tools:
  - Directory scanning (FFuf, Dirsearch)
  - Vulnerability scanning (Nuclei, XSStrike)
  - API testing
  - Reconnaissance
  - And more...
- Pre-configured wordlists from SecLists
- Automated report generation
- Claude Desktop integration

## Prerequisites

- Docker and Docker Compose (for containerized setup)
- Claude Desktop application or other MCP-compatible client
- Python 3.10+ and uv (for local setup)

## Directory Setup

1. Create the required directories:
```bash
# Create directories
mkdir -p reports templates wordlists
```

2. The directory structure should look like this:
```
pentest-tools/
├── reports/          # For storing scan reports
├── templates/        # For report templates
├── wordlists/        # For custom wordlists
├── pentest-tools-mcp-server.py
├── config.json
├── requirements.txt
├── docker-compose.yml
└── Dockerfile
```

## Setup

### Docker Setup (Recommended)

1. Build and start the container:
```bash
docker-compose up -d --build
```

2. Verify the container is running:
```bash
docker-compose ps
```

3. Check logs if needed:
```bash
docker-compose logs -f
```

### Local Setup

1. Install dependencies:
```bash
uv venv
source .venv/bin/activate  # On Windows: .venv\Scripts\activate
uv pip install -r requirements.txt
```

2. Install required system tools (example for Ubuntu/Debian):
```bash
sudo apt-get install nmap whatweb dnsrecon theharvester ffuf dirsearch sqlmap
```

## Claude Desktop Integration

1. Configure Claude Desktop:

Windows:
```
%APPDATA%\Claude\claude_desktop_config.json
```

macOS:
```
~/Library/Application Support/Claude/claude_desktop_config.json
```

2. Add server configuration:

For Docker setup:
```json
{
    "mcpServers": {
        "pentest-tools": {
            "command": "docker-compose",
            "args": [
                "run",
                "--rm",
                "pentest-tools",
                "python3",
                "pentest-tools-mcp-server.py"
            ],
            "cwd": "\\Path\\to\\pentest-tools"
        }
    }
}
```

If the above configuration doesn't work on Windows, try this alternative approach:

```json
{
    "mcpServers": {
        "pentest-tools": {
            "command": "cmd",
            "args": [
                "/c",
                "cd /d \\path\\to\\pentest-tools && docker-compose run --rm pentest-tools python3 pentest-tools-mcp-server.py"
            ]
        }
    }
}
```

Note about `cwd` (Current Working Directory):
- `cwd` tells Claude Desktop which directory to run the command from
- It must be the absolute path to the directory containing `docker-compose.yml`
- On Windows, use double backslashes (`\\`) in paths
- On Linux/MacOS, use forward slashes (`/`)

3. Restart Claude Desktop
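
As a rough illustration of the `cwd` note above, the following Python sketch builds the Docker server entry with an absolute path. The helper name `build_server_entry` is hypothetical and not part of this project. `json.dumps` doubles the backslashes in Windows paths automatically, so the path does not have to be escaped by hand.

```python
import json
from pathlib import Path


def build_server_entry(project_dir: str) -> dict:
    """Return an mcpServers entry whose cwd is an absolute path.

    Hypothetical helper: it mirrors the Docker configuration shown above.
    """
    root = Path(project_dir).expanduser().resolve()
    # cwd must point at the directory that contains docker-compose.yml
    if not (root / "docker-compose.yml").is_file():
        raise FileNotFoundError(f"docker-compose.yml not found in {root}")
    return {
        "mcpServers": {
            "pentest-tools": {
                "command": "docker-compose",
                "args": [
                    "run",
                    "--rm",
                    "pentest-tools",
                    "python3",
                    "pentest-tools-mcp-server.py",
                ],
                "cwd": str(root),
            }
        }
    }
```

Running `print(json.dumps(build_server_entry("."), indent=4))` from the project directory prints a block that can be merged into `claude_desktop_config.json`.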

## Usage

Available commands in Claude Desktop:

1. Reconnaissance:
```
/recon example.com
```

2. Directory scanning:
```
/scan example.com --type directory
```

3. Vulnerability scanning:
```
/scan example.com --type full
/scan example.com --type xss
/scan example.com --type sqli
/scan example.com --type ssrf
```

4. API testing:
```
/scan api.example.com --type api
```

Natural language commands:
- "Run a full security scan on example.com"
- "Check for XSS vulnerabilities on example.com"
- "Perform reconnaissance on example.com"

## Directory Structure Details

```
pentest-tools/
├── reports/            # Scan reports directory
│   ├── recon/         # Reconnaissance reports
│   ├── vulns/         # Vulnerability scan reports
│   └── api/           # API testing reports
├── templates/          # Report templates
│   ├── recon.html     # Template for recon reports
│   ├── vuln.html      # Template for vulnerability reports
│   └── api.html       # Template for API test reports
├── wordlists/         # Custom wordlists
│   ├── SecLists/     # Cloned from SecLists repo
│   ├── custom/       # Your custom wordlists
│   └── generated/    # Tool-generated wordlists
├── pentest-tools-mcp-server.py # Main MCP server
├── config.json        # Tool configuration
├── requirements.txt   # Python dependencies
├── docker-compose.yml # Docker configuration
└── Dockerfile        # Container definition
```
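
The layout above can also be created from Python. This is a minimal sketch, assuming the sub-directory names shown in the tree; `create_layout` is a hypothetical helper, and `SecLists/` is left out because it is cloned rather than created.

```python
from pathlib import Path

# Sub-directories taken from the layout above (SecLists is cloned separately).
LAYOUT = [
    "reports/recon",
    "reports/vulns",
    "reports/api",
    "templates",
    "wordlists/custom",
    "wordlists/generated",
]


def create_layout(root: str = ".") -> list:
    """Create the directories under root and return their paths."""
    created = []
    for relative in LAYOUT:
        path = Path(root) / relative
        # parents=True and exist_ok=True give the same effect as `mkdir -p`
        path.mkdir(parents=True, exist_ok=True)
        created.append(path)
    return created
```

The call is idempotent, so it is safe to run again after adding new entries to `LAYOUT`.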

## Security Notes

- Always ensure you have permission to scan targets
- Keep tools and dependencies updated
- Review scan results carefully
- Follow responsible disclosure practices




```

--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------

```
mcp[cli]>=1.2.0
httpx
aiohttp
aiofiles
beautifulsoup4
pyjwt
pyyaml
requests
sslyze
python-nmap
dnspython
pdfkit
python-dotenv
```
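
Several of these distributions install under an import name that differs from the package name (for example, the `dotenv` module imported by the server is provided by the `python-dotenv` distribution). The sketch below checks which modules are importable in the current environment. The mapping and the helper name `missing_packages` are illustrative assumptions, not part of the project.

```python
from importlib.util import find_spec

# Distribution name -> top-level module it provides.
IMPORT_NAMES = {
    "mcp": "mcp",
    "httpx": "httpx",
    "aiohttp": "aiohttp",
    "aiofiles": "aiofiles",
    "beautifulsoup4": "bs4",
    "pyjwt": "jwt",
    "pyyaml": "yaml",
    "requests": "requests",
    "python-nmap": "nmap",
    "dnspython": "dns",
    "python-dotenv": "dotenv",
}


def missing_packages(import_names=None) -> list:
    """Return the distributions whose module cannot be found, sorted by name."""
    names = IMPORT_NAMES if import_names is None else import_names
    return sorted(pkg for pkg, module in names.items() if find_spec(module) is None)
```

An empty result from `missing_packages()` suggests the Python side of the install is complete; it says nothing about the external command-line tools.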

--------------------------------------------------------------------------------
/docker-compose.yml:
--------------------------------------------------------------------------------

```yaml
version: '3.8'

services:
  pentest-tools:
    build: .
    container_name: pentest-tools
    volumes:
      - ./reports:/app/reports
      - ./templates:/app/templates
      - ./wordlists:/usr/share/wordlists/pentest-tools
    environment:
      - GITHUB_TOKEN=${GITHUB_TOKEN}
    cap_add:
      - NET_ADMIN  # Required for some network scanning tools
      - NET_RAW    # Required for raw socket operations
    security_opt:
      - seccomp:unconfined  # Required for some security tools
    stdin_open: true  # Keep STDIN open for MCP
    tty: true        # Allocate a pseudo-TTY

networks:
  pentest-network:
    driver: bridge 
```
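
The compose file forwards `GITHUB_TOKEN` from the host into the container. When the variable is unset on the host, Compose substitutes an empty string, so code that reads it should treat an empty value as missing. A minimal sketch, with `github_token` as a hypothetical helper name:

```python
import os
from typing import Mapping, Optional


def github_token(env: Optional[Mapping[str, str]] = None) -> Optional[str]:
    """Return the GitHub token from the environment, or None if it is unset or blank."""
    source = os.environ if env is None else env
    token = source.get("GITHUB_TOKEN", "").strip()
    return token or None
```

Returning `None` for a blank value lets callers skip GitHub-dependent steps instead of passing an empty token to a tool.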

--------------------------------------------------------------------------------
/config.json:
--------------------------------------------------------------------------------

```json
{
    "wordlists": {
        "dirsearch": "/usr/share/wordlists/pentest-tools/dirsearch.txt",
        "common": "/usr/share/wordlists/pentest-tools/SecLists/Discovery/Web-Content/common.txt",
        "api_endpoints": "/usr/share/wordlists/pentest-tools/SecLists/Discovery/Web-Content/api-endpoints.txt",
        "subdomains": "/usr/share/wordlists/pentest-tools/SecLists/Discovery/DNS/subdomains-top1million-5000.txt",
        "passwords": "/usr/share/wordlists/pentest-tools/SecLists/Passwords/Common-Credentials/10-million-password-list-top-1000.txt",
        "jwt_secrets": "/usr/share/wordlists/pentest-tools/SecLists/Passwords/Common-Credentials/common-secrets.txt",
        "xss": "/usr/share/wordlists/pentest-tools/xss-payloads.txt",
        "sqli": "/usr/share/wordlists/pentest-tools/sqli-payloads.txt",
        "lfi": "/usr/share/wordlists/pentest-tools/lfi-payloads.txt",
        "ssrf": "/usr/share/wordlists/pentest-tools/ssrf-payloads.txt"
    },
    "tools": {
        "xsstrike": "/root/tools/XSStrike/xsstrike.py",
        "nuclei_templates": "/root/tools/nuclei-templates",
        "crlfuzz": "/root/go/bin/crlfuzz",
        "graphql_path": "/root/tools/graphql-tools"
    },
    "reporting": {
        "output_dir": "/app/reports",
        "template_dir": "/app/templates"
    }
} 
```
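
In the portion of the server shown on this page, the same values are hard-coded in a `CONFIG` dictionary and `config.json` is not read. If the file is meant to override those defaults, one possible approach is a recursive merge. This is a sketch under that assumption; `deep_merge` and `load_config` are hypothetical names.

```python
import json
from pathlib import Path


def deep_merge(base: dict, override: dict) -> dict:
    """Return a copy of base with override applied, merging nested dicts."""
    merged = dict(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = value
    return merged


def load_config(defaults: dict, path: str = "config.json") -> dict:
    """Merge the JSON file at path over defaults; fall back to defaults if absent."""
    config_path = Path(path)
    if not config_path.is_file():
        return dict(defaults)
    with config_path.open(encoding="utf-8") as handle:
        return deep_merge(defaults, json.load(handle))
```

With this shape, a `config.json` that only sets `reporting.output_dir` would leave every other default untouched.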

--------------------------------------------------------------------------------
/Dockerfile:
--------------------------------------------------------------------------------

```dockerfile
FROM kalilinux/kali-rolling

# Prevent interactive prompts during package installation
ENV DEBIAN_FRONTEND=noninteractive

# Install basic system packages
RUN apt-get update && apt-get install -y \
    python3 \
    python3-pip \
    python3-venv \
    golang \
    git \
    wget \
    nmap \
    whatweb \
    dnsrecon \
    theharvester \
    ffuf \
    dirsearch \
    sqlmap \
    amass \
    nodejs \
    npm \
    curl \
    dnsutils \
    bind9-utils \
    sslyze \
    && rm -rf /var/lib/apt/lists/*

# Set up Python virtual environment
ENV VIRTUAL_ENV=/opt/venv
RUN python3 -m venv $VIRTUAL_ENV
ENV PATH="$VIRTUAL_ENV/bin:$PATH"

# Install Python packages in virtual environment
COPY requirements.txt /tmp/
RUN pip install --no-cache-dir -r /tmp/requirements.txt

# Check Go version
RUN go version

# Check network connectivity to GitHub
RUN curl -v https://github.com

# Install Go tools individually (excluding Amass)
RUN go install -v github.com/projectdiscovery/subfinder/v2/cmd/subfinder@latest
RUN go install -v github.com/tomnomnom/assetfinder@latest
RUN go install -v github.com/tomnomnom/waybackurls@latest
RUN go install -v github.com/lc/gau/v2/cmd/gau@latest
RUN go install -v github.com/tillson/git-hound@latest
RUN go install -v github.com/hakluke/hakrawler@latest
RUN go install -v github.com/projectdiscovery/nuclei/v2/cmd/nuclei@latest
RUN go install -v github.com/projectdiscovery/httpx/cmd/httpx@latest

# Install npm packages
RUN npm install -g wappalyzer-cli lighthouse snyk

# Set up wordlists directory
RUN mkdir -p /usr/share/wordlists/pentest-tools

# Download SecLists
RUN git clone https://github.com/danielmiessler/SecLists.git /usr/share/wordlists/pentest-tools/SecLists

# Download other wordlists
RUN wget -O /usr/share/wordlists/pentest-tools/dirsearch.txt https://raw.githubusercontent.com/maurosoria/dirsearch/master/db/dicc.txt && \
    wget -O /usr/share/wordlists/pentest-tools/xss-payloads.txt https://raw.githubusercontent.com/payloadbox/xss-payload-list/master/Intruder/xss-payload-list.txt && \
    wget -O /usr/share/wordlists/pentest-tools/sqli-payloads.txt https://raw.githubusercontent.com/payloadbox/sql-injection-payload-list/refs/heads/master/Intruder/detect/Generic_SQLI.txt && \
    wget -O /usr/share/wordlists/pentest-tools/lfi-payloads.txt https://raw.githubusercontent.com/emadshanab/LFI-Payload-List/master/LFI%20payloads.txt && \
    wget -O /usr/share/wordlists/pentest-tools/ssrf-payloads.txt https://gist.githubusercontent.com/rootsploit/66c9ae8fc3ef387fa5ffbb67fcef0766/raw/d5a4088d628ed05f161b9dd9bf3c6755910a164f/SSRF-Payloads.txt

# Set up tools directory
RUN mkdir -p /root/tools

# Clone useful repositories
RUN git clone https://github.com/s0md3v/XSStrike.git /root/tools/XSStrike && \
    git clone https://github.com/projectdiscovery/nuclei-templates.git /root/tools/nuclei-templates && \
    git clone https://github.com/graphql/graphql-js.git /root/tools/graphql-tools

# Create necessary directories for the application
RUN mkdir -p /app/reports /app/templates

# Set working directory
WORKDIR /app

# Copy application files
COPY . /app/

# Add Go binaries to PATH
ENV PATH="/root/go/bin:$PATH"

# Set entrypoint
CMD ["python", "pentest-tools-mcp-server.py"] 
```
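
The server shells out to many external binaries, so a quick check that each one is on `PATH` can save debugging time after a build. The list below is collected from the commands invoked in the server code on this page; the helper name `missing_tools` is hypothetical. A check like this can also surface mismatches, such as a binary installed under a different name than the one the server calls.

```python
import shutil

# Executables invoked by the server code shown on this page.
REQUIRED_TOOLS = [
    "ffuf", "dirsearch", "sqlmap",
    "subfinder", "amass", "assetfinder", "github-subdomains",
    "nmap", "wappalyzer", "whatweb",
    "dnsrecon", "dig",
    "waybackurls", "gau",
    "sslyze", "theHarvester", "githound", "hakrawler",
]


def missing_tools(tools=None) -> list:
    """Return the tools that shutil.which cannot find on PATH, in input order."""
    names = REQUIRED_TOOLS if tools is None else tools
    return [tool for tool in names if shutil.which(tool) is None]
```

Running `missing_tools()` inside the container returns the names that still need to be installed or renamed.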

--------------------------------------------------------------------------------
/pentest-tools-mcp-server.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3

from typing import Any, List, Optional, Dict
from mcp.server.fastmcp import FastMCP
import httpx
import subprocess
import json
import asyncio
import re
from pathlib import Path
from datetime import datetime
import yaml
import aiofiles
import aiohttp
from bs4 import BeautifulSoup
# import jwt
import base64
import logging
import threading
from concurrent.futures import ThreadPoolExecutor
import urllib.parse
import os
from dotenv import load_dotenv

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    filename='pentest.log'
)
logger = logging.getLogger('pentest-tools')

# Initialize FastMCP server
mcp = FastMCP("pentest-tools")

# Configuration
CONFIG = {
    "wordlists": {
        "dirsearch": "/usr/share/wordlists/pentest-tools/dirsearch.txt",
        "common": "/usr/share/wordlists/pentest-tools/SecLists/Discovery/Web-Content/common.txt",
        "api_endpoints": "/usr/share/wordlists/pentest-tools/SecLists/Discovery/Web-Content/api-endpoints.txt",
        "subdomains": "/usr/share/wordlists/pentest-tools/SecLists/Discovery/DNS/subdomains-top1million-5000.txt",
        "passwords": "/usr/share/wordlists/pentest-tools/SecLists/Passwords/Common-Credentials/10-million-password-list-top-1000.txt",
        "jwt_secrets": "/usr/share/wordlists/pentest-tools/SecLists/Passwords/Common-Credentials/common-secrets.txt",
        "xss": "/usr/share/wordlists/pentest-tools/xss-payloads.txt",
        "sqli": "/usr/share/wordlists/pentest-tools/sqli-payloads.txt",
        "lfi": "/usr/share/wordlists/pentest-tools/lfi-payloads.txt",
        "ssrf": "/usr/share/wordlists/pentest-tools/ssrf-payloads.txt"
    },
    "tools": {
        "xsstrike": "/root/tools/XSStrike/xsstrike.py",
        "nuclei_templates": "/root/tools/nuclei-templates",
        "crlfuzz": "/root/go/bin/crlfuzz",
        "graphql_path": "/root/tools/graphql-tools"
    },
    "reporting": {
        "output_dir": "reports",
        "template_dir": "templates"
    }
}

# Load environment variables
load_dotenv()

def format_ffuf_results(results):
    """Format the FFuf results in a readable format.
    
    Args:
        results: FFuf JSON results
    
    Returns:
        Formatted string with FFuf results
    """
    formatted = []
    
    # Add summary
    if "stats" in results:
        stats = results["stats"]
        formatted.append(f"Total requests: {stats.get('total', 0)}")
        formatted.append(f"Duration: {stats.get('elapsed', 0)} seconds")
        formatted.append(f"Requests per second: {stats.get('req_sec', 0)}")
    
    # Add results
    if "results" in results:
        formatted.append("\nDiscovered URLs:")
        for item in results["results"]:
            status = item.get("status", "")
            url = item.get("url", "")
            size = item.get("length", 0)
            formatted.append(f"[Status: {status}] [{size} bytes] {url}")
    
    return "\n".join(formatted)

@mcp.tool()
async def advanced_directory_scan(url: str, extensions: Optional[List[str]] = None) -> str:
    """Advanced directory and file scanning with multiple tools and techniques.
    
    Args:
        url: Target URL
        extensions: List of file extensions to scan for
    """
    if extensions is None:
        extensions = ["php", "asp", "aspx", "jsp", "js", "txt", "conf", "bak", "backup", "swp", "old", "db", "sql"]
    
    results = []
    
    # 1. FFuf with advanced options
    try:
        ffuf_cmd = [
            "ffuf",
            "-u", f"{url}/FUZZ",
            "-w", CONFIG["wordlists"]["dirsearch"],
            "-e", ",".join(extensions),
            "-recursion",
            "-recursion-depth", "3",
            "-mc", "all",
            "-ac",
            "-o", "ffuf.json",
            "-of", "json"
        ]
        # Capture ffuf's console output so it is not written to this process's stdout
        subprocess.run(ffuf_cmd, check=True, capture_output=True)
        with open("ffuf.json") as f:
            ffuf_results = json.load(f)
        results.append("=== FFuf Results ===\n" + format_ffuf_results(ffuf_results))
    except Exception as e:
        results.append(f"FFuf error: {str(e)}")
    
    # 2. Dirsearch with advanced options
    try:
        dirsearch_cmd = [
            "dirsearch",
            "-u", url,
            "-e", ",".join(extensions),
            "--deep-recursive",
            "--force-recursive",
            "--exclude-status", "404",
            "-o", "dirsearch.json",
            "--format", "json"
        ]
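        # Capture dirsearch's console output so it is not written to this process's stdout
        subprocess.run(dirsearch_cmd, check=True, capture_output=True)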
        with open("dirsearch.json") as f:
            dirsearch_results = json.load(f)
        results.append("=== Dirsearch Results ===\n" + json.dumps(dirsearch_results, indent=2))
    except Exception as e:
        results.append(f"Dirsearch error: {str(e)}")
    
    return "\n\n".join(results)

@mcp.tool()
async def advanced_api_scan(url: str) -> str:
    """Advanced API security testing with multiple techniques.
    
    Args:
        url: Target API URL
    """
    results = []
    
    # 1. GraphQL Security Testing
    if "/graphql" in url or "/graphiql" in url:
        try:
            # Introspection query
            graphql_query = """
            query IntrospectionQuery {
              __schema {
                types { name, fields { name, type { name } } }
                queryType { name }
                mutationType { name }
                subscriptionType { name }
              }
            }
            """
            async with httpx.AsyncClient(verify=False) as client:
                response = await client.post(f"{url}", json={"query": graphql_query})
                if response.status_code == 200:
                    results.append("=== GraphQL Schema ===\n" + json.dumps(response.json(), indent=2))
                    
                    # Test for common GraphQL vulnerabilities
                    vulns = await test_graphql_vulnerabilities(url, response.json())
                    results.append("=== GraphQL Vulnerabilities ===\n" + vulns)
        except Exception as e:
            results.append(f"GraphQL testing error: {str(e)}")
    
    # 2. REST API Testing
    try:
        # Test common REST endpoints
        common_paths = ["/v1", "/v2", "/api", "/api/v1", "/api/v2", "/swagger", "/docs", "/openapi.json"]
        async with httpx.AsyncClient(verify=False) as client:
            for path in common_paths:
                response = await client.get(f"{url}{path}")
                if response.status_code != 404:
                    results.append(f"\nFound API endpoint: {path}")
                    results.append(f"Status: {response.status_code}")
                    results.append(f"Response: {response.text[:500]}...")
                    
                    # If Swagger/OpenAPI found, parse and test endpoints
                    if "swagger" in path or "openapi" in path:
                        api_spec = response.json()
                        results.append("\n=== Testing API Endpoints ===")
                        for endpoint, methods in api_spec.get("paths", {}).items():
                            for method, details in methods.items():
                                test_result = await test_api_endpoint(url, endpoint, method, details)
                                results.append(test_result)
    except Exception as e:
        results.append(f"REST API testing error: {str(e)}")
    
    return "\n\n".join(results)

@mcp.tool()
async def advanced_xss_scan(url: str) -> str:
    """Advanced XSS vulnerability scanning.
    
    Args:
        url: Target URL
    """
    results = []
    
    # 1. XSStrike with advanced options
    try:
        xsstrike_cmd = [
            "python3", CONFIG["tools"]["xsstrike"],
            "-u", url,
            "--crawl",
            "--params",
            "--fuzzer",
            "--blind",
            "--vectors", CONFIG["wordlists"]["xss"]
        ]
        xsstrike = subprocess.check_output(xsstrike_cmd, text=True)
        results.append("=== XSStrike Results ===\n" + xsstrike)
    except Exception as e:
        results.append(f"XSStrike error: {str(e)}")
    
    # 2. Custom XSS testing
    try:
        async with httpx.AsyncClient(verify=False) as client:
            # Get all input parameters
            response = await client.get(url)
            soup = BeautifulSoup(response.text, 'html.parser')
            
            # Test input fields
            for input_field in soup.find_all(['input', 'textarea']):
                field_name = input_field.get('name', '')
                if field_name:
                    # Test various XSS payloads
                    with open(CONFIG["wordlists"]["xss"]) as f:
                        for payload in f:
                            payload = payload.strip()
                            data = {field_name: payload}
                            response = await client.post(url, data=data)
                            if payload in response.text:
                                results.append(f"Potential XSS found in {field_name} with payload: {payload}")
    except Exception as e:
        results.append(f"Custom XSS testing error: {str(e)}")
    
    return "\n\n".join(results)

@mcp.tool()
async def advanced_sqli_scan(url: str) -> str:
    """Advanced SQL injection testing.
    
    Args:
        url: Target URL
    """
    results = []
    
    # 1. SQLMap with advanced options
    try:
        sqlmap_cmd = [
            "sqlmap",
            "-u", url,
            "--batch",
            "--random-agent",
            "--level", "5",
            "--risk", "3",
            "--threads", "10",
            "--tamper=space2comment,between,randomcase",
            "--time-sec", "1",
            "--dump"
        ]
        sqlmap = subprocess.check_output(sqlmap_cmd, text=True)
        results.append("=== SQLMap Results ===\n" + sqlmap)
    except Exception as e:
        results.append(f"SQLMap error: {str(e)}")
    
    # 2. Custom SQL injection testing
    try:
        async with httpx.AsyncClient(verify=False) as client:
            # Test parameters
            params = urllib.parse.parse_qs(urllib.parse.urlparse(url).query)
            for param in params:
                with open(CONFIG["wordlists"]["sqli"]) as f:
                    for payload in f:
                        payload = payload.strip()
                        test_url = url.replace(f"{param}={params[param][0]}", f"{param}={payload}")
                        response = await client.get(test_url)
                        
                        # Check for SQL errors
                        sql_errors = [
                            "SQL syntax",
                            "mysql_fetch_array",
                            "ORA-",
                            "PostgreSQL",
                            "SQLite3::"
                        ]
                        for error in sql_errors:
                            if error in response.text:
                                results.append(f"Potential SQL injection in parameter {param} with payload: {payload}")
    except Exception as e:
        results.append(f"Custom SQLi testing error: {str(e)}")
    
    return "\n\n".join(results)

@mcp.tool()
async def advanced_ssrf_scan(url: str) -> str:
    """Advanced Server-Side Request Forgery testing.
    
    Args:
        url: Target URL
    """
    results = []
    
    try:
        async with httpx.AsyncClient(verify=False) as client:
            # Test various SSRF payloads
            with open(CONFIG["wordlists"]["ssrf"]) as f:
                for payload in f:
                    payload = payload.strip()
                    
                    # Test in different parameter positions
                    params = urllib.parse.parse_qs(urllib.parse.urlparse(url).query)
                    for param in params:
                        test_url = url.replace(f"{param}={params[param][0]}", f"{param}={payload}")
                        response = await client.get(test_url)
                        
                        # Check for successful SSRF
                        if response.status_code == 200 and len(response.text) > 0:
                            results.append(f"Potential SSRF in parameter {param} with payload: {payload}")
                            results.append(f"Response length: {len(response.text)}")
                            results.append(f"Response preview: {response.text[:200]}...")
    except Exception as e:
        results.append(f"SSRF testing error: {str(e)}")
    
    return "\n\n".join(results)

@mcp.tool()
async def test_graphql_vulnerabilities(url: str, schema: dict) -> str:
    """Test GraphQL specific vulnerabilities.
    
    Args:
        url: GraphQL endpoint URL
        schema: GraphQL schema from introspection
    """
    results = []
    
    try:
        async with httpx.AsyncClient(verify=False) as client:
            # 1. Test for DoS via nested queries
            # Python strings have no .repeat(); use the * operator to build the nested query
            nested_query = "query {\n  " + "user { " * 10 + "id " + "}" * 10 + "\n}"
            response = await client.post(url, json={"query": nested_query})
            if response.status_code != 200:
                results.append("Potential DoS vulnerability - nested queries not properly limited")
            
            # 2. Test for sensitive data exposure
            sensitive_types = ["User", "Admin", "Password", "Token", "Secret"]
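            # Accept either the raw introspection response ({"data": {...}}) or its "data" payload
            schema_root = schema.get("data") or schema
            for type_obj in schema_root["__schema"]["types"]: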
                if any(sensitive in type_obj["name"] for sensitive in sensitive_types):
                    results.append(f"Potential sensitive data exposure in type: {type_obj['name']}")
            
            # 3. Test for batch query attacks
            batch_query = [{"query": "{ __schema { types { name } } }"} for _ in range(100)]
            response = await client.post(url, json=batch_query)
            if response.status_code == 200:
                results.append("Batch queries allowed - potential DoS vector")
    
    except Exception as e:
        results.append(f"GraphQL vulnerability testing error: {str(e)}")
    
    return "\n\n".join(results)

@mcp.tool()
async def test_api_endpoint(base_url: str, endpoint: str, method: str, details: dict) -> str:
    """Test individual API endpoint for vulnerabilities.
    
    Args:
        base_url: Base API URL
        endpoint: API endpoint path
        method: HTTP method
        details: Endpoint details from OpenAPI spec
    """
    results = []
    
    try:
        async with httpx.AsyncClient(verify=False) as client:
            # 1. Test without authentication
            response = await client.request(method, f"{base_url}{endpoint}")
            if response.status_code not in (401, 403):
                results.append(f"Endpoint {endpoint} accessible without auth")
            
            # 2. Test parameter fuzzing
            if "parameters" in details:
                for param in details["parameters"]:
                    if param["in"] == "query":
                        # Test SQL injection
                        test_url = f"{base_url}{endpoint}?{param['name']}=1' OR '1'='1"
                        response = await client.request(method, test_url)
                        if "error" in response.text.lower():
                            results.append(f"Potential SQL injection in parameter {param['name']}")
                        
                        # Test XSS
                        test_url = f"{base_url}{endpoint}?{param['name']}=<script>alert(1)</script>"
                        response = await client.request(method, test_url)
                        if "<script>alert(1)</script>" in response.text:
                            results.append(f"Potential XSS in parameter {param['name']}")
            
            # 3. Test for mass assignment
            if method.lower() in ["post", "put"] and "requestBody" in details:
                schema = details["requestBody"]["content"]["application/json"]["schema"]
                if "properties" in schema:
                    # Try to inject admin/privileged fields
                    payload = {
                        "isAdmin": True,
                        "role": "admin",
                        "privileges": ["admin"],
                        **{k: "test" for k in schema["properties"].keys()}
                    }
                    response = await client.request(method, f"{base_url}{endpoint}", json=payload)
                    if response.status_code == 200:
                        results.append(f"Potential mass assignment vulnerability in {endpoint}")
    
    except Exception as e:
        results.append(f"API endpoint testing error: {str(e)}")
    
    return "\n\n".join(results)

@mcp.tool()
async def advanced_recon(domain: str) -> str:
    """Perform advanced reconnaissance on a target domain.
    
    Args:
        domain: Target domain name
    """
    results = []
    
    # 1. Subdomain Enumeration
    try:
        # Subfinder
        subdomains = subprocess.check_output(["subfinder", "-d", domain, "-silent"], text=True)
        results.append("=== Subfinder Results ===\n" + subdomains)
        
        # Amass
        amass = subprocess.check_output([
            "amass", "enum",
            "-d", domain,
            "-passive",
            "-silent"
        ], text=True)
        results.append("=== Amass Results ===\n" + amass)
        
        # Assetfinder
        assetfinder = subprocess.check_output(["assetfinder", "--subs-only", domain], text=True)
        results.append("=== Assetfinder Results ===\n" + assetfinder)
        
        # GitHub Subdomains
        github_subdomains = subprocess.check_output([
            "github-subdomains",
            "-d", domain,
            "-t", os.getenv("GITHUB_TOKEN", "")  # token supplied via the environment or .env
        ], text=True)
        results.append("=== GitHub Subdomains ===\n" + github_subdomains)
        
    except Exception as e:
        results.append(f"Subdomain enumeration error: {str(e)}")
    
    # 2. Port Scanning & Service Detection
    try:
        # Quick Nmap scan
        nmap_quick = subprocess.check_output([
            "nmap",
            "-sV", "-sC",
            "--min-rate", "1000",
            "-T4",
            domain
        ], text=True)
        results.append("=== Quick Nmap Scan ===\n" + nmap_quick)
        
        # Detailed scan of web ports
        nmap_web = subprocess.check_output([
            "nmap",
            "-p", "80,443,8080,8443",
            "-sV", "--script=http-enum,http-headers,http-methods,http-title",
            domain
        ], text=True)
        results.append("=== Web Services Scan ===\n" + nmap_web)
        
    except Exception as e:
        results.append(f"Port scanning error: {str(e)}")
    
    # 3. Technology Detection
    try:
        # Wappalyzer
        wappalyzer = subprocess.check_output(["wappalyzer", f"https://{domain}"], text=True)
        results.append("=== Technologies (Wappalyzer) ===\n" + wappalyzer)
        
        # Whatweb
        whatweb = subprocess.check_output(["whatweb", "-a", "3", domain], text=True)
        results.append("=== Technologies (Whatweb) ===\n" + whatweb)
        
    except Exception as e:
        results.append(f"Technology detection error: {str(e)}")
    
    # 4. DNS Information
    try:
        # DNSRecon
        dnsrecon = subprocess.check_output(["dnsrecon", "-d", domain, "-t", "std,axfr,srv"], text=True)
        results.append("=== DNS Information ===\n" + dnsrecon)
        
        # DNS Zone Transfer
        dig_axfr = subprocess.check_output(["dig", "axfr", domain], text=True)
        results.append("=== DNS Zone Transfer ===\n" + dig_axfr)
        
    except Exception as e:
        results.append(f"DNS enumeration error: {str(e)}")
    
    # 5. Web Archive
    try:
        # Waybackurls
        wayback = subprocess.check_output(["waybackurls", domain], text=True)
        results.append("=== Historical URLs ===\n" + wayback)
        
        # Gau
        gau = subprocess.check_output(["gau", domain], text=True)
        results.append("=== GAU URLs ===\n" + gau)
        
    except Exception as e:
        results.append(f"Web archive error: {str(e)}")
    
    # 6. SSL/TLS Analysis
    try:
        # SSLyze
        sslyze = subprocess.check_output([
            "sslyze",
            "--regular",
            domain
        ], text=True)
        results.append("=== SSL/TLS Analysis ===\n" + sslyze)
        
    except Exception as e:
        results.append(f"SSL analysis error: {str(e)}")
    
    # 7. Email Discovery
    try:
        # TheHarvester
        harvester = subprocess.check_output([
            "theHarvester",
            "-d", domain,
            "-b", "all"
        ], text=True)
        results.append("=== Email Addresses ===\n" + harvester)
        
    except Exception as e:
        results.append(f"Email discovery error: {str(e)}")
    
    # 8. GitHub Reconnaissance
    try:
        # GitHound
        githound = subprocess.check_output([
            "githound",
            "--subdomain-file", "subdomains.txt",
            "--threads", "10",
            domain
        ], text=True)
        results.append("=== GitHub Secrets ===\n" + githound)
        
    except Exception as e:
        results.append(f"GitHub recon error: {str(e)}")
    
    # 9. Content Discovery
    try:
        # Hakrawler
        hakrawler = subprocess.check_output([
            "hakrawler",
            "-url", f"https://{domain}",
            "-depth", "3",
            "-plain"
        ], text=True)
        results.append("=== Content Discovery ===\n" + hakrawler)
        
    except Exception as e:
        results.append(f"Content discovery error: {str(e)}")
    
    return "\n\n".join(results)

@mcp.tool()
async def analyze_recon_data(domain: str, recon_results: str) -> str:
    """Analyze reconnaissance data and identify potential security issues.
    
    Args:
        domain: Target domain
        recon_results: Results from reconnaissance
    """
    findings = []
    
    try:
        # 1. Analyze subdomains
        if "Subfinder Results" in recon_results:
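            # Escape the domain so its dots match literally, and de-duplicate the matches
            subdomains = sorted(set(re.findall(r'[\w\-\.]+\.' + re.escape(domain), recon_results)))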
            findings.append(f"Found {len(subdomains)} subdomains")
            
            # Check for interesting subdomains
            interesting = [s for s in subdomains if any(x in s for x in ['dev', 'stage', 'test', 'admin', 'internal'])]
            if interesting:
                findings.append(f"Potentially sensitive subdomains: {', '.join(interesting)}")
        
        # 2. Analyze ports
        if "Nmap Scan" in recon_results:
            open_ports = re.findall(r'(\d+)/tcp\s+open', recon_results)
            findings.append(f"Found {len(open_ports)} open ports")
            
            # Check for dangerous ports
            dangerous = [p for p in open_ports if p in ['21', '23', '3389', '445', '135']]
            if dangerous:
                findings.append(f"Potentially dangerous ports open: {', '.join(dangerous)}")
        
        # 3. Analyze technologies
        if "Technologies" in recon_results:
            # Extract "name version" pairs; versions are only flagged for manual review,
            # not compared against a vulnerability database
            outdated = re.findall(r'([\w\-]+) ([\d\.]+)', recon_results)
            for tech, version in outdated:
                findings.append(f"Detected {tech} version {version} - check for vulnerabilities")
        
        # 4. Analyze SSL/TLS
        if "SSL/TLS Analysis" in recon_results:
            if "SSLv2" in recon_results or "SSLv3" in recon_results:
                findings.append("WARNING: Outdated SSL protocols detected")
            if "TLSv1.0" in recon_results:
                findings.append("WARNING: TLS 1.0 is enabled")
        
        # 5. Analyze DNS
        if "DNS Information" in recon_results:
            if "AXFR" in recon_results:
                findings.append("WARNING: DNS Zone Transfer possible")
        
        # 6. Analyze web content
        if "Content Discovery" in recon_results:
            sensitive_files = re.findall(r'([\w\-\/]+\.(php|asp|aspx|jsp|config|env|git))', recon_results)
            if sensitive_files:
                findings.append(f"Found {len(sensitive_files)} potentially sensitive files")
        
        # 7. Analyze GitHub findings
        if "GitHub Secrets" in recon_results:
            if any(x in recon_results.lower() for x in ['password', 'secret', 'key', 'token']):
                findings.append("WARNING: Potential secrets found in GitHub repositories")
        
    except Exception as e:
        findings.append(f"Analysis error: {str(e)}")
    
    return "\n\n".join(findings)

@mcp.tool()
async def advanced_full_scan(target: str, options: dict = None) -> str:
    """Perform comprehensive security assessment with advanced options.
    
    Args:
        target: Target domain/IP/application
        options: Scan configuration options
    """
    if options is None:
        options = {
            "recon": True,
            "directory": True,
            "api": True,
            "xss": True,
            "sqli": True,
            "ssrf": True,
            "report": True
        }
    
    results = {}
    scan_tasks = []
    
    # 1. Reconnaissance
    if options.get("recon", True):
        recon_results = await advanced_recon(target)
        analysis = await analyze_recon_data(target, recon_results)
        results["recon"] = {
            "raw_results": recon_results,
            "analysis": analysis
        }
    
    # 2. Directory Scanning
    if options.get("directory", True):
        scan_tasks.append(advanced_directory_scan(f"https://{target}"))
    
    # Continue with other scans...
    if options.get("api", True):
        scan_tasks.append(advanced_api_scan(f"https://{target}"))
    
    if options.get("xss", True):
        scan_tasks.append(advanced_xss_scan(f"https://{target}"))
    
    if options.get("sqli", True):
        scan_tasks.append(advanced_sqli_scan(f"https://{target}"))
    
    if options.get("ssrf", True):
        scan_tasks.append(advanced_ssrf_scan(f"https://{target}"))
    
    # Run remaining tasks concurrently
    scan_results = await asyncio.gather(*scan_tasks, return_exceptions=True)
    
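    # Add other results, pairing each with the scan that produced it.
    # Only enabled scans were queued, so filter the names the same way.
    enabled = [name for name in ["directory", "api", "xss", "sqli", "ssrf"] if options.get(name, True)]
    for task_name, scan_result in zip(enabled, scan_results):
        # gather(return_exceptions=True) may return exception objects,
        # which json.dumps cannot serialize
        if isinstance(scan_result, Exception):
            scan_result = f"Scan error: {scan_result}"
        results[task_name] = scan_result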
    
    # Generate report and keep its status message so failures are not silently dropped
    if options.get("report", True):
        report_type = options.get("report_type", "html")
        results["report"] = await generate_report({"target": target, "results": results}, report_type)
    
    return json.dumps(results, indent=2)

@mcp.tool()
async def generate_report(scan_results: dict, report_type: str = "html") -> str:
    """Generate a comprehensive security report.
    
    Args:
        scan_results: Dictionary containing all scan results
        report_type: Output format (html, pdf, json)
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    report_file = f"{CONFIG['reporting']['output_dir']}/report_{timestamp}.{report_type}"
    
    try:
        if report_type in ("html", "pdf"):
            # Both formats are rendered from the same HTML template
            with open(f"{CONFIG['reporting']['template_dir']}/report.html") as f:
                template = f.read()
            
            # Replace placeholders with results
            report_content = template.replace("{{RESULTS}}", json.dumps(scan_results, indent=2))
            
            if report_type == "html":
                with open(report_file, "w") as f:
                    f.write(report_content)
            else:
                # Convert the rendered HTML to PDF
                import pdfkit
                pdfkit.from_string(report_content, report_file)
        
        elif report_type == "json":
            with open(report_file, "w") as f:
                json.dump(scan_results, f, indent=2)
        
        else:
            # Do not claim success when nothing was written
            return f"Unsupported report type: {report_type}"
        
        return f"Report generated: {report_file}"
    
    except Exception as e:
        return f"Report generation error: {str(e)}"

@mcp.tool()
async def send_http_request(url: str, method: str = "GET", headers: dict = None, data: str = None, 
                           verify_ssl: bool = False, timeout: int = 30) -> str:
    """Send an HTTP request to a URL and read the response with headers.
    
    Args:
        url: Target URL to send request to
        method: HTTP method (GET, POST, PUT, DELETE, etc.)
        headers: Dictionary of HTTP headers to include
        data: Request body data
        verify_ssl: Whether to verify SSL certificates
        timeout: Request timeout in seconds
    
    Returns:
        String containing response headers and body
    """
    try:
        result = []
        result.append(f"=== Request to {url} ===")
        
        async with httpx.AsyncClient(verify=verify_ssl, timeout=float(timeout)) as client:
            if method.upper() == "GET":
                response = await client.get(url, headers=headers)
            elif method.upper() == "POST":
                response = await client.post(url, headers=headers, content=data)
            elif method.upper() == "PUT":
                response = await client.put(url, headers=headers, content=data)
            elif method.upper() == "DELETE":
                response = await client.delete(url, headers=headers)
            elif method.upper() == "HEAD":
                response = await client.head(url, headers=headers)
            elif method.upper() == "OPTIONS":
                response = await client.options(url, headers=headers)
            elif method.upper() == "PATCH":
                response = await client.patch(url, headers=headers, content=data)
            else:
                return f"Unsupported HTTP method: {method}"
        
        # Add response information
        result.append(f"Status Code: {response.status_code}")
        
        # Add headers
        result.append("\n=== Response Headers ===")
        for header, value in response.headers.items():
            result.append(f"{header}: {value}")
        
        # Add response body
        result.append("\n=== Response Body ===")
        if 'application/json' in response.headers.get('content-type', ''):
            try:
                formatted_json = json.dumps(response.json(), indent=2)
                result.append(formatted_json)
            except ValueError:
                # Body was labeled JSON but could not be parsed; fall back to raw text
                result.append(response.text)
        else:
            result.append(response.text)
            
        return "\n".join(result)
    except Exception as e:
        return f"Error sending HTTP request: {str(e)}"

if __name__ == "__main__":
    # Create necessary directories
    Path(CONFIG["reporting"]["output_dir"]).mkdir(parents=True, exist_ok=True)
    
    # Initialize and run the server with stdio transport
    mcp.run(transport='stdio')



```
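
`advanced_full_scan` queues only the enabled scans and awaits them with `asyncio.gather(..., return_exceptions=True)`, so each result has to be matched back to the scan that produced it. The block below is a minimal, standalone sketch of one way to do that, not the server's own code. `fake_scan`, `run_named_scans`, and the `registry` mapping are hypothetical names introduced for illustration, and the stand-in coroutines replace the real scanner tools.

```python
import asyncio


async def fake_scan(name: str, fail: bool = False) -> str:
    """Hypothetical stand-in for a scanner coroutine such as advanced_xss_scan."""
    await asyncio.sleep(0)
    if fail:
        raise RuntimeError(f"{name} scanner crashed")
    return f"{name} ok"


async def run_named_scans(options: dict) -> dict:
    """Run only the enabled scans and key each result by its scan name."""
    # Hypothetical registry: scan name -> zero-argument coroutine factory
    registry = {
        "directory": lambda: fake_scan("directory"),
        "api": lambda: fake_scan("api", fail=True),
        "xss": lambda: fake_scan("xss"),
    }
    enabled = [name for name in registry if options.get(name, True)]

    outcomes = await asyncio.gather(
        *(registry[name]() for name in enabled), return_exceptions=True
    )

    # gather preserves input order, so zip pairs each outcome with its name.
    # Exceptions are converted to strings so the dict stays JSON-serializable.
    return {
        name: f"Scan failed: {outcome}" if isinstance(outcome, Exception) else outcome
        for name, outcome in zip(enabled, outcomes)
    }


# Disable the directory scan; the remaining results keep their own names.
results = asyncio.run(run_named_scans({"directory": False}))
```

Building the name list and the task list from the same filtered sequence avoids labeling one scan's output with another scan's name when an earlier scan is disabled.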