#
tokens: 5511/50000 5/5 files
lines: on (toggle) GitHub
raw markdown copy reset
# Directory Structure

```
├── .gitignore
├── .python-version
├── images
│   ├── mcp-server-config.png
│   └── mcp-trivy-demo.gif
├── LICENSE
├── README.md
├── requirements.txt
└── server.py
```

# Files

--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------

```
1 | 3.12.8
2 | 
```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
  1 | # Byte-compiled / optimized / DLL files
  2 | __pycache__/
  3 | *.py[cod]
  4 | *$py.class
  5 | 
  6 | # C extensions
  7 | *.so
  8 | 
  9 | # Distribution / packaging
 10 | .Python
 11 | build/
 12 | develop-eggs/
 13 | dist/
 14 | downloads/
 15 | eggs/
 16 | .eggs/
 17 | lib/
 18 | lib64/
 19 | parts/
 20 | sdist/
 21 | var/
 22 | wheels/
 23 | share/python-wheels/
 24 | *.egg-info/
 25 | .installed.cfg
 26 | *.egg
 27 | MANIFEST
 28 | 
 29 | # PyInstaller
 30 | #  Usually these files are written by a python script from a template
 31 | #  before PyInstaller builds the exe, so as to inject date/other infos into it.
 32 | *.manifest
 33 | *.spec
 34 | 
 35 | # Installer logs
 36 | pip-log.txt
 37 | pip-delete-this-directory.txt
 38 | 
 39 | # Unit test / coverage reports
 40 | htmlcov/
 41 | .tox/
 42 | .nox/
 43 | .coverage
 44 | .coverage.*
 45 | .cache
 46 | nosetests.xml
 47 | coverage.xml
 48 | *.cover
 49 | *.py,cover
 50 | .hypothesis/
 51 | .pytest_cache/
 52 | cover/
 53 | 
 54 | # Translations
 55 | *.mo
 56 | *.pot
 57 | 
 58 | # Django stuff:
 59 | *.log
 60 | local_settings.py
 61 | db.sqlite3
 62 | db.sqlite3-journal
 63 | 
 64 | # Flask stuff:
 65 | instance/
 66 | .webassets-cache
 67 | 
 68 | # Scrapy stuff:
 69 | .scrapy
 70 | 
 71 | # Sphinx documentation
 72 | docs/_build/
 73 | 
 74 | # PyBuilder
 75 | .pybuilder/
 76 | target/
 77 | 
 78 | # Jupyter Notebook
 79 | .ipynb_checkpoints
 80 | 
 81 | # IPython
 82 | profile_default/
 83 | ipython_config.py
 84 | 
 85 | # pyenv
 86 | #   For a library or package, you might want to ignore these files since the code is
 87 | #   intended to run in multiple environments; otherwise, check them in:
 88 | # .python-version
 89 | 
 90 | # pipenv
 91 | #   According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
 92 | #   However, in case of collaboration, if having platform-specific dependencies or dependencies
 93 | #   having no cross-platform support, pipenv may install dependencies that don't work, or not
 94 | #   install all needed dependencies.
 95 | #Pipfile.lock
 96 | 
 97 | # UV
 98 | #   Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
 99 | #   This is especially recommended for binary packages to ensure reproducibility, and is more
100 | #   commonly ignored for libraries.
101 | #uv.lock
102 | 
103 | # poetry
104 | #   Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
105 | #   This is especially recommended for binary packages to ensure reproducibility, and is more
106 | #   commonly ignored for libraries.
107 | #   https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
108 | #poetry.lock
109 | 
110 | # pdm
111 | #   Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
112 | #pdm.lock
113 | #   pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
114 | #   in version control.
115 | #   https://pdm.fming.dev/latest/usage/project/#working-with-version-control
116 | .pdm.toml
117 | .pdm-python
118 | .pdm-build/
119 | 
120 | # PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
121 | __pypackages__/
122 | 
123 | # Celery stuff
124 | celerybeat-schedule
125 | celerybeat.pid
126 | 
127 | # SageMath parsed files
128 | *.sage.py
129 | 
130 | # Environments
131 | .env
132 | .venv
133 | env/
134 | venv/
135 | ENV/
136 | env.bak/
137 | venv.bak/
138 | 
139 | # Spyder project settings
140 | .spyderproject
141 | .spyproject
142 | 
143 | # Rope project settings
144 | .ropeproject
145 | 
146 | # mkdocs documentation
147 | /site
148 | 
149 | # mypy
150 | .mypy_cache/
151 | .dmypy.json
152 | dmypy.json
153 | 
154 | # Pyre type checker
155 | .pyre/
156 | 
157 | # pytype static type analyzer
158 | .pytype/
159 | 
160 | # Cython debug symbols
161 | cython_debug/
162 | 
163 | # PyCharm
164 | #  JetBrains specific template is maintained in a separate JetBrains.gitignore that can
165 | #  be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
166 | #  and can be added to the global gitignore or merged into this file.  For a more nuclear
167 | #  option (not recommended) you can uncomment the following to ignore the entire idea folder.
168 | #.idea/
169 | 
170 | # PyPI configuration file
171 | .pypirc
172 | 
```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
  1 | # Trivy Security Scanner MCP Server
  2 | 
  3 | A Model Context Protocol (MCP) server that provides Trivy security scanning capabilities through a standardized interface.
  4 | 
  5 | > ⚠️ **Note**: This is a proof of concept project to demonstrate the integration capabilities between MCP, Cursor IDE, and Trivy. It's intended for experimentation and learning purposes only and is not production-ready. Use at your own risk.
  6 | 
  7 | ## Features
  8 | 
  9 | - 🔍 **Project Scanning**: Automatically scan your project directory for security vulnerabilities using Trivy
 10 | - 🛠️ **Automated Fixes**: Automatically update vulnerable dependencies to secure versions
 11 | - 📦 **Multi-Package Support**: Handles multiple package managers (Python, Node.js, Ruby, Go)
 12 | 
 13 | # Demo
 14 | 
 15 | ![Demo](/images/mcp-trivy-demo.gif)
 16 | 
 17 | ## Architecture
 18 | 
 19 | ```plaintext
 20 | ┌─────────────┐     ┌──────────────┐     ┌─────────────┐
 21 | │  Cursor IDE │ --> │   MCP Server │ --> │    Trivy    │
 22 | │  (Composer) │     │              │     │             │
 23 | └─────────────┘     └──────────────┘     └─────────────┘
 24 | ```
 25 | 
 26 | ## Prerequisites
 27 | 
 28 | - Python 3.12 or higher
 29 | - Trivy installed on your system:
 30 |   ```bash
 31 |   # macOS
 32 |   brew install trivy
 33 |   ```
 34 | 
 35 | ## Installation
 36 | 
 37 | ```bash
 38 | # Create and activate virtual environment
 39 | python -m venv .venv
 40 | source .venv/bin/activate
 41 | 
 42 | # Install dependencies
 43 | pip install -r requirements.txt
 44 | ```
 45 | 
 46 | ## Usage
 47 | 
 48 | Start the server using SSE transport:
 49 | 
 50 | ```bash
 51 | # Using SSE transport (default)
 52 | python server.py --transport sse --port 54321
 53 | ```
 54 | 
 55 | The server exposes two tools:
 56 | 
 57 | 1. `scan_project`: Scans a directory for security vulnerabilities
 58 |    - Required argument: `workspace` - The directory path to scan
 59 | 
 60 | 2. `fix_vulnerability`: Updates a vulnerable package to a secure version
 61 |    - Required arguments:
 62 |      - `workspace` - The directory to modify
 63 |      - `pkg_name` - Name of the package to update
 64 |      - `target_version` - Version to update to
 65 | 
 66 | ## Using with Cursor IDE
 67 | 
 68 | 1. Start the server with SSE transport:
 69 |    ```bash
 70 |    python server.py --transport sse --port 54321
 71 |    ```
 72 | 
 73 | 2. Configure in Cursor:
 74 |    - Open Settings
 75 |    - Go to Features > MCP Servers
 76 |    - Add: `http://127.0.0.1:54321/sse`
 77 | 
 78 | 3. Add the following to your .cursorrules file, create it if you don't have one yet:
 79 |    ```
 80 |    After making changes in any of the package dependency/manifest files, scan the project for security vulnerabilities.
 81 |    Fixes should only be according to the desired version reported by the scanner.
 82 |    If the scanner reports a fix unrelated to our change, ignore it.
 83 |    After performing the fix, scan the project for security vulnerabilities again.
 84 |    ```
 85 | 
 86 |     This configuration will:
 87 |     - Automatically trigger a security scan when any dependency file is modified
 88 |     - Help identify vulnerabilities as soon as new dependencies are added
 89 |     - Ensure your project stays secure throughout development
 90 |    
 91 |    if you want to use the tool manually, you can use prompt the agent to use the tool with the following prompt through the composer interface:
 92 |    ```
 93 |    Please scan my project for security vulnerabilities
 94 |    ```
 95 | 
 96 | ## Why MCP?
 97 | 
 98 | MCP (Model Context Protocol) exists to solve a fundamental problem in working with large language models (LLMs): how to efficiently and consistently connect these models to external data sources and tools.
 99 | 
100 | Learn more at [modelcontextprotocol.io](https://modelcontextprotocol.io).
101 | 
102 | ## Contributing
103 | 
104 | Contributions are welcome! Please feel free to submit a Pull Request.
105 | 
106 | ## License
107 | 
108 | MIT License
109 | 
110 | ## Acknowledgments
111 | 
112 | - [Model Context Protocol](https://modelcontextprotocol.io)
113 | - [Trivy](https://github.com/aquasecurity/trivy)
114 | - [Cursor IDE](https://cursor.sh)
```

--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------

```
1 | mcp[cli]>=1.2.0
2 | uvicorn>=0.21.1
3 | starlette>=0.27.0
4 | anyio>=3.7.1
5 | click>=8.0.0
```

--------------------------------------------------------------------------------
/server.py:
--------------------------------------------------------------------------------

```python
  1 | import click
  2 | import json
  3 | import logging
  4 | import os
  5 | import anyio
  6 | import shutil
  7 | import subprocess
  8 | import sys
  9 | from typing import Optional, List
 10 | 
 11 | import mcp.types as types
 12 | from mcp.server.lowlevel import Server
 13 | 
 14 | logging.basicConfig(
 15 |     level=logging.INFO,
 16 |     format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
 17 |     stream=sys.stdout
 18 | )
 19 | logger = logging.getLogger(__name__)
 20 | 
 21 | workspace_path: Optional[str] = None
 22 | 
 23 | async def scan_project_impl(project_dir: str) -> str:
 24 |     logger.debug("scan_project tool called")
 25 |     print(f"\nScanning directory: {project_dir}")
 26 |     
 27 |     try:
 28 |         print("Running Trivy scan... (this may take a moment)")
 29 |         result = subprocess.run(
 30 |             ["trivy", "fs", "--format", "json", project_dir],
 31 |             capture_output=True,
 32 |             text=True,
 33 |             check=True
 34 |         )
 35 |         scan_results = json.loads(result.stdout)
 36 |         
 37 |         formatted_results = []
 38 |         for result in scan_results.get("Results", []):
 39 |             if result.get("Vulnerabilities"):
 40 |                 for vuln in result.get("Vulnerabilities", []):
 41 |                     formatted_results.append({
 42 |                         "Target": result.get("Target", "Unknown"),
 43 |                         "VulnerabilityID": vuln.get("VulnerabilityID", "Unknown"),
 44 |                         "PkgName": vuln.get("PkgName", "Unknown"),
 45 |                         "InstalledVersion": vuln.get("InstalledVersion", "Unknown"),
 46 |                         "FixedVersion": vuln.get("FixedVersion", "Unknown"),
 47 |                         "Severity": vuln.get("Severity", "Unknown"),
 48 |                         "Description": vuln.get("Description", "No description available")
 49 |                     })
 50 |         
 51 |         if not formatted_results:
 52 |             return "No vulnerabilities found!"
 53 |         
 54 |         report = "Security Scan Results:\n\n"
 55 |         for vuln in formatted_results:
 56 |             report += f"Target: {vuln['Target']}\n"
 57 |             report += f"Vulnerability: {vuln['VulnerabilityID']}\n"
 58 |             report += f"Package: {vuln['PkgName']} (Current: {vuln['InstalledVersion']})\n"
 59 |             report += f"Fixed Version: {vuln['FixedVersion']}\n"
 60 |             report += f"Severity: {vuln['Severity']}\n"
 61 |             report += f"Description: {vuln['Description']}\n"
 62 |             report += "-" * 80 + "\n"
 63 |         
 64 |         return report
 65 |         
 66 |     except subprocess.CalledProcessError as e:
 67 |         error_msg = f"Error running Trivy scan: {e.stderr}"
 68 |         logger.error(error_msg)
 69 |         return error_msg
 70 |     except Exception as e:
 71 |         error_msg = f"Error: {str(e)}"
 72 |         logger.error(error_msg)
 73 |         return error_msg
 74 | 
 75 | async def fix_vulnerability_impl(pkg_name: str, target_version: str, project_dir: str) -> str:
 76 |     logger.debug(f"fix_vulnerability tool called with pkg_name={pkg_name}, target_version={target_version}")
 77 |     print(f"\nAttempting to fix vulnerability in package: {pkg_name}")
 78 |     
 79 |     package_files = []
 80 |     for root, _, files in os.walk(project_dir):
 81 |         for file in files:
 82 |             if file in ["requirements.txt", "package.json", "Gemfile", "go.mod"]:
 83 |                 package_files.append(os.path.join(root, file))
 84 |     
 85 |     if not package_files:
 86 |         return "No package manifest files found in the project"
 87 |     
 88 |     results = []
 89 |     for file_path in package_files:
 90 |         file_name = os.path.basename(file_path)
 91 |         backup_path = file_path + ".bak"
 92 |         shutil.copy2(file_path, backup_path)
 93 |         
 94 |         try:
 95 |             if file_name == "requirements.txt":
 96 |                 with open(file_path, 'r') as f:
 97 |                     lines = f.readlines()
 98 |                 with open(file_path, 'w') as f:
 99 |                     for line in lines:
100 |                         if line.strip().startswith(pkg_name):
101 |                             f.write(f"{pkg_name}=={target_version}\n")
102 |                         else:
103 |                             f.write(line)
104 |                 results.append(f"Updated {pkg_name} to version {target_version} in {file_path}")
105 |                 
106 |             elif file_name == "package.json":
107 |                 with open(file_path, 'r') as f:
108 |                     package_json = json.load(f)
109 |                 
110 |                 updated = False
111 |                 for dep_type in ["dependencies", "devDependencies"]:
112 |                     if dep_type in package_json and pkg_name in package_json[dep_type]:
113 |                         package_json[dep_type][pkg_name] = target_version
114 |                         updated = True
115 |                 
116 |                 if updated:
117 |                     with open(file_path, 'w') as f:
118 |                         json.dump(package_json, f, indent=2)
119 |                     results.append(f"Updated {pkg_name} to version {target_version} in {file_path}")
120 |                 
121 |             elif file_name == "Gemfile":
122 |                 with open(file_path, 'r') as f:
123 |                     lines = f.readlines()
124 |                 
125 |                 with open(file_path, 'w') as f:
126 |                     for line in lines:
127 |                         if f"gem '{pkg_name}'" in line or f'gem "{pkg_name}"' in line:
128 |                             f.write(f"gem '{pkg_name}', '~> {target_version}'\n")
129 |                         else:
130 |                             f.write(line)
131 |                 results.append(f"Updated {pkg_name} to version {target_version} in {file_path}")
132 |                 
133 |             elif file_name == "go.mod":
134 |                 with open(file_path, 'r') as f:
135 |                     lines = f.readlines()
136 |                 
137 |                 with open(file_path, 'w') as f:
138 |                     for line in lines:
139 |                         if line.strip().startswith(pkg_name):
140 |                             f.write(f"{pkg_name} v{target_version}\n")
141 |                         else:
142 |                             f.write(line)
143 |                 results.append(f"Updated {pkg_name} to version {target_version} in {file_path}")
144 |         
145 |         except Exception as e:
146 |             shutil.copy2(backup_path, file_path)
147 |             error_msg = f"Error updating {file_path}: {str(e)}"
148 |             logger.error(error_msg)
149 |             results.append(error_msg)
150 |         finally:
151 |             if os.path.exists(backup_path):
152 |                 os.remove(backup_path)
153 |     
154 |     if not results:
155 |         return f"Could not find {pkg_name} in any package manifest files"
156 |     
157 |     return "\n".join(results)
158 | 
159 | @click.command()
160 | @click.option("--port", default=54321, help="Port to listen on for SSE")
161 | @click.option(
162 |     "--transport",
163 |     type=click.Choice(["stdio", "sse"]),
164 |     default="sse",
165 |     help="Transport type",
166 | )
167 | def main(port: int, transport: str) -> int:
168 |     print("Starting Trivy Security Scanner MCP server...")
169 |     app = Server("trivy-security-scanner")
170 | 
171 |     @app.call_tool()
172 |     async def call_tool(name: str, arguments: dict) -> List[types.TextContent]:
173 |         logger.info(f"Tool called with name: {name}")
174 |         logger.info(f"Arguments received: {arguments}")
175 |         
176 |         workspace = arguments.get("workspace", os.getcwd())
177 |         workspace = workspace.replace("file://", "")
178 |         logger.info(f"Using workspace: {workspace}")
179 |         
180 |         if name == "scan_project":
181 |             result = await scan_project_impl(workspace)
182 |             return [types.TextContent(type="text", text=result)]
183 |         elif name == "fix_vulnerability":
184 |             if "pkg_name" not in arguments or "target_version" not in arguments:
185 |                 raise ValueError("Missing required arguments 'pkg_name' or 'target_version'")
186 |             result = await fix_vulnerability_impl(arguments["pkg_name"], arguments["target_version"], workspace)
187 |             return [types.TextContent(type="text", text=result)]
188 |         else:
189 |             raise ValueError(f"Unknown tool: {name}")
190 | 
191 |     @app.list_tools()
192 |     async def list_tools() -> List[types.Tool]:
193 |         return [
194 |             types.Tool(
195 |                 name="scan_project",
196 |                 description="Scan the current project directory with Trivy for vulnerabilities",
197 |                 inputSchema={
198 |                     "type": "object",
199 |                     "required": ["workspace"],
200 |                     "properties": {
201 |                         "workspace": {
202 |                             "type": "string",
203 |                             "description": "The workspace directory to scan",
204 |                         }
205 |                     },
206 |                 },
207 |             ),
208 |             types.Tool(
209 |                 name="fix_vulnerability",
210 |                 description="Attempt to fix a vulnerability by updating the specified package",
211 |                 inputSchema={
212 |                     "type": "object",
213 |                     "required": ["pkg_name", "target_version", "workspace"],
214 |                     "properties": {
215 |                         "workspace": {
216 |                             "type": "string",
217 |                             "description": "The workspace directory to modify",
218 |                         },
219 |                         "pkg_name": {
220 |                             "type": "string",
221 |                             "description": "Name of the package to update",
222 |                         },
223 |                         "target_version": {
224 |                             "type": "string",
225 |                             "description": "Version to update to",
226 |                         }
227 |                     },
228 |                 },
229 |             )
230 |         ]
231 | 
232 |     if transport == "sse":
233 |         from mcp.server.sse import SseServerTransport
234 |         from starlette.applications import Starlette
235 |         from starlette.routing import Mount, Route
236 | 
237 |         sse = SseServerTransport("/messages/")
238 | 
239 |         async def handle_sse(request):
240 |             async with sse.connect_sse(
241 |                 request.scope, request.receive, request._send
242 |             ) as streams:
243 |                 await app.run(
244 |                     streams[0], streams[1], app.create_initialization_options()
245 |                 )
246 | 
247 |         starlette_app = Starlette(
248 |             debug=True,
249 |             routes=[
250 |                 Route("/sse", endpoint=handle_sse),
251 |                 Mount("/messages/", app=sse.handle_post_message),
252 |             ],
253 |         )
254 | 
255 |         import uvicorn
256 |         uvicorn.run(starlette_app, host="127.0.0.1", port=port)
257 |     else:
258 |         return anyio.run(app.handle_stdio)
259 | 
260 | if __name__ == "__main__":
261 |     main()
```