# Directory Structure

```
├── .dockerignore
├── client_config.json
├── Dockerfile
├── LICENSE
├── pyproject.toml
├── README.md
├── src
│   └── nmap_mcp
│       ├── __init__.py
│       ├── __main__.py
│       ├── __pycache__
│       │   ├── __init__.cpython-313.pyc
│       │   ├── __main__.cpython-313.pyc
│       │   └── server.cpython-313.pyc
│       ├── server.py
│       └── test.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/.dockerignore:
--------------------------------------------------------------------------------

```
.git
.gitignore
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
*.egg-info/
.installed.cfg
*.egg
.env
.venv
venv/
ENV/
.idea/
.vscode/
*.swp
*.swo 
```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# Nmap MCP Server

This is a Model Context Protocol (MCP) server that provides access to nmap network scanning functionality.

## Features

- Run nmap scans on specified targets with customizable options
- Store and retrieve scan results
- Analyze scan results using AI prompts

## Installation

Requirements:
- Python 3.10+
- mcp (1.4.1 or newer)
- python-libnmap (0.7.2 or newer)
- nmap (installed on the system)

```bash
pip install mcp python-libnmap
```

Make sure nmap is installed on your system:
```bash
# On Debian/Ubuntu
sudo apt-get install nmap

# On Fedora/CentOS
sudo dnf install nmap
```

## Usage

### Running the Server

To run the server directly from the source code:

```bash
python -m src.nmap_mcp
```

To install the package and run as a command:

```bash
pip install -e .
nmap-mcp
```

### Available Tools

1. **run-nmap-scan**
   - Run an nmap scan on specified targets
   - Parameters:
     - `target`: Target host or network (e.g., 192.168.1.1 or 192.168.1.0/24)
     - `options`: Nmap options (e.g., -sV -p 1-1000); optional, defaults to `-sV`

2. **get-scan-details**
   - Get detailed information about a specific scan
   - Parameters:
     - `scan_id`: ID of the scan to retrieve

3. **list-all-scans**
   - List all available scan results
   - No parameters required
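
As a rough illustration of how the two `run-nmap-scan` parameters become a command line, the sketch below mirrors the approach in `server.py`: XML output to stdout, whitespace-split options, target last. `build_nmap_command` is a hypothetical helper name and `/usr/bin/nmap` is an assumed path; nothing is executed.

```python
from typing import List, Optional


def build_nmap_command(
    target: str,
    options: Optional[str] = "-sV",
    nmap_path: str = "/usr/bin/nmap",  # assumed location, adjust for your system
) -> List[str]:
    """Hypothetical helper: assemble the argv list for a single scan."""
    # Ask nmap for XML on stdout so the result can be parsed afterwards
    cmd = [nmap_path, "-oX", "-"]
    # Options arrive as one string and are split on whitespace
    if options:
        cmd.extend(options.split())
    # The target always goes last
    cmd.append(target)
    return cmd


if __name__ == "__main__":
    print(build_nmap_command("192.168.1.1", "-sV -p 1-1000"))
```

Because the options are split on whitespace only, quoted arguments containing spaces are not preserved as a single argument.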

### Available Prompts

1. **analyze-scan**
   - Analyze an nmap scan result
   - Parameters:
     - `scan_id`: ID of the scan to analyze
     - `focus`: Focus area (security/services/overview); optional, defaults to overview

### Resources

Scan results are available as resources with the `nmap://scan/{scan_id}` URI scheme.
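
A minimal sketch of how a client might pull the scan ID back out of such a URI, using only the standard library. `scan_id_from_uri` is a hypothetical helper, the example ID in the usage line is made up, and the check on the `scan` host part is an extra assumption that the server itself may not enforce.

```python
from urllib.parse import urlparse


def scan_id_from_uri(uri: str) -> str:
    """Hypothetical helper: extract the scan ID from nmap://scan/{scan_id}."""
    parsed = urlparse(uri)
    if parsed.scheme != "nmap":
        raise ValueError(f"Unsupported URI scheme: {parsed.scheme}")
    # In nmap://scan/{scan_id}, "scan" is the host part and the ID is the path
    if parsed.netloc != "scan":
        raise ValueError(f"Unexpected resource type: {parsed.netloc}")
    scan_id = parsed.path.lstrip("/")
    if not scan_id:
        raise ValueError("Missing scan ID")
    return scan_id


if __name__ == "__main__":
    # Made-up ID for illustration only
    print(scan_id_from_uri("nmap://scan/123e4567-e89b-12d3-a456-426614174000"))
```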

## Example Workflow

1. Run a scan:
   ```
   Call tool: run-nmap-scan
   Parameters: {"target": "192.168.1.0/24", "options": "-sV -p 22,80,443"}
   ```

2. Get scan details:
   ```
   Call tool: get-scan-details
   Parameters: {"scan_id": "<scan_id from previous step>"}
   ```

3. List all scans:
   ```
   Call tool: list-all-scans
   ```

4. Analyze scan results:
   ```
   Get prompt: analyze-scan
   Parameters: {"scan_id": "<scan_id>", "focus": "security"}
   ```
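
MCP clients normally build these calls for you. For readers curious about the wire format, the sketch below assembles the JSON-RPC 2.0 message for step 1, assuming the `tools/call` method defined by the MCP specification. `make_tool_call` is a hypothetical helper and the request ID is arbitrary.

```python
import json
from typing import Any, Dict, Optional


def make_tool_call(
    request_id: int, name: str, arguments: Optional[Dict[str, Any]] = None
) -> str:
    """Hypothetical helper: serialize a tools/call request as JSON-RPC 2.0."""
    message = {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        # Tools without parameters (list-all-scans) get an empty object
        "params": {"name": name, "arguments": arguments or {}},
    }
    return json.dumps(message)


if __name__ == "__main__":
    print(make_tool_call(
        1, "run-nmap-scan",
        {"target": "192.168.1.0/24", "options": "-sV -p 22,80,443"},
    ))
```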

## Security Considerations

This server executes nmap commands on your system. Only scan networks that you own or have explicit permission to scan; unauthorized scanning may be illegal in some jurisdictions.

## Troubleshooting

If you encounter errors related to nmap not being found or being executed incorrectly:

1. Make sure nmap is installed and available in your PATH
2. Check the logs for which nmap executable is being used
3. The server resolves the full path to nmap at startup and falls back to `/usr/bin/nmap` if nmap is not found in your PATH
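
To check the first point by hand, a small sketch along these lines can help. It uses `shutil.which`, the same lookup the server relies on. `resolve_executable` is a hypothetical helper, and the fallback path is an assumption that may not match your system.

```python
import shutil
from typing import Tuple


def resolve_executable(
    name: str = "nmap", fallback: str = "/usr/bin/nmap"
) -> Tuple[str, bool]:
    """Hypothetical helper: return (path, found_in_path) for an executable."""
    path = shutil.which(name)
    if path is None:
        # Not in PATH: report the fallback and flag that it was not found
        return fallback, False
    return path, True


if __name__ == "__main__":
    path, found = resolve_executable()
    if found:
        print(f"nmap found at: {path}")
    else:
        print(f"nmap not found in PATH; the fallback would be {path}")
```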

## Docker Usage

You can run the MCP server in a Docker container:

```bash
# Build the Docker image
docker build -t nmap-mcp-server .

# Run the Docker container
docker run -it --rm nmap-mcp-server
```

The Docker image also supports integration with the Glama MCP directory: others can run this MCP server without installing its dependencies themselves.

## License

This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.
```

--------------------------------------------------------------------------------
/client_config.json:
--------------------------------------------------------------------------------

```json
{
  "mcpServers": {
    "nmap-mcp": {
      "command": "uv",
      "args": [
        "--directory",
        "/home/kali/mcp-servers/nmap",
        "run",
        "nmap-mcp"
      ]
    }
  }
} 
```

--------------------------------------------------------------------------------
/src/nmap_mcp/__main__.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Main entry point for the nmap MCP server.
This allows running the server as `python -m src.nmap_mcp`.
"""

from . import main

if __name__ == "__main__":
    # Run the package's main function
    main() 
```

--------------------------------------------------------------------------------
/src/nmap_mcp/__init__.py:
--------------------------------------------------------------------------------

```python
"""
Nmap MCP Server Package

This package provides an MCP (Model Context Protocol) interface for running nmap scans.
It allows AI assistants to run network scans and analyze the results.
"""

__version__ = "0.1.0"

from . import server
import asyncio

def main():
    """Main entry point for the package."""
    asyncio.run(server.main())

# Optionally expose other important items at package level
__all__ = ['main', 'server']
```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
[project]
name = "nmap-mcp"
version = "0.1.0"
description = "An MCP server for nmap network scanning"
readme = "README.md"
requires-python = ">=3.10"
license = {text = "MIT"}
dependencies = [
    "mcp>=1.4.1",
    "python-libnmap>=0.7.2"
]

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project.scripts]
nmap-mcp = "nmap_mcp:main"

[tool.hatch.build.targets.wheel]
packages = ["src/nmap_mcp"]

[tool.hatch.build]
packages = ["src"]

```

--------------------------------------------------------------------------------
/Dockerfile:
--------------------------------------------------------------------------------

```dockerfile
FROM python:3.10-slim

# Install system dependencies
RUN apt-get update && apt-get install -y \
    nmap \
    && rm -rf /var/lib/apt/lists/*

# Set working directory
WORKDIR /app

# Copy project files
COPY pyproject.toml README.md LICENSE ./
COPY src ./src/

# Install Python dependencies and the package
RUN pip install --no-cache-dir -e .

# Expose port if needed
# Note: MCP servers often use stdio, but if you need a port, uncomment and set accordingly
# EXPOSE 8080

# Run the MCP server
CMD ["python", "-m", "src.nmap_mcp"] 
```

--------------------------------------------------------------------------------
/src/nmap_mcp/test.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Simple test script for the nmap MCP server.
This script tests basic functionality without requiring a full MCP client.
"""

import asyncio
import sys
import time
from libnmap.process import NmapProcess
from libnmap.parser import NmapParser

async def test_nmap_functionality():
    """Test that we can run nmap scans and parse results."""
    print("Testing nmap functionality...")
    
    # Test local scan with minimal options to ensure nmap works
    target = "127.0.0.1"
    options = "-p 22,80 -sV"
    
    print(f"Running nmap scan on {target} with options: {options}")
    nm_process = NmapProcess(target, options)
    rc = nm_process.run()
    
    if rc != 0:
        print(f"Error running nmap: {nm_process.stderr}")
        return False
    
    try:
        parsed = NmapParser.parse(nm_process.stdout)
        
        print("\nScan results:")
        print(f"  Target: {target}")
        print(f"  Start time: {parsed.started}")
        print(f"  Hosts found: {len(parsed.hosts)}")
        
        for host in parsed.hosts:
            print(f"\n  Host: {host.address}")
            print(f"  Status: {host.status}")
            print("  Services:")
            
            for service in host.services:
                print(f"    - Port {service.port}/{service.protocol}: {service.state}")
                if service.service:
                    print(f"      Service: {service.service}")
                if service.banner:
                    print(f"      Banner: {service.banner}")
        
        print("\nNmap functionality test passed!")
        return True
        
    except Exception as e:
        print(f"Error parsing nmap results: {str(e)}")
        return False

if __name__ == "__main__":
    print("Nmap MCP Server Test Script")
    print("--------------------------")
    
    # Check if python-libnmap is installed
    try:
        print("Checking for python-libnmap...")
        import libnmap
        print(f"Found libnmap version: {libnmap.__version__}")
    except ImportError:
        print("Error: python-libnmap is not installed.")
        print("Please install it with: pip install python-libnmap")
        sys.exit(1)
    
    # Check if nmap is installed and available
    try:
        print("\nChecking for nmap...")
        test_process = NmapProcess("localhost", "-sV -p 22")
        test_process.run_background()
        
        # Wait for the process to complete with a timeout
        timeout = 30  # 30 seconds timeout
        start_time = time.time()
        while test_process.is_running() and time.time() - start_time < timeout:
            time.sleep(0.5)
            
        if test_process.is_running():
            test_process.stop()
            raise Exception("Nmap test timed out")
            
        if test_process.rc != 0:
            raise Exception(f"Nmap test failed with return code {test_process.rc}")
            
        print("Nmap is available and working.")
    except Exception as e:
        print(f"Error: {str(e)}")
        print("Please make sure nmap is installed and available in your PATH.")
        sys.exit(1)
    
    # Run the async test
    print("\nRunning functional tests...")
    result = asyncio.run(test_nmap_functionality())
    
    if not result:
        print("\nTEST FAILED: There were errors during the test.")
        sys.exit(1)
    
    print("\nAll tests passed! The nmap MCP server should work correctly.")
    print("You can now run the server with: python -m src.nmap_mcp")
```

--------------------------------------------------------------------------------
/src/nmap_mcp/server.py:
--------------------------------------------------------------------------------

```python
import asyncio
import json
import uuid
import time
import logging
import shutil
import subprocess
from typing import Dict, List, Optional, Set

from libnmap.process import NmapProcess
from libnmap.parser import NmapParser
from mcp.server.models import InitializationOptions
import mcp.types as types
from mcp.server import NotificationOptions, Server
from pydantic import AnyUrl
import mcp.server.stdio

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("nmap-mcp")

# Store scan results as a dictionary with scan_id as the key
scan_results: Dict[str, Dict] = {}

# Track ongoing scans to prevent duplicates
ongoing_scans: Set[str] = set()

# Rate limiting settings
RATE_LIMIT_PERIOD = 60  # seconds
RATE_LIMIT_MAX_SCANS = 3
last_scan_times = []

# Find the full path to the nmap executable
NMAP_PATH = shutil.which("nmap")
if not NMAP_PATH:
    logger.error("Could not find nmap executable. Please ensure nmap is installed.")
    # Default to standard path if not found
    NMAP_PATH = "/usr/bin/nmap"

logger.info(f"Using nmap executable at: {NMAP_PATH}")

server = Server("nmap")

@server.list_resources()
async def handle_list_resources() -> list[types.Resource]:
    """
    List available scan results as resources.
    Each scan result is exposed as a resource with a custom nmap:// URI scheme.
    """
    return [
        types.Resource(
            uri=AnyUrl(f"nmap://scan/{scan_id}"),
            name=f"Scan: {scan_data.get('target', 'Unknown')}",
            description=f"Nmap scan of {scan_data.get('target', 'Unknown')} - {scan_data.get('timestamp', 'Unknown')}",
            mimeType="application/json",
        )
        for scan_id, scan_data in scan_results.items()
    ]

@server.read_resource()
async def handle_read_resource(uri: AnyUrl) -> str:
    """
    Read a specific scan result by its URI.
    The scan ID is extracted from the URI path component.
    """
    if uri.scheme != "nmap":
        raise ValueError(f"Unsupported URI scheme: {uri.scheme}")

    scan_id = uri.path
    if scan_id is not None:
        scan_id = scan_id.lstrip("/")
        if scan_id in scan_results:
            return json.dumps(scan_results[scan_id], indent=2)
    raise ValueError(f"Scan result not found: {scan_id}")

@server.list_prompts()
async def handle_list_prompts() -> list[types.Prompt]:
    """
    List available prompts related to nmap scanning.
    """
    return [
        types.Prompt(
            name="analyze-scan",
            description="Analyze an nmap scan result",
            arguments=[
                types.PromptArgument(
                    name="scan_id",
                    description="ID of the scan to analyze",
                    required=True,
                ),
                types.PromptArgument(
                    name="focus",
                    description="Focus area (security/services/overview)",
                    required=False,
                )
            ],
        )
    ]

@server.get_prompt()
async def handle_get_prompt(
    name: str, arguments: dict[str, str] | None
) -> types.GetPromptResult:
    """
    Generate a prompt for analyzing nmap scan results.
    """
    if name != "analyze-scan":
        raise ValueError(f"Unknown prompt: {name}")

    if not arguments or "scan_id" not in arguments:
        raise ValueError("Missing scan_id argument")

    scan_id = arguments["scan_id"]
    focus = arguments.get("focus", "overview")

    if scan_id not in scan_results:
        raise ValueError(f"Scan result not found: {scan_id}")

    scan_data = scan_results[scan_id]
    
    focus_prompt = ""
    if focus == "security":
        focus_prompt = "Focus on security vulnerabilities and potential risks."
    elif focus == "services":
        focus_prompt = "Focus on identifying running services and their versions."
    else:  # overview
        focus_prompt = "Provide a general overview of the scan results."

    return types.GetPromptResult(
        description=f"Analyze nmap scan results for {scan_data.get('target', 'Unknown')}",
        messages=[
            types.PromptMessage(
                role="user",
                content=types.TextContent(
                    type="text",
                    text=f"Analyze the following nmap scan results. {focus_prompt}\n\n{json.dumps(scan_data, indent=2)}",
                ),
            )
        ],
    )

@server.list_tools()
async def handle_list_tools() -> list[types.Tool]:
    """
    List available tools for nmap scanning.
    """
    return [
        types.Tool(
            name="run-nmap-scan",
            description="Run an nmap scan on specified targets",
            inputSchema={
                "type": "object",
                "properties": {
                    "target": {"type": "string", "description": "Target host or network (e.g., 192.168.1.1 or 192.168.1.0/24)"},
                    "options": {"type": "string", "description": "Nmap options (e.g., -sV -p 1-1000)"},
                },
                "required": ["target"],
            },
        ),
        types.Tool(
            name="get-scan-details",
            description="Get detailed information about a specific scan",
            inputSchema={
                "type": "object",
                "properties": {
                    "scan_id": {"type": "string", "description": "ID of the scan to retrieve"},
                },
                "required": ["scan_id"],
            },
        ),
        types.Tool(
            name="list-all-scans",
            description="List all available scan results",
            inputSchema={
                "type": "object",
                "properties": {},
            },
        )
    ]

def check_rate_limit() -> bool:
    """Check if we're exceeding the rate limit."""
    global last_scan_times
    current_time = time.time()
    
    # Remove timestamps older than the rate limit period
    last_scan_times = [t for t in last_scan_times if current_time - t < RATE_LIMIT_PERIOD]
    
    # Check if we're under the limit
    return len(last_scan_times) < RATE_LIMIT_MAX_SCANS

def add_scan_timestamp():
    """Add current timestamp to track rate limiting."""
    global last_scan_times
    last_scan_times.append(time.time())

def run_nmap_directly(target, options):
    """Run nmap directly using subprocess instead of relying on python-libnmap."""
    try:
        # Construct the basic command with XML output
        cmd = [NMAP_PATH, "-oX", "-"]
        
        # Split options into separate arguments
        if options:
            option_args = options.split()
            cmd.extend(option_args)
            
        # Add target at the end
        cmd.append(target)
        logger.info(f"Executing nmap command: {' '.join(cmd)}")
        
        # Run the command and capture both stdout and stderr
        process = subprocess.run(
            cmd,
            capture_output=True,
            text=False,
            check=True
        )
        
        return process.stdout, None
    except subprocess.CalledProcessError as e:
        return None, f"nmap failed with exit code {e.returncode}: {e.stderr.decode('utf-8', errors='replace')}"
    except Exception as e:
        return None, str(e)

@server.call_tool()
async def handle_call_tool(
    name: str, arguments: dict | None
) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
    """
    Handle tool execution requests for nmap scanning.
    """
    if not arguments and name != "list-all-scans":
        raise ValueError("Missing arguments")

    if name == "run-nmap-scan":
        target = arguments.get("target")
        options = arguments.get("options", "-sV")  # Default to version detection

        if not target:
            raise ValueError("Missing target")
            
        # Create a unique scan identifier based on target and options
        scan_key = f"{target}:{options}"
        
        # Check if an identical scan is already running
        if scan_key in ongoing_scans:
            return [
                types.TextContent(
                    type="text",
                    text="A scan with the same target and options is already running. Please wait for it to complete.",
                )
            ]
            
        # Check rate limiting
        if not check_rate_limit():
            return [
                types.TextContent(
                    type="text",
                    text=f"Rate limit exceeded. Please wait before starting another scan. Maximum {RATE_LIMIT_MAX_SCANS} scans per {RATE_LIMIT_PERIOD} seconds.",
                )
            ]
            
        try:
            # Mark this scan as ongoing
            ongoing_scans.add(scan_key)
            add_scan_timestamp()
            
            logger.info(f"Starting nmap scan on {target} with options {options}")
            
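            # Use a direct subprocess call instead of NmapProcess, run in a worker
            # thread so the blocking scan does not stall the asyncio event loop
            stdout, stderr = await asyncio.to_thread(run_nmap_directly, target, options)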
            
            if stderr:
                logger.error(f"Nmap scan failed: {stderr}")
                return [
                    types.TextContent(
                        type="text",
                        text=f"Nmap scan failed: {stderr}",
                    )
                ]

            # Parse results - convert bytes to string first
            try:
                xml_string = stdout.decode('utf-8', errors='replace')
                parsed = NmapParser.parse_fromstring(xml_string)
            except Exception as e:
                logger.error(f"Error parsing nmap results: {str(e)}")
                return [
                    types.TextContent(
                        type="text",
                        text=f"Error parsing nmap results: {str(e)}",
                    )
                ]
            
            # Generate a unique ID for this scan
            scan_id = str(uuid.uuid4())
            
            # Store scan results
            scan_results[scan_id] = {
                "target": target,
                "options": options,
                "timestamp": parsed.started,
                "hosts": [
                    {
                        "address": host.address,
                        "status": host.status,
                        "hostnames": [
                            hostname.name if hasattr(hostname, 'name') else str(hostname)
                            for hostname in host.hostnames
                        ],
                        "services": [
                            {
                                "port": service.port,
                                "protocol": service.protocol,
                                "state": service.state,
                                "service": service.service,
                                "banner": service.banner
                            }
                            for service in host.services
                        ]
                    }
                    for host in parsed.hosts
                ]
            }
            
            # Notify clients that new resources are available
            await server.request_context.session.send_resource_list_changed()
            
            logger.info(f"Scan completed. Found {len(parsed.hosts)} hosts. Scan ID: {scan_id}")
            
            return [
                types.TextContent(
                    type="text",
                    text=f"Scan completed. Found {len(parsed.hosts)} hosts. Scan ID: {scan_id}",
                )
            ]
        except Exception as e:
            logger.error(f"Error during nmap scan: {str(e)}")
            return [
                types.TextContent(
                    type="text",
                    text=f"Error during nmap scan: {str(e)}",
                )
            ]
        finally:
            # Remove from ongoing scans when done
            ongoing_scans.discard(scan_key)
            
    elif name == "get-scan-details":
        scan_id = arguments.get("scan_id")
        
        if not scan_id:
            raise ValueError("Missing scan_id")
            
        if scan_id not in scan_results:
            return [
                types.TextContent(
                    type="text",
                    text=f"Scan with ID {scan_id} not found",
                )
            ]
            
        scan_data = scan_results[scan_id]
        
        # Extract summary information
        hosts_up = sum(1 for host in scan_data.get("hosts", []) if host.get("status") == "up")
        total_ports = sum(len(host.get("services", [])) for host in scan_data.get("hosts", []))
        
        return [
            types.TextContent(
                type="text",
                text=f"Scan of {scan_data.get('target')} (ID: {scan_id}):\n"
                     f"- Options: {scan_data.get('options')}\n"
                     f"- Timestamp: {scan_data.get('timestamp')}\n"
                     f"- Hosts: {len(scan_data.get('hosts', []))} ({hosts_up} up)\n"
                     f"- Total ports/services: {total_ports}\n\n"
                     f"Use the nmap://scan/{scan_id} resource to access full results",
            )
        ]
    elif name == "list-all-scans":
        if not scan_results:
            return [
                types.TextContent(
                    type="text",
                    text="No scans have been performed yet.",
                )
            ]
            
        scan_list = []
        for scan_id, scan_data in scan_results.items():
            hosts_count = len(scan_data.get("hosts", []))
            scan_list.append(f"- Scan ID: {scan_id}")
            scan_list.append(f"  Target: {scan_data.get('target')}")
            scan_list.append(f"  Options: {scan_data.get('options')}")
            scan_list.append(f"  Hosts: {hosts_count}")
            scan_list.append("")
            
        return [
            types.TextContent(
                type="text",
                text="Available scans:\n\n" + "\n".join(scan_list),
            )
        ]
    else:
        raise ValueError(f"Unknown tool: {name}")

async def main():
    logger.info("Starting nmap MCP server")
    # Run the server using stdin/stdout streams
    async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            InitializationOptions(
                server_name="nmap",
                server_version="0.1.0",
                capabilities=server.get_capabilities(
                    notification_options=NotificationOptions(),
                    experimental_capabilities={},
                ),
            ),
        )
```
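
The rate limiting in `server.py` keeps a list of recent scan timestamps and rebuilds it on every check. A minimal standalone sketch of the same sliding-window idea is shown below. It is a variation rather than the code above: `SlidingWindowLimiter` is a hypothetical class, it checks and records in one step, and it takes an injectable clock (an assumption made here so the behaviour can be tested without waiting).

```python
import time
from collections import deque
from typing import Callable, Deque


class SlidingWindowLimiter:
    """Hypothetical helper: allow at most max_events per period seconds."""

    def __init__(
        self,
        max_events: int = 3,
        period: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._max_events = max_events
        self._period = period
        self._clock = clock
        self._events: Deque[float] = deque()

    def allow(self) -> bool:
        """Record an event and return True if it fits within the limit."""
        now = self._clock()
        # Drop timestamps that have aged out of the window
        while self._events and now - self._events[0] >= self._period:
            self._events.popleft()
        if len(self._events) >= self._max_events:
            return False
        self._events.append(now)
        return True


if __name__ == "__main__":
    limiter = SlidingWindowLimiter(max_events=3, period=60.0)
    # Four back-to-back attempts: the fourth exceeds the limit
    print([limiter.allow() for _ in range(4)])
```

Using a deque keeps the oldest timestamp at the front, so expiry only touches entries that have actually left the window.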