# Directory Structure ``` ├── env.example ├── LICENSE ├── pyproject.toml ├── README.md ├── requirements.txt ├── src │ └── threatzone_mcp │ ├── __init__.py │ ├── __main__.py │ └── server.py └── uv.lock ``` # Files -------------------------------------------------------------------------------- /requirements.txt: -------------------------------------------------------------------------------- ``` 1 | fastmcp>=2.0.0 2 | httpx>=0.27.0 3 | pydantic>=2.5.0 4 | python-dotenv>=1.0.0 5 | typing-extensions>=4.8.0 ``` -------------------------------------------------------------------------------- /src/threatzone_mcp/__main__.py: -------------------------------------------------------------------------------- ```python 1 | """Make the package executable with python -m threatzone_mcp.""" 2 | 3 | from .server import main 4 | 5 | if __name__ == "__main__": 6 | main() ``` -------------------------------------------------------------------------------- /src/threatzone_mcp/__init__.py: -------------------------------------------------------------------------------- ```python 1 | """Threat.Zone MCP Server package.""" 2 | 3 | __version__ = "0.1.0" 4 | __author__ = "Malwation Team" 5 | __email__ = "[email protected]" 6 | 7 | from .server import app, main 8 | 9 | __all__ = ["app", "main", "__version__"] ``` -------------------------------------------------------------------------------- /pyproject.toml: -------------------------------------------------------------------------------- ```toml 1 | [project] 2 | name = "threatzone-mcp" 3 | version = "0.1.0" 4 | description = "Model Context Protocol (MCP) server for Threat.Zone API" 5 | authors = [ 6 | {name = "Malwation Team", email = "[email protected]"} 7 | ] 8 | readme = "README.md" 9 | requires-python = ">=3.10" 10 | license = {text = "GPL-3.0-or-later"} 11 | keywords = ["mcp", "threat-zone", "malware", "security", "analysis"] 12 | classifiers = [ 13 | "Development Status :: 3 - Alpha", 14 | "Intended Audience :: Developers", 15 | "License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)", 16 | "Programming Language :: Python :: 3", 17 | "Programming Language :: Python :: 3.10", 18 | "Programming Language :: Python :: 3.11", 19 | "Programming Language :: Python :: 3.12", 20 | "Topic :: Security", 21 | "Topic :: Software Development :: Libraries :: Python Modules", 22 | ] 23 | 24 | dependencies = [ 25 | "fastmcp>=2.0.0", 26 | "httpx>=0.27.0", 27 | "pydantic>=2.5.0", 28 | "python-dotenv>=1.0.0", 29 | "typing-extensions>=4.8.0" 30 | ] 31 | 32 | [project.optional-dependencies] 33 | dev = [ 34 | "pytest>=7.0.0", 35 | "pytest-asyncio>=0.21.0", 36 | "black>=23.0.0", 37 | "isort>=5.12.0", 38 | "mypy>=1.8.0", 39 | "pre-commit>=3.0.0" 40 | ] 41 | 42 | [project.urls] 43 | Homepage = "https://github.com/threat-zone/threatzonemcp" 44 | Documentation = "https://threat.zone/docs" 45 | Repository = "https://github.com/threat-zone/threatzonemcp" 46 | Issues = "https://github.com/threat-zone/threatzonemcp/issues" 47 | 48 | [project.scripts] 49 | threatzone-mcp = "threatzone_mcp.server:main" 50 | 51 | [tool.uv] 52 | dev-dependencies = [ 53 | "pytest>=7.0.0", 54 | "pytest-asyncio>=0.21.0", 55 | "black>=23.0.0", 56 | "isort>=5.12.0", 57 | "mypy>=1.8.0", 58 | "pre-commit>=3.0.0" 59 | ] 60 | 61 | [build-system] 62 | requires = ["setuptools>=61.0", "wheel"] 63 | build-backend = "setuptools.build_meta" 64 | 65 | [tool.setuptools.packages.find] 66 | where = ["src"] 67 | 68 | [tool.setuptools.package-dir] 69 | "" = "src" 70 | 71 | [tool.black] 72 | line-length = 88 73 | target-version = ['py310'] 74 | include = '\.pyi?$' 75 | exclude = ''' 76 | /( 77 | \.eggs 78 | | \.git 79 | | \.hg 80 | | \.mypy_cache 81 | | \.tox 82 | | \.venv 83 | | _build 84 | | buck-out 85 | | build 86 | | dist 87 | )/ 88 | ''' 89 | 90 | [tool.isort] 91 | profile = "black" 92 | multi_line_output = 3 93 | line_length = 88 94 | known_first_party = ["threatzone_mcp"] 95 | 96 | [tool.mypy] 97 | python_version = "3.10" 98 | warn_return_any = true 99 | warn_unused_configs = true 100 | disallow_untyped_defs = true 101 | disallow_incomplete_defs = true 102 | check_untyped_defs = true 103 | disallow_untyped_decorators = true 104 | no_implicit_optional = true 105 | warn_redundant_casts = true 106 | warn_unused_ignores = true 107 | warn_no_return = true 108 | warn_unreachable = true 109 | strict_equality = true 110 | 111 | [tool.pytest.ini_options] 112 | testpaths = ["tests"] 113 | python_files = ["test_*.py", "*_test.py"] 114 | python_classes = ["Test*"] 115 | python_functions = ["test_*"] 116 | addopts = "-v --tb=short" 117 | asyncio_mode = "auto" ``` -------------------------------------------------------------------------------- /src/threatzone_mcp/server.py: -------------------------------------------------------------------------------- ```python 1 | """Threat.Zone MCP Server implementation.""" 2 | 3 | import json 4 | import os 5 | from pathlib import Path 6 | from typing import Any, Dict, List, Optional, Union 7 | 8 | import httpx 9 | from dotenv import load_dotenv 10 | from fastmcp import FastMCP 11 | from pydantic import BaseModel, Field 12 | 13 | # Load environment variables 14 | load_dotenv() 15 | 16 | # Initialize FastMCP server 17 | app = FastMCP("ThreatZone") 18 | 19 | # Configuration 20 | API_BASE_URL = os.getenv("THREATZONE_API_URL", "https://app.threat.zone") 21 | API_KEY = os.getenv("THREATZONE_API_KEY") 22 | 23 | 24 | class ThreatZoneError(Exception): 25 | """Custom exception for Threat.Zone API errors.""" 26 | pass 27 | 28 | 29 | class APIClient: 30 | """HTTP client for Threat.Zone API.""" 31 | 32 | def __init__(self, api_key: str, base_url: str = API_BASE_URL): 33 | self.api_key = api_key 34 | self.base_url = base_url 35 | self.headers = { 36 | "Authorization": f"Bearer {api_key}", 37 | "Content-Type": "application/json" 38 | } 39 | 40 | async def get(self, endpoint: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: 41 | """Make GET request to API.""" 42 | async with httpx.AsyncClient() as client: 43 | response = await client.get( 44 | f"{self.base_url}{endpoint}", 45 | headers=self.headers, 46 | params=params 47 | ) 48 | await self._handle_response(response) 49 | return response.json() 50 | 51 | async def post(self, endpoint: str, data: Optional[Dict[str, Any]] = None, files: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: 52 | """Make POST request to API.""" 53 | async with httpx.AsyncClient() as client: 54 | headers = {"Authorization": f"Bearer {self.api_key}"} 55 | if files: 56 | # For file uploads, don't set Content-Type 57 | response = await client.post( 58 | f"{self.base_url}{endpoint}", 59 | headers=headers, 60 | data=data, 61 | files=files 62 | ) 63 | else: 64 | response = await client.post( 65 | f"{self.base_url}{endpoint}", 66 | headers=self.headers, 67 | json=data 68 | ) 69 | await self._handle_response(response) 70 | return response.json() 71 | 72 | async def download(self, endpoint: str) -> bytes: 73 | """Download file from API.""" 74 | async with httpx.AsyncClient() as client: 75 | response = await client.get( 76 | f"{self.base_url}{endpoint}", 77 | headers=self.headers 78 | ) 79 | await self._handle_response(response) 80 | return response.content 81 | 82 | async def _handle_response(self, response: httpx.Response) -> None: 83 | """Handle API response errors.""" 84 | if response.status_code == 401: 85 | raise ThreatZoneError("Authentication failed. Check your API key.") 86 | elif response.status_code == 404: 87 | raise ThreatZoneError("Resource not found.") 88 | elif response.status_code == 422: 89 | raise ThreatZoneError("Invalid request parameters.") 90 | elif response.status_code >= 400: 91 | try: 92 | error_data = response.json() 93 | error_msg = error_data.get("message", f"API error: {response.status_code}") 94 | except: 95 | error_msg = f"API error: {response.status_code}" 96 | raise ThreatZoneError(error_msg) 97 | 98 | 99 | # Initialize API client (lazy initialization) 100 | client = None 101 | 102 | def get_client(): 103 | """Get or create the API client.""" 104 | global client 105 | if client is None: 106 | if not API_KEY: 107 | raise ThreatZoneError("THREATZONE_API_KEY environment variable is required") 108 | client = APIClient(API_KEY) 109 | return client 110 | 111 | 112 | # Constants Tools 113 | @app.tool 114 | async def get_metafields() -> Dict[str, Any]: 115 | """Get available metafields for scan configuration.""" 116 | return await get_client().get("/public-api/constants/metafields") 117 | 118 | 119 | @app.tool 120 | async def get_levels() -> Dict[str, Any]: 121 | """Get threat levels used in analysis results.""" 122 | return await get_client().get("/public-api/constants/levels") 123 | 124 | 125 | @app.tool 126 | async def get_statuses() -> Dict[str, Any]: 127 | """Get submission statuses.""" 128 | return await get_client().get("/public-api/constants/statuses") 129 | 130 | 131 | @app.tool 132 | async def get_sample_metafield() -> Dict[str, Any]: 133 | """Get sample metafield configuration for sandbox analysis.""" 134 | return await get_client().get("/public-api/constants/samplemetafield") 135 | 136 | 137 | @app.tool 138 | async def interpret_status(status_value: int) -> str: 139 | """ 140 | Interpret a numeric status value from submission results. 141 | 142 | Args: 143 | status_value: Numeric status value (1-5) 144 | 145 | Returns: 146 | Human-readable status description 147 | """ 148 | status_map = { 149 | 1: "File received", 150 | 2: "Submission is failed", 151 | 3: "Submission is running", 152 | 4: "Submission VM is ready", 153 | 5: "Submission is finished" 154 | } 155 | return status_map.get(status_value, f"Unknown status: {status_value}") 156 | 157 | 158 | @app.tool 159 | async def interpret_threat_level(level_value: int) -> str: 160 | """ 161 | Interpret a numeric threat level value from analysis results. 162 | 163 | Args: 164 | level_value: Numeric threat level (0-3) 165 | 166 | Returns: 167 | Human-readable threat level description 168 | """ 169 | level_map = { 170 | 0: "Unknown", 171 | 1: "Informative", 172 | 2: "Suspicious", 173 | 3: "Malicious" 174 | } 175 | return level_map.get(level_value, f"Unknown level: {level_value}") 176 | 177 | 178 | @app.tool 179 | async def get_submission_status_summary(uuid: str) -> Dict[str, Any]: 180 | """ 181 | Get submission details with interpreted status and threat level. 182 | 183 | Args: 184 | uuid: Submission UUID 185 | 186 | Returns: 187 | Submission details with human-readable status and threat level 188 | """ 189 | submission = await get_client().get(f"/public-api/get/submission/{uuid}") 190 | 191 | # Add interpreted values if available 192 | if 'status' in submission: 193 | submission['status_description'] = await interpret_status(submission['status']) 194 | 195 | if 'level' in submission: 196 | submission['threat_level_description'] = await interpret_threat_level(submission['level']) 197 | 198 | return submission 199 | 200 | 201 | # User Information Tools 202 | @app.tool 203 | async def get_user_info() -> Dict[str, Any]: 204 | """Get current user information, workspace details, and usage limits.""" 205 | return await get_client().get("/public-api/me") 206 | 207 | 208 | @app.tool 209 | async def get_server_config() -> Dict[str, Any]: 210 | """ 211 | Get current server configuration including API URL and connection status. 212 | 213 | Returns: 214 | Configuration details including API URL, version, and status 215 | """ 216 | config = { 217 | "api_url": API_BASE_URL, 218 | "version": "0.1.0", 219 | "api_key_configured": bool(API_KEY and len(API_KEY) > 10) 220 | } 221 | 222 | # Test connection if API key is available 223 | if config["api_key_configured"]: 224 | try: 225 | user_info = await get_client().get("/public-api/me") 226 | config["connection_status"] = "connected" 227 | config["workspace"] = user_info.get("userInfo", {}).get("workspace", {}).get("name", "Unknown") 228 | except Exception as e: 229 | config["connection_status"] = "failed" 230 | config["error"] = str(e) 231 | else: 232 | config["connection_status"] = "no_api_key" 233 | 234 | return config 235 | 236 | 237 | # Scanning Tools 238 | @app.tool 239 | async def scan_url(url: str, is_public: bool = False) -> Dict[str, Any]: 240 | """ 241 | Analyze a URL for threats and malicious content. 242 | 243 | Args: 244 | url: The URL to analyze 245 | is_public: Whether the scan results should be public 246 | """ 247 | data = { 248 | "url": url, 249 | "isPublic": is_public 250 | } 251 | return await get_client().post("/public-api/scan/url", data=data) 252 | 253 | 254 | @app.tool 255 | async def scan_file_sandbox( 256 | file_path: str, 257 | is_public: bool = False, 258 | entrypoint: Optional[str] = None, 259 | password: Optional[str] = None, 260 | environment: str = "w10_x64", 261 | timeout: int = 180, 262 | work_path: str = "desktop", 263 | mouse_simulation: bool = True, 264 | https_inspection: bool = False, 265 | internet_connection: bool = False, 266 | raw_logs: bool = False, 267 | snapshot: bool = False, 268 | sleep_evasion: bool = False, 269 | smart_tracing: bool = False, 270 | dump_collector: bool = False, 271 | open_in_browser: bool = False, 272 | extension_check: bool = True, 273 | modules: Optional[List[str]] = None, 274 | auto_config: bool = False 275 | ) -> Dict[str, Any]: 276 | """ 277 | Submit a file for advanced sandbox analysis with detailed configuration. 278 | 279 | Args: 280 | file_path: Path to the file to analyze 281 | is_public: Whether the scan results should be public (default: False) 282 | entrypoint: File to execute within archive (if applicable) 283 | password: Password for archive files (if applicable) 284 | environment: Analysis environment - w7_x64, w10_x64, w11_x64, macos, android, linux (default: w10_x64) 285 | timeout: Analysis timeout in seconds - 60, 120, 180, 240, 300 (default: 180) 286 | work_path: Working directory - desktop, root, %AppData%, windows, temp (default: desktop) 287 | mouse_simulation: Enable mouse simulation (default: True) 288 | https_inspection: Enable HTTPS inspection (default: False) 289 | internet_connection: Enable internet connection (default: False) 290 | raw_logs: Include raw logs (default: False) 291 | snapshot: Take VM snapshots (default: False) 292 | sleep_evasion: Enable sleep evasion techniques (default: False) 293 | smart_tracing: Enable smart tracing (default: False) 294 | dump_collector: Enable dump collection (default: False) 295 | open_in_browser: Open files in browser (default: False) 296 | extension_check: Perform extension check (default: True) 297 | modules: Analysis modules to use, e.g., ["csi", "cdr"] (default: None) 298 | auto_config: Use automatic configuration (default: False) 299 | """ 300 | if not Path(file_path).exists(): 301 | raise ThreatZoneError(f"File not found: {file_path}") 302 | 303 | # Build the analyze configuration 304 | analyze_config = [ 305 | {"metafieldId": "environment", "value": environment}, 306 | {"metafieldId": "private", "value": not is_public}, 307 | {"metafieldId": "timeout", "value": timeout}, 308 | {"metafieldId": "work_path", "value": work_path}, 309 | {"metafieldId": "mouse_simulation", "value": mouse_simulation}, 310 | {"metafieldId": "https_inspection", "value": https_inspection}, 311 | {"metafieldId": "internet_connection", "value": internet_connection}, 312 | {"metafieldId": "raw_logs", "value": raw_logs}, 313 | {"metafieldId": "snapshot", "value": snapshot}, 314 | {"metafieldId": "sleep_evasion", "value": sleep_evasion}, 315 | {"metafieldId": "smart_tracing", "value": smart_tracing}, 316 | {"metafieldId": "dump_collector", "value": dump_collector}, 317 | {"metafieldId": "open_in_browser", "value": open_in_browser} 318 | ] 319 | 320 | # Prepare form data 321 | data = { 322 | "analyzeConfig": json.dumps(analyze_config), 323 | "extensionCheck": str(extension_check).lower() 324 | } 325 | 326 | if entrypoint: 327 | data["entrypoint"] = entrypoint 328 | if password: 329 | data["password"] = password 330 | if modules: 331 | data["modules"] = ",".join(modules) 332 | 333 | # Build URL with auto parameter 334 | url = f"/public-api/scan/sandbox?auto={str(auto_config).lower()}" 335 | 336 | files = {"file": open(file_path, "rb")} 337 | try: 338 | return await get_client().post(url, data=data, files=files) 339 | finally: 340 | files["file"].close() 341 | 342 | 343 | @app.tool 344 | async def scan_file_sandbox_simple( 345 | file_path: str, 346 | is_public: bool = False, 347 | entrypoint: Optional[str] = None, 348 | password: Optional[str] = None 349 | ) -> Dict[str, Any]: 350 | """ 351 | Submit a file for simple sandbox analysis using default settings. 352 | 353 | This is a simplified version of scan_file_sandbox with default configurations. 354 | Use scan_file_sandbox for advanced configuration options. 355 | 356 | Args: 357 | file_path: Path to the file to analyze 358 | is_public: Whether the scan results should be public (default: False) 359 | entrypoint: File to execute within archive (if applicable) 360 | password: Password for archive files (if applicable) 361 | """ 362 | return await scan_file_sandbox( 363 | file_path=file_path, 364 | is_public=is_public, 365 | entrypoint=entrypoint, 366 | password=password, 367 | auto_config=True # Use automatic configuration for simplicity 368 | ) 369 | 370 | 371 | @app.tool 372 | async def scan_file_static( 373 | file_path: str, 374 | is_public: bool = False, 375 | entrypoint: Optional[str] = None, 376 | password: Optional[str] = None 377 | ) -> Dict[str, Any]: 378 | """ 379 | Submit a file for static analysis. 380 | 381 | Args: 382 | file_path: Path to the file to analyze 383 | is_public: Whether the scan results should be public 384 | entrypoint: File to execute within archive (if applicable) 385 | password: Password for archive files (if applicable) 386 | """ 387 | if not Path(file_path).exists(): 388 | raise ThreatZoneError(f"File not found: {file_path}") 389 | 390 | data = {"isPublic": is_public} 391 | if entrypoint: 392 | data["entrypoint"] = entrypoint 393 | if password: 394 | data["password"] = password 395 | 396 | files = {"file": open(file_path, "rb")} 397 | try: 398 | return await get_client().post("/public-api/scan/static", data=data, files=files) 399 | finally: 400 | files["file"].close() 401 | 402 | 403 | @app.tool 404 | async def scan_file_cdr( 405 | file_path: str, 406 | is_public: bool = False, 407 | entrypoint: Optional[str] = None, 408 | password: Optional[str] = None 409 | ) -> Dict[str, Any]: 410 | """ 411 | Submit a file for CDR (Content Disarm and Reconstruction) processing. 412 | 413 | Args: 414 | file_path: Path to the file to process 415 | is_public: Whether the scan results should be public 416 | entrypoint: File to execute within archive (if applicable) 417 | password: Password for archive files (if applicable) 418 | """ 419 | if not Path(file_path).exists(): 420 | raise ThreatZoneError(f"File not found: {file_path}") 421 | 422 | data = {"isPublic": is_public} 423 | if entrypoint: 424 | data["entrypoint"] = entrypoint 425 | if password: 426 | data["password"] = password 427 | 428 | files = {"file": open(file_path, "rb")} 429 | try: 430 | return await get_client().post("/public-api/scan/cdr", data=data, files=files) 431 | finally: 432 | files["file"].close() 433 | 434 | 435 | # Submission Retrieval Tools 436 | @app.tool 437 | async def get_submission(uuid: str) -> Dict[str, Any]: 438 | """ 439 | Get submission details by UUID. 440 | 441 | Args: 442 | uuid: Submission UUID 443 | """ 444 | return await get_client().get(f"/public-api/get/submission/{uuid}") 445 | 446 | 447 | @app.tool 448 | async def get_submission_indicators(uuid: str) -> Dict[str, Any]: 449 | """ 450 | Get all indicators for a specific submission. 451 | 452 | Args: 453 | uuid: Submission UUID 454 | """ 455 | return await get_client().get(f"/public-api/get/submission/{uuid}/indicators") 456 | 457 | 458 | @app.tool 459 | async def get_submission_iocs(uuid: str) -> Dict[str, Any]: 460 | """ 461 | Get all Indicators of Compromise for a specific submission. 462 | 463 | Args: 464 | uuid: Submission UUID 465 | """ 466 | return await get_client().get(f"/public-api/get/submission/{uuid}/iocs") 467 | 468 | 469 | @app.tool 470 | async def get_submission_yara_rules(uuid: str) -> Dict[str, Any]: 471 | """ 472 | Get all matched YARA rules for a specific submission. 473 | 474 | Args: 475 | uuid: Submission UUID 476 | """ 477 | return await get_client().get(f"/public-api/get/submission/{uuid}/matched-yara-rules") 478 | 479 | 480 | @app.tool 481 | async def get_submission_varist_results(uuid: str) -> Dict[str, Any]: 482 | """ 483 | Get Varist Hybrid Analyzer results for a specific submission. 484 | 485 | Args: 486 | uuid: Submission UUID 487 | """ 488 | return await get_client().get(f"/public-api/get/submission/{uuid}/varist-hybrid-analyzer-results") 489 | 490 | 491 | @app.tool 492 | async def get_submission_artifacts(uuid: str) -> Dict[str, Any]: 493 | """ 494 | Get all artifacts for a specific submission. 495 | 496 | Args: 497 | uuid: Submission UUID 498 | """ 499 | return await get_client().get(f"/public-api/get/submission/{uuid}/analysis-artifacts") 500 | 501 | 502 | @app.tool 503 | async def get_submission_config_extractor(uuid: str) -> Dict[str, Any]: 504 | """ 505 | Get all extracted configurations for a specific submission. 506 | 507 | Args: 508 | uuid: Submission UUID 509 | """ 510 | return await get_client().get(f"/public-api/get/submission/{uuid}/config-extractor-results") 511 | 512 | 513 | # Network Analysis Tools 514 | @app.tool 515 | async def get_submission_dns(uuid: str) -> Dict[str, Any]: 516 | """ 517 | Get all DNS queries for a specific submission. 518 | 519 | Args: 520 | uuid: Submission UUID 521 | """ 522 | return await get_client().get(f"/public-api/get/submission/{uuid}/dns") 523 | 524 | 525 | @app.tool 526 | async def get_submission_http(uuid: str) -> Dict[str, Any]: 527 | """ 528 | Get all HTTP requests and packets for a specific submission. 529 | 530 | Args: 531 | uuid: Submission UUID 532 | """ 533 | return await get_client().get(f"/public-api/get/submission/{uuid}/http") 534 | 535 | 536 | @app.tool 537 | async def get_submission_tcp(uuid: str) -> Dict[str, Any]: 538 | """ 539 | Get all TCP requests and packets for a specific submission. 540 | 541 | Args: 542 | uuid: Submission UUID 543 | """ 544 | return await get_client().get(f"/public-api/get/submission/{uuid}/tcp") 545 | 546 | 547 | @app.tool 548 | async def get_submission_udp(uuid: str) -> Dict[str, Any]: 549 | """ 550 | Get all UDP requests and packets for a specific submission. 551 | 552 | Args: 553 | uuid: Submission UUID 554 | """ 555 | return await get_client().get(f"/public-api/get/submission/{uuid}/udp") 556 | 557 | 558 | @app.tool 559 | async def get_submission_network_threats(uuid: str) -> Dict[str, Any]: 560 | """ 561 | Get all network threats for a specific submission. 562 | 563 | Args: 564 | uuid: Submission UUID 565 | """ 566 | return await get_client().get(f"/public-api/get/submission/{uuid}/threats") 567 | 568 | 569 | # User Submissions Tools 570 | @app.tool 571 | async def get_my_submissions(page: int = 1, jump: int = 10) -> Dict[str, Any]: 572 | """ 573 | Get user's submissions with pagination. 574 | 575 | Args: 576 | page: Page number (default: 1) 577 | jump: Number of items per page (default: 10) 578 | """ 579 | return await get_client().get(f"/public-api/get/my-submissions/{page}/{jump}") 580 | 581 | 582 | @app.tool 583 | async def get_public_submissions(page: int = 1, jump: int = 10) -> Dict[str, Any]: 584 | """ 585 | Get public submissions with pagination. 586 | 587 | Args: 588 | page: Page number (default: 1) 589 | jump: Number of items per page (default: 10) 590 | """ 591 | return await get_client().get(f"/public-api/get/public-submissions/{page}/{jump}") 592 | 593 | 594 | @app.tool 595 | async def search_by_hash(hash: str, page: int = 1, jump: int = 10) -> Dict[str, Any]: 596 | """ 597 | Search submissions by file hash (MD5, SHA1, or SHA256). 598 | 599 | Args: 600 | hash: File hash to search for 601 | page: Page number (default: 1) 602 | jump: Number of items per page (default: 10) 603 | """ 604 | return await get_client().get(f"/public-api/get/{hash}/{page}/{jump}") 605 | 606 | 607 | # Download Tools 608 | @app.tool 609 | async def download_sanitized_file(uuid: str) -> str: 610 | """ 611 | Download the CDR-sanitized file for a given submission UUID. 612 | 613 | Args: 614 | uuid: Submission UUID 615 | 616 | Returns: 617 | Base64-encoded file content 618 | """ 619 | import base64 620 | content = await get_client().download(f"/public-api/download/cdr/{uuid}") 621 | return base64.b64encode(content).decode('utf-8') 622 | 623 | 624 | @app.tool 625 | async def download_html_report(uuid: str) -> str: 626 | """ 627 | Download HTML analysis report for a submission. 628 | 629 | Args: 630 | uuid: Submission UUID 631 | 632 | Returns: 633 | HTML report content 634 | """ 635 | content = await get_client().download(f"/public-api/download/html-report/{uuid}") 636 | return content.decode('utf-8') 637 | 638 | 639 | def main() -> None: 640 | """Main entry point for the MCP server.""" 641 | if not API_KEY: 642 | print("Error: THREATZONE_API_KEY environment variable is required") 643 | exit(1) 644 | 645 | print("Starting Threat.Zone MCP Server...") 646 | print(f"API URL: {API_BASE_URL}") 647 | print(f"API Key: {API_KEY[:8]}...") 648 | app.run() 649 | 650 | 651 | if __name__ == "__main__": 652 | main() ```