# Directory Structure ``` ├── main.py ├── README.md ├── requirements.txt ├── src │ ├── __init__.py │ ├── api │ │ ├── __init__.py │ │ ├── routes.py │ │ └── schemas.py │ ├── binary_reader │ │ ├── __init__.py │ │ ├── base_reader.py │ │ ├── unreal_reader.py │ │ └── utils.py │ └── config.py └── tests ├── __init__.py ├── test_api.py └── test_binary_reader.py ``` # Files -------------------------------------------------------------------------------- /README.md: -------------------------------------------------------------------------------- ```markdown # Binary Reader MCP A Model Context Protocol server for reading and analyzing binary files. This server provides tools for reading and analyzing various binary file formats, with initial support for Unreal Engine asset files (.uasset). ## Features - Read and analyze Unreal Engine .uasset files - Extract binary file metadata and structure - Auto-detect file formats - Extensible architecture for adding new binary format support ## Installation 1. Clone the repository: ```bash git clone https://github.com/berlinbra/binary-reader-mcp.git cd binary-reader-mcp ``` 2. Create a virtual environment and activate it: ```bash python -m venv venv source venv/bin/activate # On Windows: venv\Scripts\activate ``` 3. Install dependencies: ```bash pip install -r requirements.txt ``` ## Usage The server provides several tools through the Model Context Protocol: ### 1. Read Unreal Asset Files ```python # Example usage through MCP tool: read-unreal-asset arguments: file_path: "path/to/your/asset.uasset" ``` ### 2. Read Generic Binary Files ```python # Example usage through MCP tool: read-binary-metadata arguments: file_path: "path/to/your/file.bin" format: "auto" # or "unreal", "custom" ``` ## Development ### Project Structure ``` binary-reader-mcp/ ├── README.md ├── requirements.txt ├── main.py ├── src/ │ ├── __init__.py │ ├── binary_reader/ │ │ ├── __init__.py │ │ ├── base_reader.py │ │ ├── unreal_reader.py │ │ └── utils.py │ ├── api/ │ │ ├── __init__.py │ │ ├── routes.py │ │ └── schemas.py │ └── config.py └── tests/ ├── __init__.py ├── test_binary_reader.py └── test_api.py ``` ### Adding New Binary Format Support To add support for a new binary format: 1. Create a new reader class that inherits from `BinaryReader` 2. Implement the required methods (`read_header`, `read_metadata`) 3. Add the new format to the format auto-detection logic 4. Update the tools list to include the new format ## Contributing 1. Fork the repository 2. Create your feature branch (`git checkout -b feature/amazing-feature`) 3. Commit your changes (`git commit -m 'Add some amazing feature'`) 4. Push to the branch (`git push origin feature/amazing-feature`) 5. Open a Pull Request ## License This project is licensed under the MIT License - see the LICENSE file for details. ``` -------------------------------------------------------------------------------- /src/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /src/api/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /tests/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /requirements.txt: -------------------------------------------------------------------------------- ``` mcp-core>=0.1.0 fastapi>=0.68.0 uvicorn>=0.15.0 httpx>=0.24.0 python-dotenv>=0.19.0 structlog>=21.1.0 typing-extensions>=4.0.0 ``` -------------------------------------------------------------------------------- /src/binary_reader/__init__.py: -------------------------------------------------------------------------------- ```python from .base_reader import BinaryReader from .unreal_reader import UnrealAssetReader __all__ = ['BinaryReader', 'UnrealAssetReader'] ``` -------------------------------------------------------------------------------- /src/api/schemas.py: -------------------------------------------------------------------------------- ```python from pydantic import BaseModel from typing import Dict, Any, Optional class BinaryAnalysisRequest(BaseModel): file_path: str format: Optional[str] = "auto" class BinaryAnalysisResponse(BaseModel): file_path: str header: Dict[str, Any] metadata: Dict[str, Any] format: str ``` -------------------------------------------------------------------------------- /src/config.py: -------------------------------------------------------------------------------- ```python import os from typing import Dict, Any class Config: DEBUG: bool = os.getenv("DEBUG", "False").lower() == "true" MAX_FILE_SIZE: int = 1024 * 1024 * 100 # 100MB SUPPORTED_FORMATS: Dict[str, Any] = { "unreal": { "extensions": [".uasset"], "magic": bytes.fromhex('C1832A9E') } } ``` -------------------------------------------------------------------------------- /src/binary_reader/utils.py: -------------------------------------------------------------------------------- ```python from typing import BinaryIO, Dict, Any import os def get_file_size(file_path: str) -> int: """Get the size of a file in bytes.""" return os.path.getsize(file_path) def read_file_header(file: BinaryIO, size: int = 16) -> bytes: """Read the first n bytes of a file.""" current_pos = file.tell() file.seek(0) header = file.read(size) file.seek(current_pos) return header def detect_file_format(header: bytes) -> str: """Detect file format based on magic numbers/headers.""" if len(header) < 4: return "unknown" # Unreal Engine .uasset if header[:4] == bytes.fromhex('C1832A9E'): return "unreal" # Add more format detection here return "unknown" ``` -------------------------------------------------------------------------------- /src/api/routes.py: -------------------------------------------------------------------------------- ```python from fastapi import APIRouter, HTTPException from .schemas import BinaryAnalysisRequest, BinaryAnalysisResponse from ..binary_reader import UnrealAssetReader router = APIRouter() @router.post("/analyze", response_model=BinaryAnalysisResponse) async def analyze_binary_file(request: BinaryAnalysisRequest): try: with UnrealAssetReader(request.file_path) as reader: header = reader.read_header() metadata = reader.read_metadata() return BinaryAnalysisResponse( file_path=request.file_path, header=header, metadata=metadata, format="unreal" ) except Exception as e: raise HTTPException(status_code=400, detail=str(e)) ``` -------------------------------------------------------------------------------- /tests/test_api.py: -------------------------------------------------------------------------------- ```python import pytest from fastapi.testclient import TestClient from src.api.routes import router from fastapi import FastAPI app = FastAPI() app.include_router(router) @pytest.fixture def client(): return TestClient(app) def test_analyze_endpoint_missing_file(client): response = client.post("/analyze", json={"file_path": "nonexistent.uasset"}) assert response.status_code == 400 assert "File not found" in response.json()["detail"] def test_analyze_endpoint_invalid_request(client): response = client.post("/analyze", json={}) assert response.status_code == 422 # Validation error def test_analyze_endpoint_with_mock_file(client, tmp_path): # Create a mock .uasset file file_path = tmp_path / "test.uasset" with open(file_path, "wb") as f: f.write(bytes.fromhex('C1832A9E')) f.write(b"\x00" * 28) # Padding for header and metadata response = client.post("/analyze", json={"file_path": str(file_path)}) assert response.status_code == 200 data = response.json() assert data["format"] == "unreal" assert "header" in data assert "metadata" in data ``` -------------------------------------------------------------------------------- /src/binary_reader/unreal_reader.py: -------------------------------------------------------------------------------- ```python from typing import Dict, Any from .base_reader import BinaryReader class UnrealAssetReader(BinaryReader): MAGIC = 0xC1832A9E def read_header(self) -> Dict[str, Any]: magic = self.read_uint32() if magic != self.MAGIC: raise ValueError("Not a valid .uasset file") return { 'magic': hex(magic), 'legacy_version': self.read_uint32(), 'legacy_ue3_version': self.read_uint32(), 'file_version_ue4': self.read_uint32(), 'file_size': self.read_uint32() } def read_metadata(self) -> Dict[str, Any]: # Read bulk data flags flags = self.read_uint32() # Read element count element_count = self.read_uint32() # Read bulk data size bulk_data_size = self.read_uint32() return { 'flags': flags, 'element_count': element_count, 'bulk_data_size': bulk_data_size } def read_name_table(self) -> list[str]: names_count = self.read_int32() names = [] for _ in range(names_count): name = self.read_string() names.append(name) return names ``` -------------------------------------------------------------------------------- /src/binary_reader/base_reader.py: -------------------------------------------------------------------------------- ```python from abc import ABC, abstractmethod from typing import BinaryIO, Dict, Any import struct class BinaryReader(ABC): def __init__(self, file_path: str): self.file_path = file_path self._file: BinaryIO | None = None self._position = 0 def __enter__(self): self._file = open(self.file_path, 'rb') return self def __exit__(self, exc_type, exc_val, exc_tb): if self._file: self._file.close() def read_bytes(self, size: int) -> bytes: if not self._file: raise ValueError("File is not open") data = self._file.read(size) self._position += len(data) return data def read_uint32(self) -> int: return struct.unpack('<I', self.read_bytes(4))[0] def read_int32(self) -> int: return struct.unpack('<i', self.read_bytes(4))[0] def read_float(self) -> float: return struct.unpack('<f', self.read_bytes(4))[0] def read_string(self, encoding='utf-8') -> str: length = self.read_int32() if length == 0: return "" elif length < 0: length = abs(length) data = self.read_bytes(length * 2) return data.decode('utf-16-le').rstrip('\0') else: data = self.read_bytes(length) return data.decode(encoding).rstrip('\0') def seek(self, position: int) -> None: if not self._file: raise ValueError("File is not open") self._file.seek(position) self._position = position @property def position(self) -> int: return self._position @abstractmethod def read_header(self) -> Dict[str, Any]: pass @abstractmethod def read_metadata(self) -> Dict[str, Any]: pass ``` -------------------------------------------------------------------------------- /tests/test_binary_reader.py: -------------------------------------------------------------------------------- ```python import pytest import os from src.binary_reader import UnrealAssetReader, BinaryReader @pytest.fixture def test_unreal_asset_file(tmp_path): # Create a mock .uasset file for testing file_path = tmp_path / "test.uasset" with open(file_path, "wb") as f: # Write magic number f.write(bytes.fromhex('C1832A9E')) # Write mock version numbers f.write(int(1).to_bytes(4, 'little')) # legacy_version f.write(int(2).to_bytes(4, 'little')) # legacy_ue3_version f.write(int(3).to_bytes(4, 'little')) # file_version_ue4 f.write(int(100).to_bytes(4, 'little')) # file_size # Write mock metadata f.write(int(1).to_bytes(4, 'little')) # flags f.write(int(10).to_bytes(4, 'little')) # element_count f.write(int(1000).to_bytes(4, 'little')) # bulk_data_size return file_path def test_unreal_asset_reader_header(test_unreal_asset_file): with UnrealAssetReader(test_unreal_asset_file) as reader: header = reader.read_header() assert header['magic'] == '0xc1832a9e' assert header['legacy_version'] == 1 assert header['legacy_ue3_version'] == 2 assert header['file_version_ue4'] == 3 assert header['file_size'] == 100 def test_unreal_asset_reader_metadata(test_unreal_asset_file): with UnrealAssetReader(test_unreal_asset_file) as reader: # Skip header reader.read_header() metadata = reader.read_metadata() assert metadata['flags'] == 1 assert metadata['element_count'] == 10 assert metadata['bulk_data_size'] == 1000 def test_invalid_unreal_asset_file(tmp_path): file_path = tmp_path / "invalid.uasset" with open(file_path, "wb") as f: f.write(b"invalid data") with pytest.raises(ValueError, match="Not a valid .uasset file"): with UnrealAssetReader(file_path) as reader: reader.read_header() ``` -------------------------------------------------------------------------------- /main.py: -------------------------------------------------------------------------------- ```python from typing import Any import asyncio from mcp.server.models import InitializationOptions import mcp.types as types from mcp.server import NotificationOptions, Server import mcp.server.stdio import os from src.binary_reader.unreal_reader import UnrealAssetReader from src.binary_reader.base_reader import BinaryReader server = Server("binary_reader") @server.list_tools() async def handle_list_tools() -> list[types.Tool]: """ List available tools for binary file analysis. Each tool specifies its arguments using JSON Schema validation. """ return [ types.Tool( name="read-unreal-asset", description="Read and analyze Unreal Engine .uasset file structure", inputSchema={ "type": "object", "properties": { "file_path": { "type": "string", "description": "Path to the .uasset file", }, }, "required": ["file_path"], }, ), types.Tool( name="read-binary-metadata", description="Read generic binary file metadata and structure", inputSchema={ "type": "object", "properties": { "file_path": { "type": "string", "description": "Path to the binary file", }, "format": { "type": "string", "description": "File format specification (if known)", "enum": ["auto", "unreal", "custom"], "default": "auto" } }, "required": ["file_path"], }, ) ] def format_unreal_asset_data(data: dict) -> str: """Format Unreal asset data into a readable string.""" try: return ( f"Unreal Asset Analysis:\n\n" f"Header Information:\n" f"Magic: {data.get('header', {}).get('magic', 'N/A')}\n" f"Legacy Version: {data.get('header', {}).get('legacy_version', 'N/A')}\n" f"UE4 Version: {data.get('header', {}).get('file_version_ue4', 'N/A')}\n" f"File Size: {data.get('header', {}).get('file_size', 'N/A')} bytes\n\n" f"Metadata:\n" f"Flags: {data.get('metadata', {}).get('flags', 'N/A')}\n" f"Element Count: {data.get('metadata', {}).get('element_count', 'N/A')}\n" f"Bulk Data Size: {data.get('metadata', {}).get('bulk_data_size', 'N/A')} bytes\n" "---" ) except Exception as e: return f"Error formatting asset data: {str(e)}" @server.call_tool() async def handle_call_tool( name: str, arguments: dict | None ) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]: """ Handle tool execution requests. Tools can read and analyze binary files. """ if not arguments: return [types.TextContent(type="text", text="Missing arguments for the request")] if name == "read-unreal-asset": file_path = arguments.get("file_path") if not file_path: return [types.TextContent(type="text", text="Missing file_path parameter")] try: with UnrealAssetReader(file_path) as reader: data = { "header": reader.read_header(), "metadata": reader.read_metadata(), } formatted_data = format_unreal_asset_data(data) return [types.TextContent(type="text", text=formatted_data)] except FileNotFoundError: return [types.TextContent(type="text", text=f"Error: File not found - {file_path}")] except ValueError as e: return [types.TextContent(type="text", text=f"Error: {str(e)}")] except Exception as e: return [types.TextContent(type="text", text=f"Unexpected error: {str(e)}")] elif name == "read-binary-metadata": file_path = arguments.get("file_path") if not file_path: return [types.TextContent(type="text", text="Missing file_path parameter")] format_type = arguments.get("format", "auto") try: # Auto-detect format or use specified format if format_type == "unreal" or (format_type == "auto" and file_path.endswith(".uasset")): reader_class = UnrealAssetReader else: reader_class = BinaryReader with reader_class(file_path) as reader: metadata = { "file_size": os.path.getsize(file_path), "header": reader.read_header() if hasattr(reader, "read_header") else None, "metadata": reader.read_metadata() if hasattr(reader, "read_metadata") else None } formatted_text = ( f"Binary File Analysis:\n\n" f"File Path: {file_path}\n" f"File Size: {metadata['file_size']} bytes\n" f"Format: {format_type}\n\n" ) if metadata["header"]: formatted_text += f"Header Information:\n{metadata['header']}\n\n" if metadata["metadata"]: formatted_text += f"Metadata Information:\n{metadata['metadata']}" return [types.TextContent(type="text", text=formatted_text)] except FileNotFoundError: return [types.TextContent(type="text", text=f"Error: File not found - {file_path}")] except Exception as e: return [types.TextContent(type="text", text=f"Error analyzing file: {str(e)}")] else: return [types.TextContent(type="text", text=f"Unknown tool: {name}")] async def main(): async with mcp.server.stdio.stdio_server() as (read_stream, write_stream): await server.run( read_stream, write_stream, InitializationOptions( server_name="binary_reader", server_version="0.1.0", capabilities=server.get_capabilities( notification_options=NotificationOptions(), experimental_capabilities={}, ), ), ) if __name__ == "__main__": asyncio.run(main()) ```