#
tokens: 5834/50000 15/15 files
lines: on (toggle) GitHub
raw markdown copy reset
# Directory Structure

```
├── main.py
├── README.md
├── requirements.txt
├── src
│   ├── __init__.py
│   ├── api
│   │   ├── __init__.py
│   │   ├── routes.py
│   │   └── schemas.py
│   ├── binary_reader
│   │   ├── __init__.py
│   │   ├── base_reader.py
│   │   ├── unreal_reader.py
│   │   └── utils.py
│   └── config.py
└── tests
    ├── __init__.py
    ├── test_api.py
    └── test_binary_reader.py
```

# Files

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
  1 | # Binary Reader MCP
  2 | 
  3 | A Model Context Protocol server for reading and analyzing binary files. This server provides tools for reading and analyzing various binary file formats, with initial support for Unreal Engine asset files (.uasset).
  4 | 
  5 | ## Features
  6 | 
  7 | - Read and analyze Unreal Engine .uasset files
  8 | - Extract binary file metadata and structure
  9 | - Auto-detect file formats
 10 | - Extensible architecture for adding new binary format support
 11 | 
 12 | ## Installation
 13 | 
 14 | 1. Clone the repository:
 15 | ```bash
 16 | git clone https://github.com/berlinbra/binary-reader-mcp.git
 17 | cd binary-reader-mcp
 18 | ```
 19 | 
 20 | 2. Create a virtual environment and activate it:
 21 | ```bash
 22 | python -m venv venv
 23 | source venv/bin/activate  # On Windows: venv\Scripts\activate
 24 | ```
 25 | 
 26 | 3. Install dependencies:
 27 | ```bash
 28 | pip install -r requirements.txt
 29 | ```
 30 | 
 31 | ## Usage
 32 | 
 33 | The server provides several tools through the Model Context Protocol:
 34 | 
 35 | ### 1. Read Unreal Asset Files
 36 | 
 37 | ```python
 38 | # Example usage through MCP
 39 | tool: read-unreal-asset
 40 | arguments:
 41 |     file_path: "path/to/your/asset.uasset"
 42 | ```
 43 | 
 44 | ### 2. Read Generic Binary Files
 45 | 
 46 | ```python
 47 | # Example usage through MCP
 48 | tool: read-binary-metadata
 49 | arguments:
 50 |     file_path: "path/to/your/file.bin"
 51 |     format: "auto"  # or "unreal", "custom"
 52 | ```
 53 | 
 54 | ## Development
 55 | 
 56 | ### Project Structure
 57 | 
 58 | ```
 59 | binary-reader-mcp/
 60 | ├── README.md
 61 | ├── requirements.txt
 62 | ├── main.py
 63 | ├── src/
 64 | │   ├── __init__.py
 65 | │   ├── binary_reader/
 66 | │   │   ├── __init__.py
 67 | │   │   ├── base_reader.py
 68 | │   │   ├── unreal_reader.py
 69 | │   │   └── utils.py
 70 | │   ├── api/
 71 | │   │   ├── __init__.py
 72 | │   │   ├── routes.py
 73 | │   │   └── schemas.py
 74 | │   └── config.py
 75 | └── tests/
 76 |     ├── __init__.py
 77 |     ├── test_binary_reader.py
 78 |     └── test_api.py
 79 | ```
 80 | 
 81 | ### Adding New Binary Format Support
 82 | 
 83 | To add support for a new binary format:
 84 | 
 85 | 1. Create a new reader class that inherits from `BinaryReader`
 86 | 2. Implement the required methods (`read_header`, `read_metadata`)
 87 | 3. Add the new format to the format auto-detection logic
 88 | 4. Update the tools list to include the new format
 89 | 
 90 | ## Contributing
 91 | 
 92 | 1. Fork the repository
 93 | 2. Create your feature branch (`git checkout -b feature/amazing-feature`)
 94 | 3. Commit your changes (`git commit -m 'Add some amazing feature'`)
 95 | 4. Push to the branch (`git push origin feature/amazing-feature`)
 96 | 5. Open a Pull Request
 97 | 
 98 | ## License
 99 | 
100 | This project is licensed under the MIT License - see the LICENSE file for details.
```

--------------------------------------------------------------------------------
/src/__init__.py:
--------------------------------------------------------------------------------

```python
1 | 
```

--------------------------------------------------------------------------------
/src/api/__init__.py:
--------------------------------------------------------------------------------

```python
1 | 
```

--------------------------------------------------------------------------------
/tests/__init__.py:
--------------------------------------------------------------------------------

```python
1 | 
```

--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------

```
1 | mcp-core>=0.1.0
2 | fastapi>=0.68.0
3 | uvicorn>=0.15.0
4 | httpx>=0.24.0
5 | python-dotenv>=0.19.0
6 | structlog>=21.1.0
7 | typing-extensions>=4.0.0
```

--------------------------------------------------------------------------------
/src/binary_reader/__init__.py:
--------------------------------------------------------------------------------

```python
1 | from .base_reader import BinaryReader
2 | from .unreal_reader import UnrealAssetReader
3 | 
4 | __all__ = ['BinaryReader', 'UnrealAssetReader']
```

--------------------------------------------------------------------------------
/src/api/schemas.py:
--------------------------------------------------------------------------------

```python
 1 | from pydantic import BaseModel
 2 | from typing import Dict, Any, Optional
 3 | 
 4 | class BinaryAnalysisRequest(BaseModel):
 5 |     file_path: str
 6 |     format: Optional[str] = "auto"
 7 | 
 8 | class BinaryAnalysisResponse(BaseModel):
 9 |     file_path: str
10 |     header: Dict[str, Any]
11 |     metadata: Dict[str, Any]
12 |     format: str
```

--------------------------------------------------------------------------------
/src/config.py:
--------------------------------------------------------------------------------

```python
 1 | import os
 2 | from typing import Dict, Any
 3 | 
 4 | class Config:
 5 |     DEBUG: bool = os.getenv("DEBUG", "False").lower() == "true"
 6 |     MAX_FILE_SIZE: int = 1024 * 1024 * 100  # 100MB
 7 |     
 8 |     SUPPORTED_FORMATS: Dict[str, Any] = {
 9 |         "unreal": {
10 |             "extensions": [".uasset"],
11 |             "magic": bytes.fromhex('C1832A9E')
12 |         }
13 |     }
```

--------------------------------------------------------------------------------
/src/binary_reader/utils.py:
--------------------------------------------------------------------------------

```python
 1 | from typing import BinaryIO, Dict, Any
 2 | import os
 3 | 
 4 | def get_file_size(file_path: str) -> int:
 5 |     """Get the size of a file in bytes."""
 6 |     return os.path.getsize(file_path)
 7 | 
 8 | def read_file_header(file: BinaryIO, size: int = 16) -> bytes:
 9 |     """Read the first n bytes of a file."""
10 |     current_pos = file.tell()
11 |     file.seek(0)
12 |     header = file.read(size)
13 |     file.seek(current_pos)
14 |     return header
15 | 
16 | def detect_file_format(header: bytes) -> str:
17 |     """Detect file format based on magic numbers/headers."""
18 |     if len(header) < 4:
19 |         return "unknown"
20 |         
21 |     # Unreal Engine .uasset
22 |     if header[:4] == bytes.fromhex('C1832A9E'):
23 |         return "unreal"
24 |         
25 |     # Add more format detection here
26 |     
27 |     return "unknown"
```

--------------------------------------------------------------------------------
/src/api/routes.py:
--------------------------------------------------------------------------------

```python
 1 | from fastapi import APIRouter, HTTPException
 2 | from .schemas import BinaryAnalysisRequest, BinaryAnalysisResponse
 3 | from ..binary_reader import UnrealAssetReader
 4 | 
 5 | router = APIRouter()
 6 | 
 7 | @router.post("/analyze", response_model=BinaryAnalysisResponse)
 8 | async def analyze_binary_file(request: BinaryAnalysisRequest):
 9 |     try:
10 |         with UnrealAssetReader(request.file_path) as reader:
11 |             header = reader.read_header()
12 |             metadata = reader.read_metadata()
13 |             
14 |             return BinaryAnalysisResponse(
15 |                 file_path=request.file_path,
16 |                 header=header,
17 |                 metadata=metadata,
18 |                 format="unreal"
19 |             )
20 |     except Exception as e:
21 |         raise HTTPException(status_code=400, detail=str(e))
```

--------------------------------------------------------------------------------
/tests/test_api.py:
--------------------------------------------------------------------------------

```python
 1 | import pytest
 2 | from fastapi.testclient import TestClient
 3 | from src.api.routes import router
 4 | from fastapi import FastAPI
 5 | 
 6 | app = FastAPI()
 7 | app.include_router(router)
 8 | 
 9 | @pytest.fixture
10 | def client():
11 |     return TestClient(app)
12 | 
13 | def test_analyze_endpoint_missing_file(client):
14 |     response = client.post("/analyze", json={"file_path": "nonexistent.uasset"})
15 |     assert response.status_code == 400
16 |     assert "File not found" in response.json()["detail"]
17 | 
18 | def test_analyze_endpoint_invalid_request(client):
19 |     response = client.post("/analyze", json={})
20 |     assert response.status_code == 422  # Validation error
21 | 
22 | def test_analyze_endpoint_with_mock_file(client, tmp_path):
23 |     # Create a mock .uasset file
24 |     file_path = tmp_path / "test.uasset"
25 |     with open(file_path, "wb") as f:
26 |         f.write(bytes.fromhex('C1832A9E'))
27 |         f.write(b"\x00" * 28)  # Padding for header and metadata
28 |     
29 |     response = client.post("/analyze", json={"file_path": str(file_path)})
30 |     assert response.status_code == 200
31 |     data = response.json()
32 |     assert data["format"] == "unreal"
33 |     assert "header" in data
34 |     assert "metadata" in data
```

--------------------------------------------------------------------------------
/src/binary_reader/unreal_reader.py:
--------------------------------------------------------------------------------

```python
 1 | from typing import Dict, Any
 2 | from .base_reader import BinaryReader
 3 | 
 4 | class UnrealAssetReader(BinaryReader):
 5 |     MAGIC = 0xC1832A9E
 6 | 
 7 |     def read_header(self) -> Dict[str, Any]:
 8 |         magic = self.read_uint32()
 9 |         if magic != self.MAGIC:
10 |             raise ValueError("Not a valid .uasset file")
11 | 
12 |         return {
13 |             'magic': hex(magic),
14 |             'legacy_version': self.read_uint32(),
15 |             'legacy_ue3_version': self.read_uint32(),
16 |             'file_version_ue4': self.read_uint32(),
17 |             'file_size': self.read_uint32()
18 |         }
19 | 
20 |     def read_metadata(self) -> Dict[str, Any]:
21 |         # Read bulk data flags
22 |         flags = self.read_uint32()
23 |         
24 |         # Read element count
25 |         element_count = self.read_uint32()
26 |         
27 |         # Read bulk data size
28 |         bulk_data_size = self.read_uint32()
29 |         
30 |         return {
31 |             'flags': flags,
32 |             'element_count': element_count,
33 |             'bulk_data_size': bulk_data_size
34 |         }
35 | 
36 |     def read_name_table(self) -> list[str]:
37 |         names_count = self.read_int32()
38 |         names = []
39 |         
40 |         for _ in range(names_count):
41 |             name = self.read_string()
42 |             names.append(name)
43 |             
44 |         return names
```

--------------------------------------------------------------------------------
/src/binary_reader/base_reader.py:
--------------------------------------------------------------------------------

```python
 1 | from abc import ABC, abstractmethod
 2 | from typing import BinaryIO, Dict, Any
 3 | import struct
 4 | 
 5 | class BinaryReader(ABC):
 6 |     def __init__(self, file_path: str):
 7 |         self.file_path = file_path
 8 |         self._file: BinaryIO | None = None
 9 |         self._position = 0
10 | 
11 |     def __enter__(self):
12 |         self._file = open(self.file_path, 'rb')
13 |         return self
14 |         
15 |     def __exit__(self, exc_type, exc_val, exc_tb):
16 |         if self._file:
17 |             self._file.close()
18 |     
19 |     def read_bytes(self, size: int) -> bytes:
20 |         if not self._file:
21 |             raise ValueError("File is not open")
22 |         data = self._file.read(size)
23 |         self._position += len(data)
24 |         return data
25 | 
26 |     def read_uint32(self) -> int:
27 |         return struct.unpack('<I', self.read_bytes(4))[0]
28 | 
29 |     def read_int32(self) -> int:
30 |         return struct.unpack('<i', self.read_bytes(4))[0]
31 | 
32 |     def read_float(self) -> float:
33 |         return struct.unpack('<f', self.read_bytes(4))[0]
34 | 
35 |     def read_string(self, encoding='utf-8') -> str:
36 |         length = self.read_int32()
37 |         if length == 0:
38 |             return ""
39 |         elif length < 0:
40 |             length = abs(length)
41 |             data = self.read_bytes(length * 2)
42 |             return data.decode('utf-16-le').rstrip('\0')
43 |         else:
44 |             data = self.read_bytes(length)
45 |             return data.decode(encoding).rstrip('\0')
46 | 
47 |     def seek(self, position: int) -> None:
48 |         if not self._file:
49 |             raise ValueError("File is not open")
50 |         self._file.seek(position)
51 |         self._position = position
52 | 
53 |     @property
54 |     def position(self) -> int:
55 |         return self._position
56 | 
57 |     @abstractmethod
58 |     def read_header(self) -> Dict[str, Any]:
59 |         pass
60 | 
61 |     @abstractmethod
62 |     def read_metadata(self) -> Dict[str, Any]:
63 |         pass
```

--------------------------------------------------------------------------------
/tests/test_binary_reader.py:
--------------------------------------------------------------------------------

```python
 1 | import pytest
 2 | import os
 3 | from src.binary_reader import UnrealAssetReader, BinaryReader
 4 | 
 5 | @pytest.fixture
 6 | def test_unreal_asset_file(tmp_path):
 7 |     # Create a mock .uasset file for testing
 8 |     file_path = tmp_path / "test.uasset"
 9 |     with open(file_path, "wb") as f:
10 |         # Write magic number
11 |         f.write(bytes.fromhex('C1832A9E'))
12 |         # Write mock version numbers
13 |         f.write(int(1).to_bytes(4, 'little'))  # legacy_version
14 |         f.write(int(2).to_bytes(4, 'little'))  # legacy_ue3_version
15 |         f.write(int(3).to_bytes(4, 'little'))  # file_version_ue4
16 |         f.write(int(100).to_bytes(4, 'little'))  # file_size
17 |         # Write mock metadata
18 |         f.write(int(1).to_bytes(4, 'little'))  # flags
19 |         f.write(int(10).to_bytes(4, 'little'))  # element_count
20 |         f.write(int(1000).to_bytes(4, 'little'))  # bulk_data_size
21 |     return file_path
22 | 
23 | def test_unreal_asset_reader_header(test_unreal_asset_file):
24 |     with UnrealAssetReader(test_unreal_asset_file) as reader:
25 |         header = reader.read_header()
26 |         assert header['magic'] == '0xc1832a9e'
27 |         assert header['legacy_version'] == 1
28 |         assert header['legacy_ue3_version'] == 2
29 |         assert header['file_version_ue4'] == 3
30 |         assert header['file_size'] == 100
31 | 
32 | def test_unreal_asset_reader_metadata(test_unreal_asset_file):
33 |     with UnrealAssetReader(test_unreal_asset_file) as reader:
34 |         # Skip header
35 |         reader.read_header()
36 |         metadata = reader.read_metadata()
37 |         assert metadata['flags'] == 1
38 |         assert metadata['element_count'] == 10
39 |         assert metadata['bulk_data_size'] == 1000
40 | 
41 | def test_invalid_unreal_asset_file(tmp_path):
42 |     file_path = tmp_path / "invalid.uasset"
43 |     with open(file_path, "wb") as f:
44 |         f.write(b"invalid data")
45 |     
46 |     with pytest.raises(ValueError, match="Not a valid .uasset file"):
47 |         with UnrealAssetReader(file_path) as reader:
48 |             reader.read_header()
```

--------------------------------------------------------------------------------
/main.py:
--------------------------------------------------------------------------------

```python
  1 | from typing import Any
  2 | import asyncio
  3 | from mcp.server.models import InitializationOptions
  4 | import mcp.types as types
  5 | from mcp.server import NotificationOptions, Server
  6 | import mcp.server.stdio
  7 | import os
  8 | from src.binary_reader.unreal_reader import UnrealAssetReader
  9 | from src.binary_reader.base_reader import BinaryReader
 10 | 
 11 | server = Server("binary_reader")
 12 | 
 13 | @server.list_tools()
 14 | async def handle_list_tools() -> list[types.Tool]:
 15 |     """
 16 |     List available tools for binary file analysis.
 17 |     Each tool specifies its arguments using JSON Schema validation.
 18 |     """
 19 |     return [
 20 |         types.Tool(
 21 |             name="read-unreal-asset",
 22 |             description="Read and analyze Unreal Engine .uasset file structure",
 23 |             inputSchema={
 24 |                 "type": "object",
 25 |                 "properties": {
 26 |                     "file_path": {
 27 |                         "type": "string",
 28 |                         "description": "Path to the .uasset file",
 29 |                     },
 30 |                 },
 31 |                 "required": ["file_path"],
 32 |             },
 33 |         ),
 34 |         types.Tool(
 35 |             name="read-binary-metadata",
 36 |             description="Read generic binary file metadata and structure",
 37 |             inputSchema={
 38 |                 "type": "object",
 39 |                 "properties": {
 40 |                     "file_path": {
 41 |                         "type": "string",
 42 |                         "description": "Path to the binary file",
 43 |                     },
 44 |                     "format": {
 45 |                         "type": "string",
 46 |                         "description": "File format specification (if known)",
 47 |                         "enum": ["auto", "unreal", "custom"],
 48 |                         "default": "auto"
 49 |                     }
 50 |                 },
 51 |                 "required": ["file_path"],
 52 |             },
 53 |         )
 54 |     ]
 55 | 
 56 | def format_unreal_asset_data(data: dict) -> str:
 57 |     """Format Unreal asset data into a readable string."""
 58 |     try:
 59 |         return (
 60 |             f"Unreal Asset Analysis:\n\n"
 61 |             f"Header Information:\n"
 62 |             f"Magic: {data.get('header', {}).get('magic', 'N/A')}\n"
 63 |             f"Legacy Version: {data.get('header', {}).get('legacy_version', 'N/A')}\n"
 64 |             f"UE4 Version: {data.get('header', {}).get('file_version_ue4', 'N/A')}\n"
 65 |             f"File Size: {data.get('header', {}).get('file_size', 'N/A')} bytes\n\n"
 66 |             f"Metadata:\n"
 67 |             f"Flags: {data.get('metadata', {}).get('flags', 'N/A')}\n"
 68 |             f"Element Count: {data.get('metadata', {}).get('element_count', 'N/A')}\n"
 69 |             f"Bulk Data Size: {data.get('metadata', {}).get('bulk_data_size', 'N/A')} bytes\n"
 70 |             "---"
 71 |         )
 72 |     except Exception as e:
 73 |         return f"Error formatting asset data: {str(e)}"
 74 | 
 75 | @server.call_tool()
 76 | async def handle_call_tool(
 77 |     name: str, arguments: dict | None
 78 | ) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
 79 |     """
 80 |     Handle tool execution requests.
 81 |     Tools can read and analyze binary files.
 82 |     """
 83 |     if not arguments:
 84 |         return [types.TextContent(type="text", text="Missing arguments for the request")]
 85 |     
 86 |     if name == "read-unreal-asset":
 87 |         file_path = arguments.get("file_path")
 88 |         if not file_path:
 89 |             return [types.TextContent(type="text", text="Missing file_path parameter")]
 90 | 
 91 |         try:
 92 |             with UnrealAssetReader(file_path) as reader:
 93 |                 data = {
 94 |                     "header": reader.read_header(),
 95 |                     "metadata": reader.read_metadata(),
 96 |                 }
 97 |                 
 98 |                 formatted_data = format_unreal_asset_data(data)
 99 |                 return [types.TextContent(type="text", text=formatted_data)]
100 |                 
101 |         except FileNotFoundError:
102 |             return [types.TextContent(type="text", text=f"Error: File not found - {file_path}")]
103 |         except ValueError as e:
104 |             return [types.TextContent(type="text", text=f"Error: {str(e)}")]
105 |         except Exception as e:
106 |             return [types.TextContent(type="text", text=f"Unexpected error: {str(e)}")]
107 | 
108 |     elif name == "read-binary-metadata":
109 |         file_path = arguments.get("file_path")
110 |         if not file_path:
111 |             return [types.TextContent(type="text", text="Missing file_path parameter")]
112 | 
113 |         format_type = arguments.get("format", "auto")
114 |         
115 |         try:
116 |             # Auto-detect format or use specified format
117 |             if format_type == "unreal" or (format_type == "auto" and file_path.endswith(".uasset")):
118 |                 reader_class = UnrealAssetReader
119 |             else:
120 |                 reader_class = BinaryReader
121 | 
122 |             with reader_class(file_path) as reader:
123 |                 metadata = {
124 |                     "file_size": os.path.getsize(file_path),
125 |                     "header": reader.read_header() if hasattr(reader, "read_header") else None,
126 |                     "metadata": reader.read_metadata() if hasattr(reader, "read_metadata") else None
127 |                 }
128 | 
129 |                 formatted_text = (
130 |                     f"Binary File Analysis:\n\n"
131 |                     f"File Path: {file_path}\n"
132 |                     f"File Size: {metadata['file_size']} bytes\n"
133 |                     f"Format: {format_type}\n\n"
134 |                 )
135 | 
136 |                 if metadata["header"]:
137 |                     formatted_text += f"Header Information:\n{metadata['header']}\n\n"
138 |                 if metadata["metadata"]:
139 |                     formatted_text += f"Metadata Information:\n{metadata['metadata']}"
140 | 
141 |                 return [types.TextContent(type="text", text=formatted_text)]
142 | 
143 |         except FileNotFoundError:
144 |             return [types.TextContent(type="text", text=f"Error: File not found - {file_path}")]
145 |         except Exception as e:
146 |             return [types.TextContent(type="text", text=f"Error analyzing file: {str(e)}")]
147 |     else:
148 |         return [types.TextContent(type="text", text=f"Unknown tool: {name}")]
149 | 
150 | async def main():
151 |     async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
152 |         await server.run(
153 |             read_stream,
154 |             write_stream,
155 |             InitializationOptions(
156 |                 server_name="binary_reader",
157 |                 server_version="0.1.0",
158 |                 capabilities=server.get_capabilities(
159 |                     notification_options=NotificationOptions(),
160 |                     experimental_capabilities={},
161 |                 ),
162 |             ),
163 |         )
164 | 
165 | if __name__ == "__main__":
166 |     asyncio.run(main())
```