This is page 1 of 2. Use http://codebase.md/ch1nhpd/pentest-tools-mcp-server?page={x} to view the full context. # Directory Structure ``` ├── config.json ├── docker-compose.yml ├── Dockerfile ├── pentest-tools-mcp-server.py ├── README.md ├── requirements.txt └── wordlists └── xss-payloads.txt ``` # Files -------------------------------------------------------------------------------- /README.md: -------------------------------------------------------------------------------- ```markdown # Pentest Tools MCP Server An MCP (Model Context Protocol) server for penetration testing tools, designed to work with various LLM clients like Claude Desktop, Roo Code, and other compatible MCP clients. ## Features - Comprehensive pentesting tools: - Directory scanning (FFuf, Dirsearch) - Vulnerability scanning (Nuclei, XSStrike) - API testing - Reconnaissance - And more... - Pre-configured wordlists from SecLists - Automated report generation - Claude Desktop integration ## Prerequisites - Docker and Docker Compose (for containerized setup) - Claude Desktop application or other MCP-compatible client - Python 3.10+ and uv (for local setup) ## Directory Setup 1. Create the required directories: ```bash # Create directories mkdir -p reports templates wordlists ``` 2. Directory structure should look like this: ``` pentest-tools/ ├── reports/ # For storing scan reports ├── templates/ # For report templates ├── wordlists/ # For custom wordlists ├── pentest-tools-mcp-server.py ├── config.json ├── requirements.txt ├── docker-compose.yml └── Dockerfile ``` ## Setup ### Docker Setup (Recommended) 1. Build and start the container: ```bash docker-compose up -d --build ``` 2. Verify the container is running: ```bash docker-compose ps ``` 3. Check logs if needed: ```bash docker-compose logs -f ``` ### Local Setup 1. Install dependencies: ```bash uv venv source .venv/bin/activate # On Windows: .venv\Scripts\activate uv pip install -r requirements.txt ``` 2. Install required system tools (example for Ubuntu/Debian): ```bash sudo apt-get install nmap whatweb dnsrecon theharvester ffuf dirsearch sqlmap ``` ## Claude Desktop Integration 1. Configure Claude Desktop: Windows: ``` %APPDATA%\Claude\claude_desktop_config.json ``` MacOS/Linux: ``` ~/Library/Application Support/Claude/claude_desktop_config.json ``` 2. Add server configuration: For Docker setup: ```json { "mcpServers": { "pentest-tools": { "command": "docker-compose", "args": [ "run", "--rm", "pentest-tools", "python3", "pentest-tools-mcp-server.py" ], "cwd": "\\Path\\to\\pentest-tools" } } } ``` If the above configuration doesn't work on Windows, try this alternative approach: ```json { "mcpServers": { "pentest-tools": { "command": "cmd", "args": [ "/c", "cd /d \\path\\to\\pentest-tools && docker-compose run --rm pentest-tools python3 pentest-tools-mcp-server.py" ] } } } ``` Note about `cwd` (Current Working Directory): - `cwd` tells Claude Desktop which directory to run the command from - It must be the absolute path to the directory containing `docker-compose.yml` - On Windows, use double backslashes (`\\`) in paths - On Linux/MacOS, use forward slashes (`/`) 3. Restart Claude Desktop ## Usage Available commands in Claude Desktop: 1. Reconnaissance: ``` /recon example.com ``` 2. Directory scanning: ``` /scan example.com --type directory ``` 3. Vulnerability scanning: ``` /scan example.com --type full /scan example.com --type xss /scan example.com --type sqli /scan example.com --type ssrf ``` 4. API testing: ``` /scan api.example.com --type api ``` Natural language commands: - "Run a full security scan on example.com" - "Check for XSS vulnerabilities on example.com" - "Perform reconnaissance on example.com" ## Directory Structure Details ``` pentest-tools/ ├── reports/ # Scan reports directory │ ├── recon/ # Reconnaissance reports │ ├── vulns/ # Vulnerability scan reports │ └── api/ # API testing reports ├── templates/ # Report templates │ ├── recon.html # Template for recon reports │ ├── vuln.html # Template for vulnerability reports │ └── api.html # Template for API test reports ├── wordlists/ # Custom wordlists │ ├── SecLists/ # Cloned from SecLists repo │ ├── custom/ # Your custom wordlists │ └── generated/ # Tool-generated wordlists ├── pentest-tools-mcp-server.py # Main MCP server ├── config.json # Tool configuration ├── requirements.txt # Python dependencies ├── docker-compose.yml # Docker configuration └── Dockerfile # Container definition ``` ## Security Notes - Always ensure you have permission to scan targets - Keep tools and dependencies updated - Review scan results carefully - Follow responsible disclosure practices ``` -------------------------------------------------------------------------------- /requirements.txt: -------------------------------------------------------------------------------- ``` mcp[cli]>=1.2.0 httpx aiohttp aiofiles beautifulsoup4 pyjwt pyyaml requests sslyze python-nmap dnspython pdfkit dotenv ``` -------------------------------------------------------------------------------- /docker-compose.yml: -------------------------------------------------------------------------------- ```yaml version: '3.8' services: pentest-tools: build: . container_name: pentest-tools volumes: - ./reports:/app/reports - ./templates:/app/templates - ./wordlists:/usr/share/wordlists/pentest-tools environment: - GITHUB_TOKEN=${GITHUB_TOKEN} cap_add: - NET_ADMIN # Required for some network scanning tools - NET_RAW # Required for raw socket operations security_opt: - seccomp:unconfined # Required for some security tools stdin_open: true # Keep STDIN open for MCP tty: true # Allocate a pseudo-TTY networks: pentest-network: driver: bridge ``` -------------------------------------------------------------------------------- /config.json: -------------------------------------------------------------------------------- ```json { "wordlists": { "dirsearch": "/usr/share/wordlists/pentest-tools/dirsearch.txt", "common": "/usr/share/wordlists/pentest-tools/SecLists/Discovery/Web-Content/common.txt", "api_endpoints": "/usr/share/wordlists/pentest-tools/SecLists/Discovery/Web-Content/api-endpoints.txt", "subdomains": "/usr/share/wordlists/pentest-tools/SecLists/Discovery/DNS/subdomains-top1million-5000.txt", "passwords": "/usr/share/wordlists/pentest-tools/SecLists/Passwords/Common-Credentials/10-million-password-list-top-1000.txt", "jwt_secrets": "/usr/share/wordlists/pentest-tools/SecLists/Passwords/Common-Credentials/common-secrets.txt", "xss": "/usr/share/wordlists/pentest-tools/xss-payloads.txt", "sqli": "/usr/share/wordlists/pentest-tools/sqli-payloads.txt", "lfi": "/usr/share/wordlists/pentest-tools/lfi-payloads.txt", "ssrf": "/usr/share/wordlists/pentest-tools/ssrf-payloads.txt" }, "tools": { "xsstrike": "/root/tools/XSStrike/xsstrike.py", "nuclei_templates": "/root/tools/nuclei-templates", "crlfuzz": "/root/go/bin/crlfuzz", "graphql_path": "/root/tools/graphql-tools" }, "reporting": { "output_dir": "/app/reports", "template_dir": "/app/templates" } } ``` -------------------------------------------------------------------------------- /Dockerfile: -------------------------------------------------------------------------------- ```dockerfile FROM kalilinux/kali-rolling # Prevent interactive prompts during package installation ENV DEBIAN_FRONTEND=noninteractive # Install basic system packages RUN apt-get update && apt-get install -y \ python3 \ python3-pip \ python3-venv \ golang \ git \ wget \ nmap \ whatweb \ dnsrecon \ theharvester \ ffuf \ dirsearch \ sqlmap \ amass \ nodejs \ npm \ curl \ dnsutils \ bind9-utils \ sslyze \ && rm -rf /var/lib/apt/lists/* # Set up Python virtual environment ENV VIRTUAL_ENV=/opt/venv RUN python3 -m venv $VIRTUAL_ENV ENV PATH="$VIRTUAL_ENV/bin:$PATH" # Install Python packages in virtual environment COPY requirements.txt /tmp/ RUN pip install --no-cache-dir -r /tmp/requirements.txt # Check Go version RUN go version # Check network connectivity to GitHub RUN curl -v https://github.com # Install Go tools individually (excluding Amass) RUN go install -v github.com/projectdiscovery/subfinder/v2/cmd/subfinder@latest RUN go install -v github.com/tomnomnom/assetfinder@latest RUN go install -v github.com/tomnomnom/waybackurls@latest RUN go install -v github.com/lc/gau/v2/cmd/gau@latest RUN go install -v github.com/tillson/git-hound@latest RUN go install -v github.com/hakluke/hakrawler@latest RUN go install -v github.com/projectdiscovery/nuclei/v2/cmd/nuclei@latest RUN go install -v github.com/projectdiscovery/httpx/cmd/httpx@latest # Install npm packages RUN npm install -g wappalyzer-cli lighthouse snyk # Set up wordlists directory RUN mkdir -p /usr/share/wordlists/pentest-tools # Download SecLists RUN git clone https://github.com/danielmiessler/SecLists.git /usr/share/wordlists/pentest-tools/SecLists # Download other wordlists RUN wget -O /usr/share/wordlists/pentest-tools/dirsearch.txt https://raw.githubusercontent.com/maurosoria/dirsearch/master/db/dicc.txt && \ wget -O /usr/share/wordlists/pentest-tools/xss-payloads.txt https://raw.githubusercontent.com/payloadbox/xss-payload-list/master/Intruder/xss-payload-list.txt && \ wget -O /usr/share/wordlists/pentest-tools/sqli-payloads.txt https://raw.githubusercontent.com/payloadbox/sql-injection-payload-list/refs/heads/master/Intruder/detect/Generic_SQLI.txt && \ wget -O /usr/share/wordlists/pentest-tools/lfi-payloads.txt https://raw.githubusercontent.com/emadshanab/LFI-Payload-List/master/LFI%20payloads.txt && \ wget -O /usr/share/wordlists/pentest-tools/ssrf-payloads.txt https://gist.githubusercontent.com/rootsploit/66c9ae8fc3ef387fa5ffbb67fcef0766/raw/d5a4088d628ed05f161b9dd9bf3c6755910a164f/SSRF-Payloads.txt # Set up tools directory RUN mkdir -p /root/tools # Clone useful repositories RUN git clone https://github.com/s0md3v/XSStrike.git /root/tools/XSStrike && \ git clone https://github.com/projectdiscovery/nuclei-templates.git /root/tools/nuclei-templates && \ git clone https://github.com/graphql/graphql-js.git /root/tools/graphql-tools # Create necessary directories for the application RUN mkdir -p /app/reports /app/templates # Set working directory WORKDIR /app # Copy application files COPY . /app/ # Add Go binaries to PATH ENV PATH="/root/go/bin:$PATH" # Set entrypoint CMD ["python", "pentest-tools-mcp-server.py"] ``` -------------------------------------------------------------------------------- /pentest-tools-mcp-server.py: -------------------------------------------------------------------------------- ```python #!/usr/bin/env python3 from typing import Any, List, Optional, Dict from mcp.server.fastmcp import FastMCP import httpx import subprocess import json import asyncio import re from pathlib import Path from datetime import datetime import yaml import aiofiles import aiohttp from bs4 import BeautifulSoup # import jwt import base64 import logging import threading from concurrent.futures import ThreadPoolExecutor import urllib.parse import os from dotenv import load_dotenv # Set up logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', filename='pentest.log' ) logger = logging.getLogger('pentest-tools') # Initialize FastMCP server mcp = FastMCP("pentest-tools") # Configuration CONFIG = { "wordlists": { "dirsearch": "/usr/share/wordlists/pentest-tools/dirsearch.txt", "common": "/usr/share/wordlists/pentest-tools/SecLists/Discovery/Web-Content/common.txt", "api_endpoints": "/usr/share/wordlists/pentest-tools/SecLists/Discovery/Web-Content/api-endpoints.txt", "subdomains": "/usr/share/wordlists/pentest-tools/SecLists/Discovery/DNS/subdomains-top1million-5000.txt", "passwords": "/usr/share/wordlists/pentest-tools/SecLists/Passwords/Common-Credentials/10-million-password-list-top-1000.txt", "jwt_secrets": "/usr/share/wordlists/pentest-tools/SecLists/Passwords/Common-Credentials/common-secrets.txt", "xss": "/usr/share/wordlists/pentest-tools/xss-payloads.txt", "sqli": "/usr/share/wordlists/pentest-tools/sqli-payloads.txt", "lfi": "/usr/share/wordlists/pentest-tools/lfi-payloads.txt", "ssrf": "/usr/share/wordlists/pentest-tools/ssrf-payloads.txt" }, "tools": { "xsstrike": "/root/tools/XSStrike/xsstrike.py", "nuclei_templates": "/root/tools/nuclei-templates", "crlfuzz": "/root/go/bin/crlfuzz", "graphql_path": "/root/tools/graphql-tools" }, "reporting": { "output_dir": "reports", "template_dir": "templates" } } # Load environment variables load_dotenv() def format_ffuf_results(results): """Format the FFuf results in a readable format. Args: results: FFuf JSON results Returns: Formatted string with FFuf results """ formatted = [] # Add summary if "stats" in results: stats = results["stats"] formatted.append(f"Total requests: {stats.get('total', 0)}") formatted.append(f"Duration: {stats.get('elapsed', 0)} seconds") formatted.append(f"Requests per second: {stats.get('req_sec', 0)}") # Add results if "results" in results: formatted.append("\nDiscovered URLs:") for item in results["results"]: status = item.get("status", "") url = item.get("url", "") size = item.get("length", 0) formatted.append(f"[Status: {status}] [{size} bytes] {url}") return "\n".join(formatted) @mcp.tool() async def advanced_directory_scan(url: str, extensions: List[str] = None) -> str: """Advanced directory and file scanning with multiple tools and techniques. Args: url: Target URL extensions: List of file extensions to scan for """ if extensions is None: extensions = ["php", "asp", "aspx", "jsp", "js", "txt", "conf", "bak", "backup", "swp", "old", "db", "sql"] results = [] # 1. FFuf with advanced options try: ffuf_cmd = [ "ffuf", "-u", f"{url}/FUZZ", "-w", CONFIG["wordlists"]["dirsearch"], "-e", ",".join(extensions), "-recursion", "-recursion-depth", "3", "-mc", "all", "-ac", "-o", "ffuf.json", "-of", "json" ] subprocess.run(ffuf_cmd, check=True) with open("ffuf.json") as f: ffuf_results = json.load(f) results.append("=== FFuf Results ===\n" + format_ffuf_results(ffuf_results)) except Exception as e: results.append(f"FFuf error: {str(e)}") # 2. Dirsearch with advanced options try: dirsearch_cmd = [ "dirsearch", "-u", url, "-e", ",".join(extensions), "--deep-recursive", "--force-recursive", "--exclude-status", "404", "-o", "dirsearch.json", "--format", "json" ] subprocess.run(dirsearch_cmd, check=True) with open("dirsearch.json") as f: dirsearch_results = json.load(f) results.append("=== Dirsearch Results ===\n" + json.dumps(dirsearch_results, indent=2)) except Exception as e: results.append(f"Dirsearch error: {str(e)}") return "\n\n".join(results) @mcp.tool() async def advanced_api_scan(url: str) -> str: """Advanced API security testing with multiple techniques. Args: url: Target API URL """ results = [] # 1. GraphQL Security Testing if "/graphql" in url or "/graphiql" in url: try: # Introspection query graphql_query = """ query IntrospectionQuery { __schema { types { name, fields { name, type { name } } } queryType { name } mutationType { name } subscriptionType { name } } } """ async with httpx.AsyncClient(verify=False) as client: response = await client.post(f"{url}", json={"query": graphql_query}) if response.status_code == 200: results.append("=== GraphQL Schema ===\n" + json.dumps(response.json(), indent=2)) # Test for common GraphQL vulnerabilities vulns = await test_graphql_vulnerabilities(url, response.json()) results.append("=== GraphQL Vulnerabilities ===\n" + vulns) except Exception as e: results.append(f"GraphQL testing error: {str(e)}") # 2. REST API Testing try: # Test common REST endpoints common_paths = ["/v1", "/v2", "/api", "/api/v1", "/api/v2", "/swagger", "/docs", "/openapi.json"] async with httpx.AsyncClient(verify=False) as client: for path in common_paths: response = await client.get(f"{url}{path}") if response.status_code != 404: results.append(f"\nFound API endpoint: {path}") results.append(f"Status: {response.status_code}") results.append(f"Response: {response.text[:500]}...") # If Swagger/OpenAPI found, parse and test endpoints if "swagger" in path or "openapi" in path: api_spec = response.json() results.append("\n=== Testing API Endpoints ===") for endpoint, methods in api_spec.get("paths", {}).items(): for method, details in methods.items(): test_result = await test_api_endpoint(url, endpoint, method, details) results.append(test_result) except Exception as e: results.append(f"REST API testing error: {str(e)}") return "\n\n".join(results) @mcp.tool() async def advanced_xss_scan(url: str) -> str: """Advanced XSS vulnerability scanning. Args: url: Target URL """ results = [] # 1. XSStrike with advanced options try: xsstrike_cmd = [ "python3", CONFIG["tools"]["xsstrike"], "-u", url, "--crawl", "--params", "--fuzzer", "--blind", "--vectors", CONFIG["wordlists"]["xss"] ] xsstrike = subprocess.check_output(xsstrike_cmd, text=True) results.append("=== XSStrike Results ===\n" + xsstrike) except Exception as e: results.append(f"XSStrike error: {str(e)}") # 2. Custom XSS testing try: async with httpx.AsyncClient(verify=False) as client: # Get all input parameters response = await client.get(url) soup = BeautifulSoup(response.text, 'html.parser') # Test input fields for input_field in soup.find_all(['input', 'textarea']): field_name = input_field.get('name', '') if field_name: # Test various XSS payloads with open(CONFIG["wordlists"]["xss"]) as f: for payload in f: payload = payload.strip() data = {field_name: payload} response = await client.post(url, data=data) if payload in response.text: results.append(f"Potential XSS found in {field_name} with payload: {payload}") except Exception as e: results.append(f"Custom XSS testing error: {str(e)}") return "\n\n".join(results) @mcp.tool() async def advanced_sqli_scan(url: str) -> str: """Advanced SQL injection testing. Args: url: Target URL """ results = [] # 1. SQLMap with advanced options try: sqlmap_cmd = [ "sqlmap", "-u", url, "--batch", "--random-agent", "--level", "5", "--risk", "3", "--threads", "10", "--tamper=space2comment,between,randomcase", "--time-sec", "1", "--dump" ] sqlmap = subprocess.check_output(sqlmap_cmd, text=True) results.append("=== SQLMap Results ===\n" + sqlmap) except Exception as e: results.append(f"SQLMap error: {str(e)}") # 2. Custom SQL injection testing try: async with httpx.AsyncClient(verify=False) as client: # Test parameters params = urllib.parse.parse_qs(urllib.parse.urlparse(url).query) for param in params: with open(CONFIG["wordlists"]["sqli"]) as f: for payload in f: payload = payload.strip() test_url = url.replace(f"{param}={params[param][0]}", f"{param}={payload}") response = await client.get(test_url) # Check for SQL errors sql_errors = [ "SQL syntax", "mysql_fetch_array", "ORA-", "PostgreSQL", "SQLite3::" ] for error in sql_errors: if error in response.text: results.append(f"Potential SQL injection in parameter {param} with payload: {payload}") except Exception as e: results.append(f"Custom SQLi testing error: {str(e)}") return "\n\n".join(results) @mcp.tool() async def advanced_ssrf_scan(url: str) -> str: """Advanced Server-Side Request Forgery testing. Args: url: Target URL """ results = [] try: async with httpx.AsyncClient(verify=False) as client: # Test various SSRF payloads with open(CONFIG["wordlists"]["ssrf"]) as f: for payload in f: payload = payload.strip() # Test in different parameter positions params = urllib.parse.parse_qs(urllib.parse.urlparse(url).query) for param in params: test_url = url.replace(f"{param}={params[param][0]}", f"{param}={payload}") response = await client.get(test_url) # Check for successful SSRF if response.status_code == 200 and len(response.text) > 0: results.append(f"Potential SSRF in parameter {param} with payload: {payload}") results.append(f"Response length: {len(response.text)}") results.append(f"Response preview: {response.text[:200]}...") except Exception as e: results.append(f"SSRF testing error: {str(e)}") return "\n\n".join(results) @mcp.tool() async def test_graphql_vulnerabilities(url: str, schema: dict) -> str: """Test GraphQL specific vulnerabilities. Args: url: GraphQL endpoint URL schema: GraphQL schema from introspection """ results = [] try: async with httpx.AsyncClient(verify=False) as client: # 1. Test for DoS via nested queries nested_query = "query {\n " + "user { ".repeat(10) + "id " + "}".repeat(10) + "\n}" response = await client.post(url, json={"query": nested_query}) if response.status_code != 200: results.append("Potential DoS vulnerability - nested queries not properly limited") # 2. Test for sensitive data exposure sensitive_types = ["User", "Admin", "Password", "Token", "Secret"] for type_obj in schema["__schema"]["types"]: if any(sensitive in type_obj["name"] for sensitive in sensitive_types): results.append(f"Potential sensitive data exposure in type: {type_obj['name']}") # 3. Test for batch query attacks batch_query = [{"query": "{ __schema { types { name } } }"} for _ in range(100)] response = await client.post(url, json=batch_query) if response.status_code == 200: results.append("Batch queries allowed - potential DoS vector") except Exception as e: results.append(f"GraphQL vulnerability testing error: {str(e)}") return "\n\n".join(results) @mcp.tool() async def test_api_endpoint(base_url: str, endpoint: str, method: str, details: dict) -> str: """Test individual API endpoint for vulnerabilities. Args: base_url: Base API URL endpoint: API endpoint path method: HTTP method details: Endpoint details from OpenAPI spec """ results = [] try: async with httpx.AsyncClient(verify=False) as client: # 1. Test without authentication response = await client.request(method, f"{base_url}{endpoint}") if response.status_code != 401: results.append(f"Endpoint {endpoint} accessible without auth") # 2. Test parameter fuzzing if "parameters" in details: for param in details["parameters"]: if param["in"] == "query": # Test SQL injection test_url = f"{base_url}{endpoint}?{param['name']}=1' OR '1'='1" response = await client.request(method, test_url) if "error" in response.text.lower(): results.append(f"Potential SQL injection in parameter {param['name']}") # Test XSS test_url = f"{base_url}{endpoint}?{param['name']}=<script>alert(1)</script>" response = await client.request(method, test_url) if "<script>alert(1)</script>" in response.text: results.append(f"Potential XSS in parameter {param['name']}") # 3. Test for mass assignment if method.lower() in ["post", "put"] and "requestBody" in details: schema = details["requestBody"]["content"]["application/json"]["schema"] if "properties" in schema: # Try to inject admin/privileged fields payload = { "isAdmin": True, "role": "admin", "privileges": ["admin"], **{k: "test" for k in schema["properties"].keys()} } response = await client.request(method, f"{base_url}{endpoint}", json=payload) if response.status_code == 200: results.append(f"Potential mass assignment vulnerability in {endpoint}") except Exception as e: results.append(f"API endpoint testing error: {str(e)}") return "\n\n".join(results) @mcp.tool() async def advanced_recon(domain: str) -> str: """Perform advanced reconnaissance on a target domain. Args: domain: Target domain name """ results = [] # 1. Subdomain Enumeration try: # Subfinder subdomains = subprocess.check_output(["subfinder", "-d", domain, "-silent"], text=True) results.append("=== Subfinder Results ===\n" + subdomains) # Amass amass = subprocess.check_output([ "amass", "enum", "-d", domain, "-passive", "-silent" ], text=True) results.append("=== Amass Results ===\n" + amass) # Assetfinder assetfinder = subprocess.check_output(["assetfinder", "--subs-only", domain], text=True) results.append("=== Assetfinder Results ===\n" + assetfinder) # GitHub Subdomains github_subdomains = subprocess.check_output([ "github-subdomains", "-d", domain, "-t", "YOUR_GITHUB_TOKEN" ], text=True) results.append("=== GitHub Subdomains ===\n" + github_subdomains) except Exception as e: results.append(f"Subdomain enumeration error: {str(e)}") # 2. Port Scanning & Service Detection try: # Quick Nmap scan nmap_quick = subprocess.check_output([ "nmap", "-sV", "-sC", "--min-rate", "1000", "-T4", domain ], text=True) results.append("=== Quick Nmap Scan ===\n" + nmap_quick) # Detailed scan of web ports nmap_web = subprocess.check_output([ "nmap", "-p", "80,443,8080,8443", "-sV", "--script=http-enum,http-headers,http-methods,http-title", domain ], text=True) results.append("=== Web Services Scan ===\n" + nmap_web) except Exception as e: results.append(f"Port scanning error: {str(e)}") # 3. Technology Detection try: # Wappalyzer wappalyzer = subprocess.check_output(["wappalyzer", f"https://{domain}"], text=True) results.append("=== Technologies (Wappalyzer) ===\n" + wappalyzer) # Whatweb whatweb = subprocess.check_output(["whatweb", "-a", "3", domain], text=True) results.append("=== Technologies (Whatweb) ===\n" + whatweb) except Exception as e: results.append(f"Technology detection error: {str(e)}") # 4. DNS Information try: # DNSRecon dnsrecon = subprocess.check_output(["dnsrecon", "-d", domain, "-t", "std,axfr,srv"], text=True) results.append("=== DNS Information ===\n" + dnsrecon) # DNS Zone Transfer dig_axfr = subprocess.check_output(["dig", "axfr", domain], text=True) results.append("=== DNS Zone Transfer ===\n" + dig_axfr) except Exception as e: results.append(f"DNS enumeration error: {str(e)}") # 5. Web Archive try: # Waybackurls wayback = subprocess.check_output(["waybackurls", domain], text=True) results.append("=== Historical URLs ===\n" + wayback) # Gau gau = subprocess.check_output(["gau", domain], text=True) results.append("=== GAU URLs ===\n" + gau) except Exception as e: results.append(f"Web archive error: {str(e)}") # 6. SSL/TLS Analysis try: # SSLyze sslyze = subprocess.check_output([ "sslyze", "--regular", domain ], text=True) results.append("=== SSL/TLS Analysis ===\n" + sslyze) except Exception as e: results.append(f"SSL analysis error: {str(e)}") # 7. Email Discovery try: # TheHarvester harvester = subprocess.check_output([ "theHarvester", "-d", domain, "-b", "all" ], text=True) results.append("=== Email Addresses ===\n" + harvester) except Exception as e: results.append(f"Email discovery error: {str(e)}") # 8. GitHub Reconnaissance try: # GitHound githound = subprocess.check_output([ "githound", "--subdomain-file", "subdomains.txt", "--threads", "10", domain ], text=True) results.append("=== GitHub Secrets ===\n" + githound) except Exception as e: results.append(f"GitHub recon error: {str(e)}") # 9. Content Discovery try: # Hakrawler hakrawler = subprocess.check_output([ "hakrawler", "-url", f"https://{domain}", "-depth", "3", "-plain" ], text=True) results.append("=== Content Discovery ===\n" + hakrawler) except Exception as e: results.append(f"Content discovery error: {str(e)}") return "\n\n".join(results) @mcp.tool() async def analyze_recon_data(domain: str, recon_results: str) -> str: """Analyze reconnaissance data and identify potential security issues. Args: domain: Target domain recon_results: Results from reconnaissance """ findings = [] try: # 1. Analyze subdomains if "Subfinder Results" in recon_results: subdomains = re.findall(r'[\w\-\.]+\.'+domain, recon_results) findings.append(f"Found {len(subdomains)} subdomains") # Check for interesting subdomains interesting = [s for s in subdomains if any(x in s for x in ['dev', 'stage', 'test', 'admin', 'internal'])] if interesting: findings.append(f"Potentially sensitive subdomains: {', '.join(interesting)}") # 2. Analyze ports if "Nmap Scan" in recon_results: open_ports = re.findall(r'(\d+)/tcp\s+open', recon_results) findings.append(f"Found {len(open_ports)} open ports") # Check for dangerous ports dangerous = [p for p in open_ports if p in ['21', '23', '3389', '445', '135']] if dangerous: findings.append(f"Potentially dangerous ports open: {', '.join(dangerous)}") # 3. Analyze technologies if "Technologies" in recon_results: # Check for outdated versions outdated = re.findall(r'([\w\-]+) ([\d\.]+)', recon_results) for tech, version in outdated: findings.append(f"Detected {tech} version {version} - check for vulnerabilities") # 4. Analyze SSL/TLS if "SSL/TLS Analysis" in recon_results: if "SSLv2" in recon_results or "SSLv3" in recon_results: findings.append("WARNING: Outdated SSL protocols detected") if "TLSv1.0" in recon_results: findings.append("WARNING: TLS 1.0 is enabled") # 5. Analyze DNS if "DNS Information" in recon_results: if "AXFR" in recon_results: findings.append("WARNING: DNS Zone Transfer possible") # 6. Analyze web content if "Content Discovery" in recon_results: sensitive_files = re.findall(r'([\w\-\/]+\.(php|asp|aspx|jsp|config|env|git))', recon_results) if sensitive_files: findings.append(f"Found {len(sensitive_files)} potentially sensitive files") # 7. Analyze GitHub findings if "GitHub Secrets" in recon_results: if any(x in recon_results.lower() for x in ['password', 'secret', 'key', 'token']): findings.append("WARNING: Potential secrets found in GitHub repositories") except Exception as e: findings.append(f"Analysis error: {str(e)}") return "\n\n".join(findings) @mcp.tool() async def advanced_full_scan(target: str, options: dict = None) -> str: """Perform comprehensive security assessment with advanced options. Args: target: Target domain/IP/application options: Scan configuration options """ if options is None: options = { "recon": True, "directory": True, "api": True, "xss": True, "sqli": True, "ssrf": True, "report": True } results = {} scan_tasks = [] # 1. Reconnaissance if options.get("recon", True): recon_results = await advanced_recon(target) analysis = await analyze_recon_data(target, recon_results) results["recon"] = { "raw_results": recon_results, "analysis": analysis } # 2. Directory Scanning if options.get("directory", True): scan_tasks.append(advanced_directory_scan(f"https://{target}")) # Continue with other scans... if options.get("api", True): scan_tasks.append(advanced_api_scan(f"https://{target}")) if options.get("xss", True): scan_tasks.append(advanced_xss_scan(f"https://{target}")) if options.get("sqli", True): scan_tasks.append(advanced_sqli_scan(f"https://{target}")) if options.get("ssrf", True): scan_tasks.append(advanced_ssrf_scan(f"https://{target}")) # Run remaining tasks concurrently scan_results = await asyncio.gather(*scan_tasks, return_exceptions=True) # Add other results for i, task_name in enumerate(["directory", "api", "xss", "sqli", "ssrf"]): if i < len(scan_results): results[task_name] = scan_results[i] # Generate report if options.get("report", True): report_type = options.get("report_type", "html") await generate_report({"target": target, "results": results}, report_type) return json.dumps(results, indent=2) @mcp.tool() async def generate_report(scan_results: dict, report_type: str = "html") -> str: """Generate a comprehensive security report. Args: scan_results: Dictionary containing all scan results report_type: Output format (html, pdf, json) """ timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") report_file = f"{CONFIG['reporting']['output_dir']}/report_{timestamp}.{report_type}" try: if report_type == "html": # Use template to generate HTML report with open(f"{CONFIG['reporting']['template_dir']}/report.html") as f: template = f.read() # Replace placeholders with results report_content = template.replace("{{RESULTS}}", json.dumps(scan_results, indent=2)) with open(report_file, "w") as f: f.write(report_content) elif report_type == "pdf": # Convert HTML to PDF import pdfkit pdfkit.from_string(report_content, report_file) elif report_type == "json": with open(report_file, "w") as f: json.dump(scan_results, f, indent=2) return f"Report generated: {report_file}" except Exception as e: return f"Report generation error: {str(e)}" @mcp.tool() async def send_http_request(url: str, method: str = "GET", headers: dict = None, data: str = None, verify_ssl: bool = False, timeout: int = 30) -> str: """Send an HTTP request to a URL and read the response with headers. Args: url: Target URL to send request to method: HTTP method (GET, POST, PUT, DELETE, etc.) headers: Dictionary of HTTP headers to include data: Request body data verify_ssl: Whether to verify SSL certificates timeout: Request timeout in seconds Returns: String containing response headers and body """ try: result = [] result.append(f"=== Request to {url} ===") async with httpx.AsyncClient(verify=verify_ssl, timeout=float(timeout)) as client: if method.upper() == "GET": response = await client.get(url, headers=headers) elif method.upper() == "POST": response = await client.post(url, headers=headers, content=data) elif method.upper() == "PUT": response = await client.put(url, headers=headers, content=data) elif method.upper() == "DELETE": response = await client.delete(url, headers=headers) elif method.upper() == "HEAD": response = await client.head(url, headers=headers) elif method.upper() == "OPTIONS": response = await client.options(url, headers=headers) elif method.upper() == "PATCH": response = await client.patch(url, headers=headers, content=data) else: return f"Unsupported HTTP method: {method}" # Add response information result.append(f"Status Code: {response.status_code}") # Add headers result.append("\n=== Response Headers ===") for header, value in response.headers.items(): result.append(f"{header}: {value}") # Add response body result.append("\n=== Response Body ===") if 'application/json' in response.headers.get('content-type', ''): try: formatted_json = json.dumps(response.json(), indent=2) result.append(formatted_json) except: result.append(response.text) else: result.append(response.text) return "\n".join(result) except Exception as e: return f"Error sending HTTP request: {str(e)}" if __name__ == "__main__": # Create necessary directories Path(CONFIG["reporting"]["output_dir"]).mkdir(parents=True, exist_ok=True) # Initialize and run the server with stdio transport mcp.run(transport='stdio') ```