# Directory Structure ``` ├── main.py ├── README.md ├── requirements.txt ├── src │ ├── __init__.py │ ├── api │ │ ├── __init__.py │ │ ├── routes.py │ │ └── schemas.py │ ├── binary_reader │ │ ├── __init__.py │ │ ├── base_reader.py │ │ ├── unreal_reader.py │ │ └── utils.py │ └── config.py └── tests ├── __init__.py ├── test_api.py └── test_binary_reader.py ``` # Files -------------------------------------------------------------------------------- /README.md: -------------------------------------------------------------------------------- ```markdown 1 | # Binary Reader MCP 2 | 3 | A Model Context Protocol server for reading and analyzing binary files. This server provides tools for reading and analyzing various binary file formats, with initial support for Unreal Engine asset files (.uasset). 4 | 5 | ## Features 6 | 7 | - Read and analyze Unreal Engine .uasset files 8 | - Extract binary file metadata and structure 9 | - Auto-detect file formats 10 | - Extensible architecture for adding new binary format support 11 | 12 | ## Installation 13 | 14 | 1. Clone the repository: 15 | ```bash 16 | git clone https://github.com/berlinbra/binary-reader-mcp.git 17 | cd binary-reader-mcp 18 | ``` 19 | 20 | 2. Create a virtual environment and activate it: 21 | ```bash 22 | python -m venv venv 23 | source venv/bin/activate # On Windows: venv\Scripts\activate 24 | ``` 25 | 26 | 3. Install dependencies: 27 | ```bash 28 | pip install -r requirements.txt 29 | ``` 30 | 31 | ## Usage 32 | 33 | The server provides several tools through the Model Context Protocol: 34 | 35 | ### 1. Read Unreal Asset Files 36 | 37 | ```python 38 | # Example usage through MCP 39 | tool: read-unreal-asset 40 | arguments: 41 | file_path: "path/to/your/asset.uasset" 42 | ``` 43 | 44 | ### 2. 
Read Generic Binary Files 45 | 46 | ```python 47 | # Example usage through MCP 48 | tool: read-binary-metadata 49 | arguments: 50 | file_path: "path/to/your/file.bin" 51 | format: "auto" # or "unreal", "custom" 52 | ``` 53 | 54 | ## Development 55 | 56 | ### Project Structure 57 | 58 | ``` 59 | binary-reader-mcp/ 60 | ├── README.md 61 | ├── requirements.txt 62 | ├── main.py 63 | ├── src/ 64 | │ ├── __init__.py 65 | │ ├── binary_reader/ 66 | │ │ ├── __init__.py 67 | │ │ ├── base_reader.py 68 | │ │ ├── unreal_reader.py 69 | │ │ └── utils.py 70 | │ ├── api/ 71 | │ │ ├── __init__.py 72 | │ │ ├── routes.py 73 | │ │ └── schemas.py 74 | │ └── config.py 75 | └── tests/ 76 | ├── __init__.py 77 | ├── test_binary_reader.py 78 | └── test_api.py 79 | ``` 80 | 81 | ### Adding New Binary Format Support 82 | 83 | To add support for a new binary format: 84 | 85 | 1. Create a new reader class that inherits from `BinaryReader` 86 | 2. Implement the required methods (`read_header`, `read_metadata`) 87 | 3. Add the new format to the format auto-detection logic 88 | 4. Update the tools list to include the new format 89 | 90 | ## Contributing 91 | 92 | 1. Fork the repository 93 | 2. Create your feature branch (`git checkout -b feature/amazing-feature`) 94 | 3. Commit your changes (`git commit -m 'Add some amazing feature'`) 95 | 4. Push to the branch (`git push origin feature/amazing-feature`) 96 | 5. Open a Pull Request 97 | 98 | ## License 99 | 100 | This project is licensed under the MIT License - see the LICENSE file for details. 
``` -------------------------------------------------------------------------------- /src/__init__.py: -------------------------------------------------------------------------------- ```python 1 | ``` -------------------------------------------------------------------------------- /src/api/__init__.py: -------------------------------------------------------------------------------- ```python 1 | ``` -------------------------------------------------------------------------------- /tests/__init__.py: -------------------------------------------------------------------------------- ```python 1 | ``` -------------------------------------------------------------------------------- /requirements.txt: -------------------------------------------------------------------------------- ``` 1 | mcp-core>=0.1.0 2 | fastapi>=0.68.0 3 | uvicorn>=0.15.0 4 | httpx>=0.24.0 5 | python-dotenv>=0.19.0 6 | structlog>=21.1.0 7 | typing-extensions>=4.0.0 ``` -------------------------------------------------------------------------------- /src/binary_reader/__init__.py: -------------------------------------------------------------------------------- ```python 1 | from .base_reader import BinaryReader 2 | from .unreal_reader import UnrealAssetReader 3 | 4 | __all__ = ['BinaryReader', 'UnrealAssetReader'] ``` -------------------------------------------------------------------------------- /src/api/schemas.py: -------------------------------------------------------------------------------- ```python 1 | from pydantic import BaseModel 2 | from typing import Dict, Any, Optional 3 | 4 | class BinaryAnalysisRequest(BaseModel): 5 | file_path: str 6 | format: Optional[str] = "auto" 7 | 8 | class BinaryAnalysisResponse(BaseModel): 9 | file_path: str 10 | header: Dict[str, Any] 11 | metadata: Dict[str, Any] 12 | format: str ``` -------------------------------------------------------------------------------- /src/config.py: 
@router.post("/analyze", response_model=BinaryAnalysisResponse)
async def analyze_binary_file(request: BinaryAnalysisRequest):
    """Parse the file named in the request as an Unreal asset.

    Args:
        request: Request body carrying ``file_path``. The schema also accepts
            ``format``, but this endpoint does not consult it: the file is
            always parsed with ``UnrealAssetReader``.

    Returns:
        A ``BinaryAnalysisResponse`` holding the parsed header and metadata
        dictionaries, with ``format`` set to ``"unreal"``.

    Raises:
        HTTPException: Status 400 with detail ``"File not found: <path>"``
            when the file does not exist, or with the underlying error
            message for any other failure.
    """
    try:
        with UnrealAssetReader(request.file_path) as reader:
            header = reader.read_header()
            metadata = reader.read_metadata()

        return BinaryAnalysisResponse(
            file_path=request.file_path,
            header=header,
            metadata=metadata,
            format="unreal"
        )
    except FileNotFoundError as exc:
        # open() reports "[Errno 2] No such file or directory: ..."; callers
        # (and the API tests) match on the stable phrase "File not found".
        raise HTTPException(
            status_code=400,
            detail=f"File not found: {request.file_path}",
        ) from exc
    except Exception as exc:
        # Any other parsing/validation failure is reported as a bad request.
        raise HTTPException(status_code=400, detail=str(exc)) from exc
class UnrealAssetReader(BinaryReader):
    """Reader for the fixed-size fields at the start of a ``.uasset`` file.

    ``read_header`` consumes the first 20 bytes (magic plus four uint32
    fields). ``read_metadata`` reads the next 12 bytes from the current
    stream position, so callers invoke ``read_header`` first.
    """

    # Magic written in file order: the first four bytes on disk are
    # C1 83 2A 9E.
    MAGIC = 0xC1832A9E

    def read_header(self) -> Dict[str, Any]:
        """Validate the magic number and read the four header fields.

        Returns:
            Dict with ``magic`` (hex string) and the uint32 fields
            ``legacy_version``, ``legacy_ue3_version``, ``file_version_ue4``
            and ``file_size``.

        Raises:
            ValueError: If the file does not start with the magic bytes
                (including when it is shorter than four bytes).
        """
        # MAGIC spells the bytes in on-disk order, so decode big-endian.
        # read_uint32() is little-endian and would turn C1 83 2A 9E into
        # 0x9E2A83C1, rejecting every valid file.
        magic = int.from_bytes(self.read_bytes(4), 'big')
        if magic != self.MAGIC:
            raise ValueError("Not a valid .uasset file")

        return {
            'magic': hex(magic),
            'legacy_version': self.read_uint32(),
            'legacy_ue3_version': self.read_uint32(),
            'file_version_ue4': self.read_uint32(),
            'file_size': self.read_uint32()
        }

    def read_metadata(self) -> Dict[str, Any]:
        """Read three consecutive little-endian uint32 values.

        Returns:
            Dict with ``flags``, ``element_count`` and ``bulk_data_size``,
            in the order they are read from the stream.
        """
        flags = self.read_uint32()
        element_count = self.read_uint32()
        bulk_data_size = self.read_uint32()

        return {
            'flags': flags,
            'element_count': element_count,
            'bulk_data_size': bulk_data_size
        }

    def read_name_table(self) -> list[str]:
        """Read an int32 count followed by that many length-prefixed strings.

        A zero or negative count yields an empty list.
        """
        names_count = self.read_int32()
        return [self.read_string() for _ in range(names_count)]
def read_string(self, encoding='utf-8') -> str:
    """Read a length-prefixed string from the current position.

    The prefix is an int32 obtained from ``read_int32``:

    * ``0``  -> empty string, nothing further is read;
    * ``n > 0`` -> ``n`` bytes decoded with ``encoding``;
    * ``n < 0`` -> ``2 * |n|`` bytes decoded as UTF-16-LE.

    Trailing NUL characters are stripped from the decoded text.

    Args:
        encoding: Codec used for positive-length strings.

    Returns:
        The decoded string without trailing NULs.

    Raises:
        ValueError: If fewer bytes are available than the prefix announces
            (previously a silently truncated string was returned), or if
            the bytes cannot be decoded (``UnicodeDecodeError``).
    """
    length = self.read_int32()
    if length == 0:
        return ""

    if length < 0:
        # Negative prefix: the count is in 2-byte UTF-16 code units.
        byte_count, codec = -length * 2, 'utf-16-le'
    else:
        byte_count, codec = length, encoding

    data = self.read_bytes(byte_count)
    if len(data) != byte_count:
        # read_bytes returns whatever is left at end of file; reject a short
        # read instead of decoding partial data.
        raise ValueError(
            f"Truncated string: expected {byte_count} bytes, got {len(data)}"
        )
    return data.decode(codec).rstrip('\0')
def test_invalid_unreal_asset_file(tmp_path):
    """read_header must reject a file that does not start with the magic."""
    bogus_asset = tmp_path / "invalid.uasset"
    bogus_asset.write_bytes(b"invalid data")

    with pytest.raises(ValueError, match="Not a valid .uasset file"), \
            UnrealAssetReader(bogus_asset) as asset_reader:
        asset_reader.read_header()
# Shape of every tool reply, matching handle_call_tool's declared return type.
_ToolReply = list[types.TextContent | types.ImageContent | types.EmbeddedResource]


def _text_reply(text: str) -> _ToolReply:
    """Wrap ``text`` in the one-element text response every branch returns."""
    return [types.TextContent(type="text", text=text)]


def _run_read_unreal_asset(file_path: str) -> _ToolReply:
    """Parse ``file_path`` as an Unreal asset and return the formatted report."""
    try:
        with UnrealAssetReader(file_path) as reader:
            data = {
                "header": reader.read_header(),
                "metadata": reader.read_metadata(),
            }
        return _text_reply(format_unreal_asset_data(data))
    except FileNotFoundError:
        return _text_reply(f"Error: File not found - {file_path}")
    except ValueError as e:
        return _text_reply(f"Error: {str(e)}")
    except Exception as e:
        return _text_reply(f"Unexpected error: {str(e)}")


def _run_read_binary_metadata(file_path: str, format_type: str) -> _ToolReply:
    """Report size and header details for ``file_path``.

    Unreal assets (explicit ``"unreal"`` format, or ``"auto"`` with a
    ``.uasset`` extension) are parsed with ``UnrealAssetReader``. Any other
    file gets its size plus a hex dump of its first 16 bytes.
    """
    try:
        file_size = os.path.getsize(file_path)
        is_unreal = format_type == "unreal" or (
            format_type == "auto" and file_path.endswith(".uasset")
        )

        if is_unreal:
            with UnrealAssetReader(file_path) as reader:
                header = reader.read_header()
                metadata = reader.read_metadata()
        else:
            # BinaryReader is abstract (read_header/read_metadata are
            # abstract methods), so instantiating it always raises TypeError.
            # For formats without a concrete reader, expose the raw leading
            # bytes instead of failing.
            with open(file_path, "rb") as raw:
                header = {"leading_bytes": raw.read(16).hex()}
            metadata = None

        formatted_text = (
            f"Binary File Analysis:\n\n"
            f"File Path: {file_path}\n"
            f"File Size: {file_size} bytes\n"
            f"Format: {format_type}\n\n"
        )
        if header:
            formatted_text += f"Header Information:\n{header}\n\n"
        if metadata:
            formatted_text += f"Metadata Information:\n{metadata}"

        return _text_reply(formatted_text)
    except FileNotFoundError:
        return _text_reply(f"Error: File not found - {file_path}")
    except Exception as e:
        return _text_reply(f"Error analyzing file: {str(e)}")


@server.call_tool()
async def handle_call_tool(
    name: str, arguments: dict | None
) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
    """Dispatch a tool call to the matching binary-analysis routine.

    Args:
        name: Tool name; ``"read-unreal-asset"`` or ``"read-binary-metadata"``.
        arguments: Tool arguments. ``file_path`` is required by both tools;
            ``read-binary-metadata`` also accepts ``format`` (default
            ``"auto"``).

    Returns:
        A single-item list with a text result. Failures (missing arguments,
        unknown tool, unreadable or invalid file) are reported as text
        rather than raised.
    """
    if not arguments:
        return _text_reply("Missing arguments for the request")

    if name not in ("read-unreal-asset", "read-binary-metadata"):
        return _text_reply(f"Unknown tool: {name}")

    file_path = arguments.get("file_path")
    if not file_path:
        return _text_reply("Missing file_path parameter")

    if name == "read-unreal-asset":
        return _run_read_unreal_asset(file_path)
    return _run_read_binary_metadata(file_path, arguments.get("format", "auto"))