# Directory Structure
```
├── env.example
├── LICENSE
├── pyproject.toml
├── README.md
├── requirements.txt
├── src
│ └── threatzone_mcp
│ ├── __init__.py
│ ├── __main__.py
│ └── server.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------
```
fastmcp>=2.0.0
httpx>=0.27.0
pydantic>=2.5.0
python-dotenv>=1.0.0
typing-extensions>=4.8.0
```
--------------------------------------------------------------------------------
/src/threatzone_mcp/__main__.py:
--------------------------------------------------------------------------------
```python
"""Make the package executable with python -m threatzone_mcp."""
# Reuse the entry point defined in the sibling server module.
from .server import main
# Start the server only when this module is executed, not when it is imported.
if __name__ == "__main__":
    main()
```
--------------------------------------------------------------------------------
/src/threatzone_mcp/__init__.py:
--------------------------------------------------------------------------------
```python
"""Threat.Zone MCP Server package."""
# Package metadata. The same version string also appears as `version` in
# pyproject.toml and in the dict built by server.get_server_config.
__version__ = "0.1.0"
__author__ = "Malwation Team"
# NOTE(review): this value looks like an e-mail obfuscation placeholder rather
# than a real address - confirm against the upstream repository.
__email__ = "[email protected]"
# Re-export the FastMCP application object and the entry-point function.
from .server import app, main
# Names exported by `from threatzone_mcp import *`.
__all__ = ["app", "main", "__version__"]
```
--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------
```toml
# Core package metadata and runtime dependencies.
[project]
name = "threatzone-mcp"
version = "0.1.0"
description = "Model Context Protocol (MCP) server for Threat.Zone API"
# NOTE(review): the email value below looks like an obfuscation placeholder,
# not a real address - confirm against the upstream repository.
authors = [
{name = "Malwation Team", email = "[email protected]"}
]
readme = "README.md"
requires-python = ">=3.10"
license = {text = "GPL-3.0-or-later"}
keywords = ["mcp", "threat-zone", "malware", "security", "analysis"]
classifiers = [
"Development Status :: 3 - Alpha",
"Intended Audience :: Developers",
"License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Topic :: Security",
"Topic :: Software Development :: Libraries :: Python Modules",
]
# Runtime requirements; the same five specifiers are listed in requirements.txt.
dependencies = [
"fastmcp>=2.0.0",
"httpx>=0.27.0",
"pydantic>=2.5.0",
"python-dotenv>=1.0.0",
"typing-extensions>=4.8.0"
]
# Optional `dev` extra: test, formatting, type-checking and pre-commit tooling.
[project.optional-dependencies]
dev = [
"pytest>=7.0.0",
"pytest-asyncio>=0.21.0",
"black>=23.0.0",
"isort>=5.12.0",
"mypy>=1.8.0",
"pre-commit>=3.0.0"
]
# Project links shown in package metadata.
[project.urls]
Homepage = "https://github.com/threat-zone/threatzonemcp"
Documentation = "https://threat.zone/docs"
Repository = "https://github.com/threat-zone/threatzonemcp"
Issues = "https://github.com/threat-zone/threatzonemcp/issues"
# Console script: the `threatzone-mcp` command calls main() in threatzone_mcp.server.
[project.scripts]
threatzone-mcp = "threatzone_mcp.server:main"
# Development dependencies for uv; the list is identical to the `dev` extra in
# [project.optional-dependencies], so the two must be edited together.
# NOTE(review): newer uv releases favour a [dependency-groups] table over this
# key - confirm against the uv version in use.
[tool.uv]
dev-dependencies = [
"pytest>=7.0.0",
"pytest-asyncio>=0.21.0",
"black>=23.0.0",
"isort>=5.12.0",
"mypy>=1.8.0",
"pre-commit>=3.0.0"
]
# Build with setuptools.
[build-system]
requires = ["setuptools>=61.0", "wheel"]
build-backend = "setuptools.build_meta"
# src layout: packages are discovered under src/ ...
[tool.setuptools.packages.find]
where = ["src"]
# ... and the root package directory is mapped to src/ as well.
[tool.setuptools.package-dir]
"" = "src"
# black formatter settings; line-length matches line_length in [tool.isort].
[tool.black]
line-length = 88
target-version = ['py310']
# Regex of files to format (.py and .pyi).
include = '\.pyi?$'
# Regex of directories to skip (VCS metadata, caches, virtualenv, build output).
exclude = '''
/(
\.eggs
| \.git
| \.hg
| \.mypy_cache
| \.tox
| \.venv
| _build
| buck-out
| build
| dist
)/
'''
# isort settings, using the black-compatible profile and the same 88-column
# limit as [tool.black].
[tool.isort]
profile = "black"
multi_line_output = 3
line_length = 88
# Treat the project's own package as first-party when grouping imports.
known_first_party = ["threatzone_mcp"]
# mypy settings; python_version matches the minimum in requires-python.
[tool.mypy]
python_version = "3.10"
warn_return_any = true
warn_unused_configs = true
# Every function must be fully annotated, including decorated ones.
disallow_untyped_defs = true
disallow_incomplete_defs = true
check_untyped_defs = true
disallow_untyped_decorators = true
no_implicit_optional = true
warn_redundant_casts = true
warn_unused_ignores = true
warn_no_return = true
warn_unreachable = true
strict_equality = true
# pytest settings.
# NOTE(review): testpaths points at tests/, but the directory listing for this
# repository shows no tests/ directory - confirm.
[tool.pytest.ini_options]
testpaths = ["tests"]
python_files = ["test_*.py", "*_test.py"]
python_classes = ["Test*"]
python_functions = ["test_*"]
addopts = "-v --tb=short"
# Option provided by pytest-asyncio, which is listed in the dev dependencies.
asyncio_mode = "auto"
```
--------------------------------------------------------------------------------
/src/threatzone_mcp/server.py:
--------------------------------------------------------------------------------
```python
"""Threat.Zone MCP Server implementation."""
import json
import os
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
import httpx
from dotenv import load_dotenv
from fastmcp import FastMCP
from pydantic import BaseModel, Field
# Load environment variables; this runs before the os.getenv() lookups below
# so they can see whatever load_dotenv() adds to the environment
load_dotenv()
# Initialize FastMCP server; the @app.tool decorators in this module register
# their functions on this instance
app = FastMCP("ThreatZone")
# Configuration
# Base URL that endpoint paths are appended to; THREATZONE_API_URL overrides
# the default
API_BASE_URL = os.getenv("THREATZONE_API_URL", "https://app.threat.zone")
# API key read from THREATZONE_API_KEY; None when the variable is not set
API_KEY = os.getenv("THREATZONE_API_KEY")
class ThreatZoneError(Exception):
    """Error raised for failed Threat.Zone API calls and invalid local input.

    Used for HTTP error responses, a missing API key, and file paths that
    cannot be submitted.
    """
class APIClient:
    """Asynchronous HTTP client for the Threat.Zone API.

    Each request opens its own short-lived ``httpx.AsyncClient``, sends the
    API key as a bearer token, and turns HTTP error statuses into
    ``ThreatZoneError``.
    """

    def __init__(self, api_key: str, base_url: str = API_BASE_URL):
        """Store the credentials and build the default request headers.

        Args:
            api_key: Token sent as ``Authorization: Bearer <api_key>``.
            base_url: Root URL that endpoint paths are appended to.
        """
        self.api_key = api_key
        # Endpoints always start with "/", so a trailing slash on the base
        # URL would yield "//public-api/..." when the two are joined.
        self.base_url = base_url.rstrip("/")
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        }

    async def get(self, endpoint: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Make GET request to API.

        Args:
            endpoint: Path appended to the base URL.
            params: Optional query-string parameters.

        Returns:
            The decoded JSON body of the response.

        Raises:
            ThreatZoneError: If the response status is 400 or above.
        """
        async with httpx.AsyncClient() as client:
            response = await client.get(
                f"{self.base_url}{endpoint}",
                headers=self.headers,
                params=params,
            )
            await self._handle_response(response)
            return response.json()

    async def post(
        self,
        endpoint: str,
        data: Optional[Dict[str, Any]] = None,
        files: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Make POST request to API.

        With ``files`` the request is sent as a form upload (``data`` becomes
        the form fields); without it ``data`` is sent as a JSON body.

        Args:
            endpoint: Path appended to the base URL.
            data: Form fields (upload) or JSON payload (no upload).
            files: Optional mapping of field name to open file object.

        Returns:
            The decoded JSON body of the response.

        Raises:
            ThreatZoneError: If the response status is 400 or above.
        """
        async with httpx.AsyncClient() as client:
            if files:
                # For file uploads, don't set Content-Type: only the
                # Authorization header is sent explicitly.
                response = await client.post(
                    f"{self.base_url}{endpoint}",
                    headers={"Authorization": f"Bearer {self.api_key}"},
                    data=data,
                    files=files,
                )
            else:
                response = await client.post(
                    f"{self.base_url}{endpoint}",
                    headers=self.headers,
                    json=data,
                )
            await self._handle_response(response)
            return response.json()

    async def download(self, endpoint: str) -> bytes:
        """Download file from API.

        Args:
            endpoint: Path appended to the base URL.

        Returns:
            The raw response body.

        Raises:
            ThreatZoneError: If the response status is 400 or above.
        """
        async with httpx.AsyncClient() as client:
            response = await client.get(
                f"{self.base_url}{endpoint}",
                headers=self.headers,
            )
            await self._handle_response(response)
            return response.content

    async def _handle_response(self, response: httpx.Response) -> None:
        """Raise ``ThreatZoneError`` for any response with an error status.

        401, 404 and 422 get fixed messages. Any other status of 400 or above
        uses the ``message`` field of a JSON object body when one is present,
        and a generic ``API error: <status>`` text otherwise. Statuses below
        400 return without raising.
        """
        status = response.status_code
        if status == 401:
            raise ThreatZoneError("Authentication failed. Check your API key.")
        if status == 404:
            raise ThreatZoneError("Resource not found.")
        if status == 422:
            raise ThreatZoneError("Invalid request parameters.")
        if status >= 400:
            fallback = f"API error: {status}"
            try:
                error_data = response.json()
            except ValueError:
                # Body is not valid JSON (decode errors derive from ValueError).
                error_msg = fallback
            else:
                # A JSON body is not guaranteed to be an object; only a dict
                # can carry a "message" field.
                if isinstance(error_data, dict):
                    error_msg = error_data.get("message", fallback)
                else:
                    error_msg = fallback
            raise ThreatZoneError(error_msg)
# Initialize API client (lazy initialization): stays None until the first
# get_client() call creates the shared instance
client: "Optional[APIClient]" = None


def get_client() -> "APIClient":
    """Get or create the API client.

    The client is created on first use and then reused for every later call.

    Returns:
        The shared APIClient instance.

    Raises:
        ThreatZoneError: If no API key is configured when the client has to
            be created.
    """
    global client
    if client is None:
        if not API_KEY:
            raise ThreatZoneError("THREATZONE_API_KEY environment variable is required")
        client = APIClient(API_KEY)
    return client
# Constants Tools
@app.tool
async def get_metafields() -> Dict[str, Any]:
    """Fetch the metafields that are available for scan configuration."""
    api = get_client()
    return await api.get("/public-api/constants/metafields")
@app.tool
async def get_levels() -> Dict[str, Any]:
    """Fetch the threat levels that analysis results use."""
    api = get_client()
    return await api.get("/public-api/constants/levels")
@app.tool
async def get_statuses() -> Dict[str, Any]:
    """Fetch the list of submission statuses."""
    api = get_client()
    return await api.get("/public-api/constants/statuses")
@app.tool
async def get_sample_metafield() -> Dict[str, Any]:
    """Fetch a sample metafield configuration for sandbox analysis."""
    api = get_client()
    return await api.get("/public-api/constants/samplemetafield")
@app.tool
async def interpret_status(status_value: int) -> str:
    """
    Translate a numeric submission status into a readable description.

    Args:
        status_value: Numeric status value (1-5)

    Returns:
        Human-readable status description, or "Unknown status: <value>" for
        a value outside the known set
    """
    descriptions = {
        1: "File received",
        2: "Submission is failed",
        3: "Submission is running",
        4: "Submission VM is ready",
        5: "Submission is finished",
    }
    try:
        return descriptions[status_value]
    except KeyError:
        return f"Unknown status: {status_value}"
@app.tool
async def interpret_threat_level(level_value: int) -> str:
    """
    Translate a numeric threat level into a readable description.

    Args:
        level_value: Numeric threat level (0-3)

    Returns:
        Human-readable threat level description, or "Unknown level: <value>"
        for a value outside the known set
    """
    descriptions = {
        0: "Unknown",
        1: "Informative",
        2: "Suspicious",
        3: "Malicious",
    }
    try:
        return descriptions[level_value]
    except KeyError:
        return f"Unknown level: {level_value}"
@app.tool
async def get_submission_status_summary(uuid: str) -> Dict[str, Any]:
    """
    Get submission details with interpreted status and threat level.

    Args:
        uuid: Submission UUID

    Returns:
        Submission details, with 'status_description' added when the
        submission has a 'status' field and 'threat_level_description'
        added when it has a 'level' field
    """
    # Depending on the installed FastMCP release, @app.tool hands back either
    # the plain coroutine function or a tool object that is not callable and
    # keeps the function on `.fn`. Resolve whichever one we got so the
    # helpers can be awaited directly.
    describe_status = getattr(interpret_status, "fn", interpret_status)
    describe_level = getattr(interpret_threat_level, "fn", interpret_threat_level)

    submission = await get_client().get(f"/public-api/get/submission/{uuid}")
    # Add interpreted values if available
    if "status" in submission:
        submission["status_description"] = await describe_status(submission["status"])
    if "level" in submission:
        submission["threat_level_description"] = await describe_level(submission["level"])
    return submission
# User Information Tools
@app.tool
async def get_user_info() -> Dict[str, Any]:
    """Fetch the current user's information, workspace details, and usage limits."""
    api = get_client()
    return await api.get("/public-api/me")
@app.tool
async def get_server_config() -> Dict[str, Any]:
    """
    Report the server configuration together with the API connection status.

    Returns:
        Configuration details: API URL, version, whether an API key is
        configured, the connection status, and either the workspace name or
        the error text from the connection test
    """
    key_configured = bool(API_KEY and len(API_KEY) > 10)
    config = {
        "api_url": API_BASE_URL,
        "version": "0.1.0",
        "api_key_configured": key_configured,
    }
    if not key_configured:
        config["connection_status"] = "no_api_key"
        return config

    # An API key is available, so test the connection with a real request.
    try:
        user_info = await get_client().get("/public-api/me")
        config["connection_status"] = "connected"
        workspace = user_info.get("userInfo", {}).get("workspace", {})
        config["workspace"] = workspace.get("name", "Unknown")
    except Exception as e:
        config["connection_status"] = "failed"
        config["error"] = str(e)
    return config
# Scanning Tools
@app.tool
async def scan_url(url: str, is_public: bool = False) -> Dict[str, Any]:
    """
    Submit a URL to be analyzed for threats and malicious content.

    Args:
        url: The URL to analyze
        is_public: Whether the scan results should be public
    """
    payload = {"url": url, "isPublic": is_public}
    api = get_client()
    return await api.post("/public-api/scan/url", data=payload)
@app.tool
async def scan_file_sandbox(
    file_path: str,
    is_public: bool = False,
    entrypoint: Optional[str] = None,
    password: Optional[str] = None,
    environment: str = "w10_x64",
    timeout: int = 180,
    work_path: str = "desktop",
    mouse_simulation: bool = True,
    https_inspection: bool = False,
    internet_connection: bool = False,
    raw_logs: bool = False,
    snapshot: bool = False,
    sleep_evasion: bool = False,
    smart_tracing: bool = False,
    dump_collector: bool = False,
    open_in_browser: bool = False,
    extension_check: bool = True,
    modules: Optional[List[str]] = None,
    auto_config: bool = False
) -> Dict[str, Any]:
    """
    Submit a file for advanced sandbox analysis with detailed configuration.

    Args:
        file_path: Path to the file to analyze
        is_public: Whether the scan results should be public (default: False)
        entrypoint: File to execute within archive (if applicable)
        password: Password for archive files (if applicable)
        environment: Analysis environment - w7_x64, w10_x64, w11_x64, macos, android, linux (default: w10_x64)
        timeout: Analysis timeout in seconds - 60, 120, 180, 240, 300 (default: 180)
        work_path: Working directory - desktop, root, %AppData%, windows, temp (default: desktop)
        mouse_simulation: Enable mouse simulation (default: True)
        https_inspection: Enable HTTPS inspection (default: False)
        internet_connection: Enable internet connection (default: False)
        raw_logs: Include raw logs (default: False)
        snapshot: Take VM snapshots (default: False)
        sleep_evasion: Enable sleep evasion techniques (default: False)
        smart_tracing: Enable smart tracing (default: False)
        dump_collector: Enable dump collection (default: False)
        open_in_browser: Open files in browser (default: False)
        extension_check: Perform extension check (default: True)
        modules: Analysis modules to use, e.g., ["csi", "cdr"] (default: None)
        auto_config: Use automatic configuration (default: False)

    Raises:
        ThreatZoneError: If file_path does not exist or is not a regular file
    """
    target = Path(file_path)
    if not target.exists():
        raise ThreatZoneError(f"File not found: {file_path}")
    if not target.is_file():
        # exists() is also true for directories, which cannot be opened for
        # upload; fail with the module's own error type instead.
        raise ThreatZoneError(f"Not a regular file: {file_path}")

    # Build the analyze configuration: one entry per metafield, in this order.
    settings: Dict[str, Any] = {
        "environment": environment,
        # The API field is "private", the inverse of is_public.
        "private": not is_public,
        "timeout": timeout,
        "work_path": work_path,
        "mouse_simulation": mouse_simulation,
        "https_inspection": https_inspection,
        "internet_connection": internet_connection,
        "raw_logs": raw_logs,
        "snapshot": snapshot,
        "sleep_evasion": sleep_evasion,
        "smart_tracing": smart_tracing,
        "dump_collector": dump_collector,
        "open_in_browser": open_in_browser,
    }
    analyze_config = [
        {"metafieldId": field_id, "value": value}
        for field_id, value in settings.items()
    ]

    # Prepare form data; every field is sent as a string.
    data: Dict[str, str] = {
        "analyzeConfig": json.dumps(analyze_config),
        "extensionCheck": str(extension_check).lower(),
    }
    if entrypoint:
        data["entrypoint"] = entrypoint
    if password:
        data["password"] = password
    if modules:
        data["modules"] = ",".join(modules)

    # Build URL with auto parameter
    url = f"/public-api/scan/sandbox?auto={str(auto_config).lower()}"

    # The context manager closes the file whether or not the upload succeeds.
    with open(file_path, "rb") as upload:
        return await get_client().post(url, data=data, files={"file": upload})
@app.tool
async def scan_file_sandbox_simple(
    file_path: str,
    is_public: bool = False,
    entrypoint: Optional[str] = None,
    password: Optional[str] = None
) -> Dict[str, Any]:
    """
    Submit a file for simple sandbox analysis using default settings.

    This is a simplified version of scan_file_sandbox with default configurations.
    Use scan_file_sandbox for advanced configuration options.

    Args:
        file_path: Path to the file to analyze
        is_public: Whether the scan results should be public (default: False)
        entrypoint: File to execute within archive (if applicable)
        password: Password for archive files (if applicable)
    """
    # Depending on the installed FastMCP release, @app.tool hands back either
    # the plain coroutine function or a tool object that is not callable and
    # keeps the function on `.fn`. Resolve whichever one we got.
    run_sandbox_scan = getattr(scan_file_sandbox, "fn", scan_file_sandbox)
    return await run_sandbox_scan(
        file_path=file_path,
        is_public=is_public,
        entrypoint=entrypoint,
        password=password,
        auto_config=True,  # Use automatic configuration for simplicity
    )
@app.tool
async def scan_file_static(
    file_path: str,
    is_public: bool = False,
    entrypoint: Optional[str] = None,
    password: Optional[str] = None
) -> Dict[str, Any]:
    """
    Submit a file for static analysis.

    Args:
        file_path: Path to the file to analyze
        is_public: Whether the scan results should be public
        entrypoint: File to execute within archive (if applicable)
        password: Password for archive files (if applicable)

    Raises:
        ThreatZoneError: If file_path does not exist or is not a regular file
    """
    target = Path(file_path)
    if not target.exists():
        raise ThreatZoneError(f"File not found: {file_path}")
    if not target.is_file():
        # exists() is also true for directories, which cannot be opened for
        # upload; fail with the module's own error type instead.
        raise ThreatZoneError(f"Not a regular file: {file_path}")

    # Form fields hold a bool plus optional strings, hence the Any value type.
    data: Dict[str, Any] = {"isPublic": is_public}
    if entrypoint:
        data["entrypoint"] = entrypoint
    if password:
        data["password"] = password

    # The context manager closes the file whether or not the upload succeeds.
    with open(file_path, "rb") as upload:
        return await get_client().post(
            "/public-api/scan/static", data=data, files={"file": upload}
        )
@app.tool
async def scan_file_cdr(
    file_path: str,
    is_public: bool = False,
    entrypoint: Optional[str] = None,
    password: Optional[str] = None
) -> Dict[str, Any]:
    """
    Submit a file for CDR (Content Disarm and Reconstruction) processing.

    Args:
        file_path: Path to the file to process
        is_public: Whether the scan results should be public
        entrypoint: File to execute within archive (if applicable)
        password: Password for archive files (if applicable)

    Raises:
        ThreatZoneError: If file_path does not exist or is not a regular file
    """
    target = Path(file_path)
    if not target.exists():
        raise ThreatZoneError(f"File not found: {file_path}")
    if not target.is_file():
        # exists() is also true for directories, which cannot be opened for
        # upload; fail with the module's own error type instead.
        raise ThreatZoneError(f"Not a regular file: {file_path}")

    # Form fields hold a bool plus optional strings, hence the Any value type.
    data: Dict[str, Any] = {"isPublic": is_public}
    if entrypoint:
        data["entrypoint"] = entrypoint
    if password:
        data["password"] = password

    # The context manager closes the file whether or not the upload succeeds.
    with open(file_path, "rb") as upload:
        return await get_client().post(
            "/public-api/scan/cdr", data=data, files={"file": upload}
        )
# Submission Retrieval Tools
@app.tool
async def get_submission(uuid: str) -> Dict[str, Any]:
    """
    Fetch the details of a submission identified by its UUID.

    Args:
        uuid: Submission UUID
    """
    endpoint = f"/public-api/get/submission/{uuid}"
    api = get_client()
    return await api.get(endpoint)
@app.tool
async def get_submission_indicators(uuid: str) -> Dict[str, Any]:
    """
    Fetch every indicator recorded for a submission.

    Args:
        uuid: Submission UUID
    """
    endpoint = f"/public-api/get/submission/{uuid}/indicators"
    api = get_client()
    return await api.get(endpoint)
@app.tool
async def get_submission_iocs(uuid: str) -> Dict[str, Any]:
    """
    Fetch every Indicator of Compromise recorded for a submission.

    Args:
        uuid: Submission UUID
    """
    endpoint = f"/public-api/get/submission/{uuid}/iocs"
    api = get_client()
    return await api.get(endpoint)
@app.tool
async def get_submission_yara_rules(uuid: str) -> Dict[str, Any]:
    """
    Fetch every YARA rule that matched for a submission.

    Args:
        uuid: Submission UUID
    """
    endpoint = f"/public-api/get/submission/{uuid}/matched-yara-rules"
    api = get_client()
    return await api.get(endpoint)
@app.tool
async def get_submission_varist_results(uuid: str) -> Dict[str, Any]:
    """
    Fetch the Varist Hybrid Analyzer results for a submission.

    Args:
        uuid: Submission UUID
    """
    endpoint = f"/public-api/get/submission/{uuid}/varist-hybrid-analyzer-results"
    api = get_client()
    return await api.get(endpoint)
@app.tool
async def get_submission_artifacts(uuid: str) -> Dict[str, Any]:
    """
    Fetch every analysis artifact recorded for a submission.

    Args:
        uuid: Submission UUID
    """
    endpoint = f"/public-api/get/submission/{uuid}/analysis-artifacts"
    api = get_client()
    return await api.get(endpoint)
@app.tool
async def get_submission_config_extractor(uuid: str) -> Dict[str, Any]:
    """
    Fetch every configuration extracted for a submission.

    Args:
        uuid: Submission UUID
    """
    endpoint = f"/public-api/get/submission/{uuid}/config-extractor-results"
    api = get_client()
    return await api.get(endpoint)
# Network Analysis Tools
@app.tool
async def get_submission_dns(uuid: str) -> Dict[str, Any]:
    """
    Fetch every DNS query recorded for a submission.

    Args:
        uuid: Submission UUID
    """
    endpoint = f"/public-api/get/submission/{uuid}/dns"
    api = get_client()
    return await api.get(endpoint)
@app.tool
async def get_submission_http(uuid: str) -> Dict[str, Any]:
    """
    Fetch every HTTP request and packet recorded for a submission.

    Args:
        uuid: Submission UUID
    """
    endpoint = f"/public-api/get/submission/{uuid}/http"
    api = get_client()
    return await api.get(endpoint)
@app.tool
async def get_submission_tcp(uuid: str) -> Dict[str, Any]:
    """
    Fetch every TCP request and packet recorded for a submission.

    Args:
        uuid: Submission UUID
    """
    endpoint = f"/public-api/get/submission/{uuid}/tcp"
    api = get_client()
    return await api.get(endpoint)
@app.tool
async def get_submission_udp(uuid: str) -> Dict[str, Any]:
    """
    Fetch every UDP request and packet recorded for a submission.

    Args:
        uuid: Submission UUID
    """
    endpoint = f"/public-api/get/submission/{uuid}/udp"
    api = get_client()
    return await api.get(endpoint)
@app.tool
async def get_submission_network_threats(uuid: str) -> Dict[str, Any]:
    """
    Fetch every network threat recorded for a submission.

    Args:
        uuid: Submission UUID
    """
    endpoint = f"/public-api/get/submission/{uuid}/threats"
    api = get_client()
    return await api.get(endpoint)
# User Submissions Tools
@app.tool
async def get_my_submissions(page: int = 1, jump: int = 10) -> Dict[str, Any]:
    """
    Fetch one page of the user's own submissions.

    Args:
        page: Page number (default: 1)
        jump: Number of items per page (default: 10)
    """
    endpoint = f"/public-api/get/my-submissions/{page}/{jump}"
    api = get_client()
    return await api.get(endpoint)
@app.tool
async def get_public_submissions(page: int = 1, jump: int = 10) -> Dict[str, Any]:
    """
    Fetch one page of public submissions.

    Args:
        page: Page number (default: 1)
        jump: Number of items per page (default: 10)
    """
    endpoint = f"/public-api/get/public-submissions/{page}/{jump}"
    api = get_client()
    return await api.get(endpoint)
@app.tool
async def search_by_hash(hash: str, page: int = 1, jump: int = 10) -> Dict[str, Any]:
    """
    Fetch one page of submissions matching a file hash (MD5, SHA1, or SHA256).

    Args:
        hash: File hash to search for
        page: Page number (default: 1)
        jump: Number of items per page (default: 10)
    """
    endpoint = f"/public-api/get/{hash}/{page}/{jump}"
    api = get_client()
    return await api.get(endpoint)
# Download Tools
@app.tool
async def download_sanitized_file(uuid: str) -> str:
    """
    Fetch the CDR-sanitized file that belongs to a submission.

    Args:
        uuid: Submission UUID

    Returns:
        Base64-encoded file content
    """
    import base64

    api = get_client()
    raw = await api.download(f"/public-api/download/cdr/{uuid}")
    # Raw bytes are base64-encoded so they can be returned as text.
    encoded = base64.b64encode(raw)
    return encoded.decode('utf-8')
@app.tool
async def download_html_report(uuid: str) -> str:
    """
    Fetch the HTML analysis report that belongs to a submission.

    Args:
        uuid: Submission UUID

    Returns:
        HTML report content
    """
    api = get_client()
    report_bytes = await api.download(f"/public-api/download/html-report/{uuid}")
    # The downloaded bytes are decoded as UTF-8 text.
    return str(report_bytes, 'utf-8')
def main() -> None:
    """Main entry point for the MCP server.

    Exits with status 1 when THREATZONE_API_KEY is not set. Otherwise it
    prints a short startup banner to stderr and runs the FastMCP application.
    """
    import sys

    if not API_KEY:
        print("Error: THREATZONE_API_KEY environment variable is required", file=sys.stderr)
        # sys.exit is always available; the bare exit() helper is injected by
        # the site module and is meant for interactive sessions.
        sys.exit(1)

    # Diagnostics go to stderr: when the server runs over a stdio transport,
    # stdout carries the protocol messages and must not contain anything else.
    print("Starting Threat.Zone MCP Server...", file=sys.stderr)
    print(f"API URL: {API_BASE_URL}", file=sys.stderr)
    print(f"API Key: {API_KEY[:8]}...", file=sys.stderr)
    app.run()


if __name__ == "__main__":
    main()
```