#
tokens: 2852/50000 3/3 files
lines: off (toggle) GitHub
raw markdown copy
# Directory Structure

```
├── Bridge_Zeek_MCP.py
├── images
│   ├── example1.png
│   ├── example2.png
│   ├── example3.png
│   ├── logo.png
│   └── start.png
├── LICENSE
├── pcaps
│   ├── ex1.pcapng
│   ├── ex2.pcapng
│   └── Sample.pcap
├── README.md
└── requirements.txt
```

# Files

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://www.apache.org/licenses/LICENSE-2.0)
[![GitHub release (latest by date)](https://img.shields.io/badge/release-v1.0-blue)](https://github.com/Gabbo01/Zeek-MCP/releases)
[![Linkedin](https://img.shields.io/badge/Linked-in-blue)](https://www.linkedin.com/in/gabriele-bencivenga-93797b147/)

![Logo](images/logo.png)

# Zeek-MCP

This repository provides a set of utilities to build an MCP server (Model Context Protocol) that you can integrate with your conversational AI client.

---

## Table of Contents

* [Prerequisites](#prerequisites)
* [Installation](#installation)
* [Usage](#usage)

  * [1. Clone the repository](#1-clone-the-repository)
  * [2. Install dependencies](#2-install-dependencies)
  * [3. Run the MCP server](#3-run-the-mcp-server)
  * [4. Use the MCP tools](#4-use-the-mcp-tools)
* [Examples](#examples)
* [License](#license)

---

## Prerequisites

* **Python 3.7+**
* **Zeek** installed and available in your `PATH` (for the `execzeek` tool)
* **pip** (for installing Python dependencies)

---

## Installation

### 1. Clone the repository

```bash
git clone https://github.com/Gabbo01/Zeek-MCP
cd Zeek-MCP
```

### 2. Install dependencies

It's recommended to use a virtual environment:

```bash
python -m venv venv
source venv/bin/activate    # Linux/macOS
venv\Scripts\activate     # Windows
pip install -r requirements.txt
```

> **Note:** If you don’t have a `requirements.txt`, install directly:
>
> ```bash
> pip install pandas mcp
> ```

---

## Usage

The repository exposes two main MCP tools and a command-line entry point:

### 3. Run the MCP server

```bash
python Bridge_Zeek_MCP.py --mcp-host 127.0.0.1 --mcp-port 8081 --transport sse
```

* `--mcp-host`: Host for the MCP server (default: `127.0.0.1`).
* `--mcp-port`: Port for the MCP server (default: `8081`).
* `--transport`: Transport protocol, either `sse` (Server-Sent Events) or `stdio`.

![start](images/start.png)

### 4. Use the MCP tools
You need to use an LLM that can support the MCP tools usage by calling the following tools:

1. **`execzeek(pcap_path: str) -> str`**

   * **Description:** Runs Zeek on the given PCAP file after deleting existing `.log` files in the working directory.
   * **Returns:** A string listing generated `.log` filenames or `"1"` on error.

2. **`parselogs(logfile: str) -> DataFrame`**

   * **Description:** Parses a single Zeek `.log` file and returns the parsed content.


You can interact with these endpoints via HTTP (if using SSE transport) or by embedding in LLM client (eg: Claude Desktop):

#### Claude Desktop integration:

To set up Claude Desktop as a Zeek MCP client, go to `Claude` -> `Settings` -> `Developer` -> `Edit Config` -> `claude_desktop_config.json` and add the following:

```json
{
  "mcpServers": {
    "Zeek-mcp": {
      "command": "python",
      "args": [
        "/ABSOLUTE_PATH_TO/Bridge_Zeek_MCP.py",
      ]
    }
  }
}
```

Alternatively, edit this file directly:
```
/Users/YOUR_USER/Library/Application Support/Claude/claude_desktop_config.json
```
#### 5ire Integration:
Another MCP client that supports multiple models on the backend is [5ire](https://github.com/nanbingxyz/5ire). To set up Zeek-MCP, open 5ire and go to `Tools` -> `New` and set the following configurations:

1. Tool Key: ZeekMCP
2. Name: Zeek-MCP
3. Command: `python /ABSOLUTE_PATH_TO/Bridge_Zeek_MCP.py`

##### Alternatively you can use Chainlit framework and follow the [documentation](https://docs.chainlit.io/advanced-features/mcp) to integrate the MCP server.

---

## Examples
An example of MCP tools usage from a chainlit chatbot client, it was used an example pcap file (you can find fews in pcaps folder)
```
In that case the used model was claude-3.7-sonnet-reasoning-gemma3-12b
```

![example1](images/example1.png)

![example2](images/example2.png)

![example3](images/example3.png)

---

## License

See `LICENSE` for more information.

```

--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------

```
pandas>=1.0
mcp
```

--------------------------------------------------------------------------------
/Bridge_Zeek_MCP.py:
--------------------------------------------------------------------------------

```python
import argparse  # Module for parsing command-line arguments
import logging   # Module for application logging
import subprocess  # Module for running external commands and subprocesses
import pandas as pd  # Library for tabular data manipulation and analysis
from mcp.server.fastmcp import FastMCP  # Import the FastMCP class from the MCP package
import os        # Module for interacting with the operating system (files, directories)
import glob      # Module for file pattern matching (glob)
import pandas as pd  # (Duplicate) Pandas for DataFrame operations

# Configure module-level logger
logger = logging.getLogger(__name__)
# Create the main FastMCP instance to expose tools as endpoints
mcp = FastMCP("Zeek-MCP")

def parse_zeek_log(path):
    """
    Parse a single Zeek .log file:
      1. Read header lines starting with '#'
      2. Extract fields defined by the '#fields' header line
      3. Build a pandas DataFrame from the tabular data

    Args:
        path (str): Path to the Zeek .log file
    Returns:
        pd.DataFrame: Table with columns corresponding to Zeek fields
    Raises:
        ValueError: If the '#fields' header line is missing
    """
    headers = []     # List to collect header lines
    data_lines = []  # List of lists to collect data row values

    # Open the log file for reading
    with open(path, "r") as f:
        for line in f:
            line = line.strip()  # Remove whitespace and newline
            if line.startswith("#"):  # Zeek header lines start with '#'
                headers.append(line)
            elif line:
                # Split data rows by tab and add to list
                data_lines.append(line.split('\t'))

    # Find the '#fields' header to determine column names
    field_line = next((h for h in headers if h.startswith("#fields")), None)
    if not field_line:
        # Raise an error if '#fields' is missing
        raise ValueError(f"Missing '#fields' header in {path}")

    # Build column list by removing the '#fields\t' prefix
    columns = field_line.replace("#fields\t", "").split('\t')
    # Create a pandas DataFrame with the parsed data
    df = pd.DataFrame(data_lines, columns=columns)
    return df


def parse_all_logs_as_str(directory="."):
    """
    Search for all .log files in the specified directory and return a single
    formatted string containing, for each file:
      - File name enclosed by '=== file_name ==='
      - Table of data (using DataFrame.to_string)
      - Error message if parsing fails

    Args:
        directory (str): Directory to search for .log files (default: current)
    Returns:
        str: Concatenated blocks separated by two blank lines
    """
    # Find and sort all .log files in the directory
    log_files = sorted(glob.glob(os.path.join(directory, "*.log")))
    parts = []  # List of text blocks for each log file

    for log_path in log_files:
        basename = os.path.basename(log_path)
        try:
            # Parse the log file and convert to a string table
            df = parse_zeek_log(log_path)
            table_str = df.to_string(index=False)
            part = f"=== {basename} ===\n\n{table_str}"
        except Exception as e:
            # Include error message if parsing fails
            part = f"[ERR] {basename}: {e}"
        parts.append(part)

    # Join all text blocks with two blank lines as separators
    return "\n\n".join(parts)


# Define an MCP tool using the @mcp.tool() decorator
@mcp.tool()
def execzeek(pcap_path: str) -> str:
    """
    Run Zeek on a specified PCAP file after cleaning existing .log files.

    Args:
        pcap_path (str): Path to the input PCAP file
    Returns:
        str: Comma-separated names of generated log files if successful,
             or "1" in case of an error
    """
    try:
        # Remove all existing .log files in the current directory
        for old in glob.glob("*.log"):
            try:
                os.remove(old)
                print(f"[INFO] Removed file: {old}")
            except Exception as e:
                print(f"[WARN] Could not remove {old}: {e}")

        # Execute the Zeek command on the PCAP file
        res = subprocess.run(["zeek", "-C", "-r", pcap_path], check=False)
        if res.returncode == 0:
            # On success, collect the new .log files
            new_logs = glob.glob("*.log")
            if new_logs:
                logs_str = ", ".join(new_logs)
                print(f"[INFO] Generated log files: {logs_str}")
                return f"Generated the following files:\n{logs_str}"
            else:
                print("[WARN] No .log files found after running Zeek.")
                return ""
        else:
            # If Zeek exits with an error code, return "1"
            print(f"[ERROR] Zeek returned exit code {res.returncode}")
            return "1"
    except Exception as e:
        # Handle unexpected exceptions during Zeek execution
        print(f"[ERROR] Error running Zeek: {e}")
        return "1"


@mcp.tool()
def parselogs(logfile: str):
    """
    MCP tool for parsing a single log file.

    Args:
        logfile (str): Path to the .log file to be parsed
    Returns:
        pd.DataFrame: DataFrame resulting from parse_zeek_log
    """
    return parse_zeek_log(logfile)


def main():
    # Set up command-line argument parser
    parser = argparse.ArgumentParser(description="MCP server for mcp")
    parser.add_argument("--mcp-host", type=str, default="127.0.0.1",
                        help="Host to run MCP server on (only used for sse), default: 127.0.0.1")
    parser.add_argument("--mcp-port", type=int,
                        help="Port to run MCP server on (only used for sse), default: 8081")
    parser.add_argument("--transport", type=str, default="sse", choices=["stdio", "sse"],
                        help="Transport protocol for MCP, default: sse")
    args = parser.parse_args()

    # Use Server-Sent Events (SSE) transport
    if args.transport == "sse":
        try:
            # Configure basic logging at INFO level
            log_level = logging.INFO
            logging.basicConfig(level=log_level)
            logging.getLogger().setLevel(log_level)

            # Apply FastMCP settings based on arguments
            mcp.settings.log_level = "INFO"
            mcp.settings.host = args.mcp_host or "127.0.0.1"
            mcp.settings.port = args.mcp_port or 8081

            logger.info(f"Starting MCP server on http://{mcp.settings.host}:{mcp.settings.port}/sse")
            logger.info(f"Using transport: {args.transport}")

            # Start the MCP server with SSE transport
            mcp.run(transport="sse")
        except KeyboardInterrupt:
            logger.info("Server stopped by user")
    else:
        # Run MCP in stdio transport mode
        mcp.run()

# Entry point of the script when executed directly
if __name__ == "__main__":
    main()

```