# Directory Structure
```
├── .gitignore
├── .python-version
├── images
│ ├── mcp-server-config.png
│ └── mcp-trivy-demo.gif
├── LICENSE
├── README.md
├── requirements.txt
└── server.py
```
# Files
--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------
```
3.12.8
```
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# UV
# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
#uv.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
# PyPI configuration file
.pypirc
```
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
# Trivy Security Scanner MCP Server
A Model Context Protocol (MCP) server that provides Trivy security scanning capabilities through a standardized interface.
> ⚠️ **Note**: This is a proof of concept project to demonstrate the integration capabilities between MCP, Cursor IDE, and Trivy. It's intended for experimentation and learning purposes only and is not production-ready. Use at your own risk.
## Features
- 🔍 **Project Scanning**: Automatically scan your project directory for security vulnerabilities using Trivy
- 🛠️ **Automated Fixes**: Automatically update vulnerable dependencies to secure versions
- 📦 **Multi-Package Support**: Handles multiple package managers (Python, Node.js, Ruby, Go)
## Demo
![Trivy MCP server demo](images/mcp-trivy-demo.gif)
## Architecture
```plaintext
┌─────────────┐ ┌──────────────┐ ┌─────────────┐
│ Cursor IDE │ --> │ MCP Server │ --> │ Trivy │
│ (Composer) │ │ │ │ │
└─────────────┘ └──────────────┘ └─────────────┘
```
## Prerequisites
- Python 3.12 or higher
- Trivy installed on your system:
```bash
# macOS
brew install trivy
```
## Installation
```bash
# Create and activate virtual environment
python -m venv .venv
source .venv/bin/activate
# Install dependencies
pip install -r requirements.txt
```
## Usage
Start the server using SSE transport:
```bash
# Using SSE transport (default)
python server.py --transport sse --port 54321
```
The server exposes two tools:
1. `scan_project`: Scans a directory for security vulnerabilities
- Required argument: `workspace` - The directory path to scan
2. `fix_vulnerability`: Updates a vulnerable package to a secure version
- Required arguments:
- `workspace` - The directory to modify
- `pkg_name` - Name of the package to update
- `target_version` - Version to update to
## Using with Cursor IDE
1. Start the server with SSE transport:
```bash
python server.py --transport sse --port 54321
```
2. Configure in Cursor:
- Open Settings
- Go to Features > MCP Servers
- Add: `http://127.0.0.1:54321/sse`
3. Add the following to your `.cursorrules` file (create it if you don't have one yet):
```
After making changes in any of the package dependency/manifest files, scan the project for security vulnerabilities.
Fixes should only be according to the desired version reported by the scanner.
If the scanner reports a fix unrelated to our change, ignore it.
After performing the fix, scan the project for security vulnerabilities again.
```
This configuration will:
- Automatically trigger a security scan when any dependency file is modified
- Help identify vulnerabilities as soon as new dependencies are added
- Ensure your project stays secure throughout development
If you want to run a scan manually, prompt the agent through the Composer interface, for example:
```
Please scan my project for security vulnerabilities
```
## Why MCP?
MCP (Model Context Protocol) exists to solve a fundamental problem in working with large language models (LLMs): how to efficiently and consistently connect these models to external data sources and tools.
Learn more at [modelcontextprotocol.io](https://modelcontextprotocol.io).
## Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
## License
MIT License
## Acknowledgments
- [Model Context Protocol](https://modelcontextprotocol.io)
- [Trivy](https://github.com/aquasecurity/trivy)
- [Cursor IDE](https://cursor.sh)
```
--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------
```
mcp[cli]>=1.2.0
uvicorn>=0.21.1
starlette>=0.27.0
anyio>=3.7.1
click>=8.0.0
```
--------------------------------------------------------------------------------
/server.py:
--------------------------------------------------------------------------------
```python
import click
import json
import logging
import os
import anyio
import shutil
import subprocess
import sys
from typing import Optional, List
import mcp.types as types
from mcp.server.lowlevel import Server
# Log to stderr rather than stdout: this server offers a "stdio" transport,
# and in that mode stdout carries the MCP protocol stream, so any log line
# written there would corrupt the messages exchanged with the client.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    stream=sys.stderr,
)
logger = logging.getLogger(__name__)

# Module-level workspace placeholder; it is not read or assigned anywhere in
# the visible code and is kept only for backward compatibility.
workspace_path: Optional[str] = None
async def scan_project_impl(project_dir: str) -> str:
    """Run ``trivy fs`` against *project_dir* and return a plain-text report.

    Args:
        project_dir: Directory passed to Trivy as the scan target.

    Returns:
        ``"No vulnerabilities found!"`` when Trivy reports none, otherwise a
        report with one section per vulnerability. Failures are not raised:
        they are logged and returned as a string starting with ``"Error"``.
    """
    # Local import: anyio is only needed here, to move the blocking call off
    # the event loop.
    from anyio import to_thread

    logger.debug("scan_project tool called")
    # Progress messages go to stderr; with the stdio transport stdout is the
    # protocol channel and must not receive free-form text.
    print(f"\nScanning directory: {project_dir}", file=sys.stderr)

    def _run_trivy() -> "subprocess.CompletedProcess[str]":
        return subprocess.run(
            ["trivy", "fs", "--format", "json", project_dir],
            capture_output=True,
            text=True,
            check=True,
        )

    try:
        print("Running Trivy scan... (this may take a moment)", file=sys.stderr)
        # subprocess.run blocks until Trivy exits; running it in a worker
        # thread keeps the server responsive to other requests meanwhile.
        completed = await to_thread.run_sync(_run_trivy)

        parsed = json.loads(completed.stdout)
        # "Results" / "Vulnerabilities" may be absent or JSON null; treat
        # both the same as an empty list.
        target_results = (parsed or {}).get("Results") or []
        findings = [
            (target.get("Target", "Unknown"), vuln)
            for target in target_results
            for vuln in (target.get("Vulnerabilities") or [])
        ]
        if not findings:
            return "No vulnerabilities found!"

        sections = ["Security Scan Results:\n\n"]
        for target_name, vuln in findings:
            sections.append(
                f"Target: {target_name}\n"
                f"Vulnerability: {vuln.get('VulnerabilityID', 'Unknown')}\n"
                f"Package: {vuln.get('PkgName', 'Unknown')} "
                f"(Current: {vuln.get('InstalledVersion', 'Unknown')})\n"
                f"Fixed Version: {vuln.get('FixedVersion', 'Unknown')}\n"
                f"Severity: {vuln.get('Severity', 'Unknown')}\n"
                f"Description: {vuln.get('Description', 'No description available')}\n"
                + "-" * 80
                + "\n"
            )
        return "".join(sections)
    except subprocess.CalledProcessError as e:
        error_msg = f"Error running Trivy scan: {e.stderr}"
        logger.error(error_msg)
        return error_msg
    except FileNotFoundError:
        # Raised by subprocess when the "trivy" executable cannot be found.
        error_msg = "Error: the 'trivy' executable was not found on PATH; install Trivy and retry"
        logger.error(error_msg)
        return error_msg
    except Exception as e:
        # Tool boundary: report any other failure (e.g. malformed JSON) to
        # the caller as text instead of raising.
        error_msg = f"Error: {str(e)}"
        logger.error(error_msg)
        return error_msg
# Manifest file names this tool knows how to edit.
_MANIFEST_NAMES = frozenset({"requirements.txt", "package.json", "Gemfile", "go.mod"})

# Directories holding installed/vendored third-party code or VCS metadata;
# manifests found inside them belong to dependencies, not to the project.
_SKIPPED_DIRS = frozenset({".git", "node_modules", ".venv", "venv", "vendor", "__pycache__"})


def _canonical_name(name: str) -> str:
    """Return *name* lower-cased with ``_`` and ``.`` folded to ``-`` for comparison."""
    return name.lower().replace("_", "-").replace(".", "-")


def _pin_requirement_line(line: str, pkg_name: str, target_version: str) -> Optional[str]:
    """Return a ``name[extras]==version`` line if *line* declares *pkg_name*, else None."""
    stripped = line.strip()
    end = 0
    while end < len(stripped) and (stripped[end].isalnum() or stripped[end] in "-_."):
        end += 1
    name = stripped[:end]
    # Compare the whole name, not a prefix, so "requests" does not match
    # "requests-oauthlib".
    if not name or _canonical_name(name) != _canonical_name(pkg_name):
        return None
    remainder = stripped[end:].lstrip()
    extras = ""
    if remainder.startswith("["):
        close = remainder.find("]")
        if close != -1:
            extras = remainder[: close + 1]
    return f"{name}{extras}=={target_version}\n"


def _bump_gemfile_line(line: str, pkg_name: str, target_version: str) -> Optional[str]:
    """Return *line* with its version requirement replaced if it is a ``gem`` entry for *pkg_name*."""
    stripped = line.lstrip()
    indent = line[: len(line) - len(stripped)]
    for quote in ("'", '"'):
        declaration = f"gem {quote}{pkg_name}{quote}"
        if stripped.startswith(declaration):
            break
    else:
        return None
    rest = stripped[len(declaration):]
    # Drop the quoted version requirements that directly follow the gem
    # name; whatever comes after them (options, comments) is kept verbatim.
    while True:
        after_name = rest.lstrip()
        if not after_name.startswith(","):
            break
        argument = after_name[1:].lstrip()
        if not argument or argument[0] not in "'\"":
            break
        closing = argument.find(argument[0], 1)
        if closing == -1:
            break
        rest = argument[closing + 1:]
    return f"{indent}{declaration}, '~> {target_version}'{rest.rstrip()}\n"


def _bump_go_mod_line(line: str, pkg_name: str, target_version: str) -> Optional[str]:
    """Return *line* with a new version if it is a go.mod requirement for *pkg_name*."""
    tokens = line.split()
    has_keyword = bool(tokens) and tokens[0] == "require"
    if has_keyword:
        tokens = tokens[1:]
    # "=>" marks a replace directive, which must not be rewritten here.
    if len(tokens) < 2 or tokens[0] != pkg_name or "=>" in tokens:
        return None
    indent = line[: len(line) - len(line.lstrip())]
    # Avoid producing "vv1.2.3" when the caller already passed a "v" prefix.
    version = target_version if target_version.startswith("v") else f"v{target_version}"
    pieces = (["require"] if has_keyword else []) + [pkg_name, version] + tokens[2:]
    return indent + " ".join(pieces) + "\n"


def _bump_package_json(text: str, pkg_name: str, target_version: str) -> Optional[str]:
    """Return package.json *text* with *pkg_name* set to *target_version*, or None if absent."""
    manifest = json.loads(text)
    changed = False
    for section in ("dependencies", "devDependencies"):
        deps = manifest.get(section)
        if isinstance(deps, dict) and pkg_name in deps:
            deps[pkg_name] = target_version
            changed = True
    if not changed:
        return None
    return json.dumps(manifest, indent=2, ensure_ascii=False) + "\n"


def _updated_manifest_text(file_name: str, text: str, pkg_name: str, target_version: str) -> Optional[str]:
    """Return the new content for manifest *file_name*, or None when *pkg_name* is not declared in it."""
    if file_name == "package.json":
        return _bump_package_json(text, pkg_name, target_version)
    line_editors = {
        "requirements.txt": _pin_requirement_line,
        "Gemfile": _bump_gemfile_line,
        "go.mod": _bump_go_mod_line,
    }
    edit_line = line_editors[file_name]
    lines = text.splitlines(keepends=True)
    matched = False
    for index, line in enumerate(lines):
        replacement = edit_line(line, pkg_name, target_version)
        if replacement is not None:
            lines[index] = replacement
            matched = True
    return "".join(lines) if matched else None


def _write_with_rollback(path: str, original_text: str, new_text: str) -> None:
    """Write *new_text* to *path*; on failure try to restore *original_text* and re-raise."""
    try:
        with open(path, "w", encoding="utf-8") as handle:
            handle.write(new_text)
    except Exception:
        with open(path, "w", encoding="utf-8") as handle:
            handle.write(original_text)
        raise


async def fix_vulnerability_impl(pkg_name: str, target_version: str, project_dir: str) -> str:
    """Set *pkg_name* to *target_version* in the project's package manifests.

    Walks *project_dir* for ``requirements.txt``, ``package.json``,
    ``Gemfile`` and ``go.mod`` files, skipping directories listed in
    ``_SKIPPED_DIRS``, and rewrites only the entries that declare *pkg_name*.

    Args:
        pkg_name: Name of the package to update.
        target_version: Version to write into the manifests.
        project_dir: Root directory to search.

    Returns:
        One line per manifest that was updated or that failed, or a message
        saying that the arguments are invalid, that no manifest was found, or
        that no manifest declares *pkg_name*.
    """
    logger.debug(f"fix_vulnerability tool called with pkg_name={pkg_name}, target_version={target_version}")
    # Progress goes to stderr; with the stdio transport stdout is the
    # protocol channel.
    print(f"\nAttempting to fix vulnerability in package: {pkg_name}", file=sys.stderr)

    # Both values are written verbatim into manifest files, so empty values
    # or embedded whitespace/newlines would corrupt them.
    for value in (pkg_name, target_version):
        if not value or any(ch.isspace() for ch in value):
            return "Invalid pkg_name or target_version: values must be non-empty and contain no whitespace"

    manifest_paths = []
    for root, dirs, files in os.walk(project_dir):
        # Prune in place so os.walk does not descend into skipped directories.
        dirs[:] = [d for d in dirs if d not in _SKIPPED_DIRS]
        manifest_paths.extend(os.path.join(root, name) for name in files if name in _MANIFEST_NAMES)
    if not manifest_paths:
        return "No package manifest files found in the project"

    results = []
    for file_path in manifest_paths:
        try:
            with open(file_path, "r", encoding="utf-8") as handle:
                original_text = handle.read()
            new_text = _updated_manifest_text(
                os.path.basename(file_path), original_text, pkg_name, target_version
            )
            if new_text is None:
                # Package not declared here: leave the file untouched and
                # do not claim an update.
                continue
            _write_with_rollback(file_path, original_text, new_text)
            results.append(f"Updated {pkg_name} to version {target_version} in {file_path}")
        except Exception as e:
            # Report per-file failures and keep going with the other manifests.
            error_msg = f"Error updating {file_path}: {str(e)}"
            logger.error(error_msg)
            results.append(error_msg)

    if not results:
        return f"Could not find {pkg_name} in any package manifest files"
    return "\n".join(results)
def _workspace_to_path(raw_workspace: str) -> str:
    """Convert a ``file://`` URI to a local path; return plain paths unchanged."""
    if not raw_workspace.startswith("file://"):
        return raw_workspace
    from urllib.parse import urlparse
    from urllib.request import url2pathname

    # url2pathname also decodes percent-escapes such as %20.
    return url2pathname(urlparse(raw_workspace).path)


@click.command()
@click.option("--port", default=54321, help="Port to listen on for SSE")
@click.option(
    "--transport",
    type=click.Choice(["stdio", "sse"]),
    default="sse",
    help="Transport type",
)
def main(port: int, transport: str) -> int:
    """Run the Trivy Security Scanner MCP server over SSE (default) or stdio."""
    # Written to stderr: with the stdio transport stdout is the protocol channel.
    click.echo("Starting Trivy Security Scanner MCP server...", err=True)
    app = Server("trivy-security-scanner")

    @app.call_tool()
    async def call_tool(name: str, arguments: dict) -> List[types.TextContent]:
        # Dispatch a tool call by name; unknown names and missing arguments
        # raise ValueError.
        logger.info(f"Tool called with name: {name}")
        logger.info(f"Arguments received: {arguments}")
        # Fall back to the current directory when no workspace is supplied
        # (missing, None or empty).
        workspace = _workspace_to_path(arguments.get("workspace") or os.getcwd())
        logger.info(f"Using workspace: {workspace}")
        if name == "scan_project":
            result = await scan_project_impl(workspace)
            return [types.TextContent(type="text", text=result)]
        elif name == "fix_vulnerability":
            if "pkg_name" not in arguments or "target_version" not in arguments:
                raise ValueError("Missing required arguments 'pkg_name' or 'target_version'")
            result = await fix_vulnerability_impl(arguments["pkg_name"], arguments["target_version"], workspace)
            return [types.TextContent(type="text", text=result)]
        else:
            raise ValueError(f"Unknown tool: {name}")

    @app.list_tools()
    async def list_tools() -> List[types.Tool]:
        # Advertise the two tools handled by call_tool and their JSON schemas.
        return [
            types.Tool(
                name="scan_project",
                description="Scan the current project directory with Trivy for vulnerabilities",
                inputSchema={
                    "type": "object",
                    "required": ["workspace"],
                    "properties": {
                        "workspace": {
                            "type": "string",
                            "description": "The workspace directory to scan",
                        }
                    },
                },
            ),
            types.Tool(
                name="fix_vulnerability",
                description="Attempt to fix a vulnerability by updating the specified package",
                inputSchema={
                    "type": "object",
                    "required": ["pkg_name", "target_version", "workspace"],
                    "properties": {
                        "workspace": {
                            "type": "string",
                            "description": "The workspace directory to modify",
                        },
                        "pkg_name": {
                            "type": "string",
                            "description": "Name of the package to update",
                        },
                        "target_version": {
                            "type": "string",
                            "description": "Version to update to",
                        },
                    },
                },
            ),
        ]

    if transport == "sse":
        import uvicorn
        from mcp.server.sse import SseServerTransport
        from starlette.applications import Starlette
        from starlette.responses import Response
        from starlette.routing import Mount, Route

        sse = SseServerTransport("/messages/")

        async def handle_sse(request):
            async with sse.connect_sse(
                request.scope, request.receive, request._send
            ) as streams:
                await app.run(
                    streams[0], streams[1], app.create_initialization_options()
                )
            # Starlette calls the endpoint's return value as a response;
            # returning None would raise once the SSE session ends.
            return Response()

        # NOTE(review): debug=True exposes tracebacks in HTTP error pages;
        # kept as in the original because the server binds to 127.0.0.1 only.
        starlette_app = Starlette(
            debug=True,
            routes=[
                Route("/sse", endpoint=handle_sse),
                Mount("/messages/", app=sse.handle_post_message),
            ],
        )
        uvicorn.run(starlette_app, host="127.0.0.1", port=port)
    else:
        from mcp.server.stdio import stdio_server

        async def run_stdio() -> None:
            # The low-level Server is driven through run() with the stream
            # pair that stdio_server() yields.
            async with stdio_server() as (read_stream, write_stream):
                await app.run(
                    read_stream, write_stream, app.create_initialization_options()
                )

        anyio.run(run_stdio)
    return 0


if __name__ == "__main__":
    main()
```