#
tokens: 4388/50000 15/15 files
lines: off (toggle) GitHub
raw markdown copy
# Directory Structure

```
├── main.py
├── README.md
├── requirements.txt
├── src
│   ├── __init__.py
│   ├── api
│   │   ├── __init__.py
│   │   ├── routes.py
│   │   └── schemas.py
│   ├── binary_reader
│   │   ├── __init__.py
│   │   ├── base_reader.py
│   │   ├── unreal_reader.py
│   │   └── utils.py
│   └── config.py
└── tests
    ├── __init__.py
    ├── test_api.py
    └── test_binary_reader.py
```

# Files

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# Binary Reader MCP

A Model Context Protocol server for reading and analyzing binary files. This server provides tools for reading and analyzing various binary file formats, with initial support for Unreal Engine asset files (.uasset).

## Features

- Read and analyze Unreal Engine .uasset files
- Extract binary file metadata and structure
- Auto-detect file formats
- Extensible architecture for adding new binary format support

## Installation

1. Clone the repository:
```bash
git clone https://github.com/berlinbra/binary-reader-mcp.git
cd binary-reader-mcp
```

2. Create a virtual environment and activate it:
```bash
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate
```

3. Install dependencies:
```bash
pip install -r requirements.txt
```

## Usage

The server provides several tools through the Model Context Protocol:

### 1. Read Unreal Asset Files

```python
# Example usage through MCP
tool: read-unreal-asset
arguments:
    file_path: "path/to/your/asset.uasset"
```

### 2. Read Generic Binary Files

```python
# Example usage through MCP
tool: read-binary-metadata
arguments:
    file_path: "path/to/your/file.bin"
    format: "auto"  # or "unreal", "custom"
```

## Development

### Project Structure

```
binary-reader-mcp/
├── README.md
├── requirements.txt
├── main.py
├── src/
│   ├── __init__.py
│   ├── binary_reader/
│   │   ├── __init__.py
│   │   ├── base_reader.py
│   │   ├── unreal_reader.py
│   │   └── utils.py
│   ├── api/
│   │   ├── __init__.py
│   │   ├── routes.py
│   │   └── schemas.py
│   └── config.py
└── tests/
    ├── __init__.py
    ├── test_binary_reader.py
    └── test_api.py
```

### Adding New Binary Format Support

To add support for a new binary format:

1. Create a new reader class that inherits from `BinaryReader`
2. Implement the required methods (`read_header`, `read_metadata`)
3. Add the new format to the format auto-detection logic
4. Update the tools list to include the new format

## Contributing

1. Fork the repository
2. Create your feature branch (`git checkout -b feature/amazing-feature`)
3. Commit your changes (`git commit -m 'Add some amazing feature'`)
4. Push to the branch (`git push origin feature/amazing-feature`)
5. Open a Pull Request

## License

This project is licensed under the MIT License - see the LICENSE file for details.
```

--------------------------------------------------------------------------------
/src/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/src/api/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/tests/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------

```
mcp-core>=0.1.0
fastapi>=0.68.0
uvicorn>=0.15.0
httpx>=0.24.0
python-dotenv>=0.19.0
structlog>=21.1.0
typing-extensions>=4.0.0
```

--------------------------------------------------------------------------------
/src/binary_reader/__init__.py:
--------------------------------------------------------------------------------

```python
from .base_reader import BinaryReader
from .unreal_reader import UnrealAssetReader

__all__ = ['BinaryReader', 'UnrealAssetReader']
```

--------------------------------------------------------------------------------
/src/api/schemas.py:
--------------------------------------------------------------------------------

```python
from pydantic import BaseModel
from typing import Dict, Any, Optional

class BinaryAnalysisRequest(BaseModel):
    file_path: str
    format: Optional[str] = "auto"

class BinaryAnalysisResponse(BaseModel):
    file_path: str
    header: Dict[str, Any]
    metadata: Dict[str, Any]
    format: str
```

--------------------------------------------------------------------------------
/src/config.py:
--------------------------------------------------------------------------------

```python
import os
from typing import Dict, Any

class Config:
    DEBUG: bool = os.getenv("DEBUG", "False").lower() == "true"
    MAX_FILE_SIZE: int = 1024 * 1024 * 100  # 100MB
    
    SUPPORTED_FORMATS: Dict[str, Any] = {
        "unreal": {
            "extensions": [".uasset"],
            "magic": bytes.fromhex('C1832A9E')
        }
    }
```

--------------------------------------------------------------------------------
/src/binary_reader/utils.py:
--------------------------------------------------------------------------------

```python
from typing import BinaryIO, Dict, Any
import os

def get_file_size(file_path: str) -> int:
    """Get the size of a file in bytes."""
    return os.path.getsize(file_path)

def read_file_header(file: BinaryIO, size: int = 16) -> bytes:
    """Read the first n bytes of a file."""
    current_pos = file.tell()
    file.seek(0)
    header = file.read(size)
    file.seek(current_pos)
    return header

def detect_file_format(header: bytes) -> str:
    """Detect file format based on magic numbers/headers."""
    if len(header) < 4:
        return "unknown"
        
    # Unreal Engine .uasset
    if header[:4] == bytes.fromhex('C1832A9E'):
        return "unreal"
        
    # Add more format detection here
    
    return "unknown"
```

--------------------------------------------------------------------------------
/src/api/routes.py:
--------------------------------------------------------------------------------

```python
from fastapi import APIRouter, HTTPException
from .schemas import BinaryAnalysisRequest, BinaryAnalysisResponse
from ..binary_reader import UnrealAssetReader

router = APIRouter()

@router.post("/analyze", response_model=BinaryAnalysisResponse)
async def analyze_binary_file(request: BinaryAnalysisRequest):
    try:
        with UnrealAssetReader(request.file_path) as reader:
            header = reader.read_header()
            metadata = reader.read_metadata()
            
            return BinaryAnalysisResponse(
                file_path=request.file_path,
                header=header,
                metadata=metadata,
                format="unreal"
            )
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))
```

--------------------------------------------------------------------------------
/tests/test_api.py:
--------------------------------------------------------------------------------

```python
import pytest
from fastapi.testclient import TestClient
from src.api.routes import router
from fastapi import FastAPI

app = FastAPI()
app.include_router(router)

@pytest.fixture
def client():
    return TestClient(app)

def test_analyze_endpoint_missing_file(client):
    response = client.post("/analyze", json={"file_path": "nonexistent.uasset"})
    assert response.status_code == 400
    assert "File not found" in response.json()["detail"]

def test_analyze_endpoint_invalid_request(client):
    response = client.post("/analyze", json={})
    assert response.status_code == 422  # Validation error

def test_analyze_endpoint_with_mock_file(client, tmp_path):
    # Create a mock .uasset file
    file_path = tmp_path / "test.uasset"
    with open(file_path, "wb") as f:
        f.write(bytes.fromhex('C1832A9E'))
        f.write(b"\x00" * 28)  # Padding for header and metadata
    
    response = client.post("/analyze", json={"file_path": str(file_path)})
    assert response.status_code == 200
    data = response.json()
    assert data["format"] == "unreal"
    assert "header" in data
    assert "metadata" in data
```

--------------------------------------------------------------------------------
/src/binary_reader/unreal_reader.py:
--------------------------------------------------------------------------------

```python
from typing import Dict, Any
from .base_reader import BinaryReader

class UnrealAssetReader(BinaryReader):
    MAGIC = 0xC1832A9E

    def read_header(self) -> Dict[str, Any]:
        magic = self.read_uint32()
        if magic != self.MAGIC:
            raise ValueError("Not a valid .uasset file")

        return {
            'magic': hex(magic),
            'legacy_version': self.read_uint32(),
            'legacy_ue3_version': self.read_uint32(),
            'file_version_ue4': self.read_uint32(),
            'file_size': self.read_uint32()
        }

    def read_metadata(self) -> Dict[str, Any]:
        # Read bulk data flags
        flags = self.read_uint32()
        
        # Read element count
        element_count = self.read_uint32()
        
        # Read bulk data size
        bulk_data_size = self.read_uint32()
        
        return {
            'flags': flags,
            'element_count': element_count,
            'bulk_data_size': bulk_data_size
        }

    def read_name_table(self) -> list[str]:
        names_count = self.read_int32()
        names = []
        
        for _ in range(names_count):
            name = self.read_string()
            names.append(name)
            
        return names
```

--------------------------------------------------------------------------------
/src/binary_reader/base_reader.py:
--------------------------------------------------------------------------------

```python
from abc import ABC, abstractmethod
from typing import BinaryIO, Dict, Any
import struct

class BinaryReader(ABC):
    def __init__(self, file_path: str):
        self.file_path = file_path
        self._file: BinaryIO | None = None
        self._position = 0

    def __enter__(self):
        self._file = open(self.file_path, 'rb')
        return self
        
    def __exit__(self, exc_type, exc_val, exc_tb):
        if self._file:
            self._file.close()
    
    def read_bytes(self, size: int) -> bytes:
        if not self._file:
            raise ValueError("File is not open")
        data = self._file.read(size)
        self._position += len(data)
        return data

    def read_uint32(self) -> int:
        return struct.unpack('<I', self.read_bytes(4))[0]

    def read_int32(self) -> int:
        return struct.unpack('<i', self.read_bytes(4))[0]

    def read_float(self) -> float:
        return struct.unpack('<f', self.read_bytes(4))[0]

    def read_string(self, encoding='utf-8') -> str:
        length = self.read_int32()
        if length == 0:
            return ""
        elif length < 0:
            length = abs(length)
            data = self.read_bytes(length * 2)
            return data.decode('utf-16-le').rstrip('\0')
        else:
            data = self.read_bytes(length)
            return data.decode(encoding).rstrip('\0')

    def seek(self, position: int) -> None:
        if not self._file:
            raise ValueError("File is not open")
        self._file.seek(position)
        self._position = position

    @property
    def position(self) -> int:
        return self._position

    @abstractmethod
    def read_header(self) -> Dict[str, Any]:
        pass

    @abstractmethod
    def read_metadata(self) -> Dict[str, Any]:
        pass
```

--------------------------------------------------------------------------------
/tests/test_binary_reader.py:
--------------------------------------------------------------------------------

```python
import pytest
import os
from src.binary_reader import UnrealAssetReader, BinaryReader

@pytest.fixture
def test_unreal_asset_file(tmp_path):
    # Create a mock .uasset file for testing
    file_path = tmp_path / "test.uasset"
    with open(file_path, "wb") as f:
        # Write magic number
        f.write(bytes.fromhex('C1832A9E'))
        # Write mock version numbers
        f.write(int(1).to_bytes(4, 'little'))  # legacy_version
        f.write(int(2).to_bytes(4, 'little'))  # legacy_ue3_version
        f.write(int(3).to_bytes(4, 'little'))  # file_version_ue4
        f.write(int(100).to_bytes(4, 'little'))  # file_size
        # Write mock metadata
        f.write(int(1).to_bytes(4, 'little'))  # flags
        f.write(int(10).to_bytes(4, 'little'))  # element_count
        f.write(int(1000).to_bytes(4, 'little'))  # bulk_data_size
    return file_path

def test_unreal_asset_reader_header(test_unreal_asset_file):
    with UnrealAssetReader(test_unreal_asset_file) as reader:
        header = reader.read_header()
        assert header['magic'] == '0xc1832a9e'
        assert header['legacy_version'] == 1
        assert header['legacy_ue3_version'] == 2
        assert header['file_version_ue4'] == 3
        assert header['file_size'] == 100

def test_unreal_asset_reader_metadata(test_unreal_asset_file):
    with UnrealAssetReader(test_unreal_asset_file) as reader:
        # Skip header
        reader.read_header()
        metadata = reader.read_metadata()
        assert metadata['flags'] == 1
        assert metadata['element_count'] == 10
        assert metadata['bulk_data_size'] == 1000

def test_invalid_unreal_asset_file(tmp_path):
    file_path = tmp_path / "invalid.uasset"
    with open(file_path, "wb") as f:
        f.write(b"invalid data")
    
    with pytest.raises(ValueError, match="Not a valid .uasset file"):
        with UnrealAssetReader(file_path) as reader:
            reader.read_header()
```

--------------------------------------------------------------------------------
/main.py:
--------------------------------------------------------------------------------

```python
from typing import Any
import asyncio
from mcp.server.models import InitializationOptions
import mcp.types as types
from mcp.server import NotificationOptions, Server
import mcp.server.stdio
import os
from src.binary_reader.unreal_reader import UnrealAssetReader
from src.binary_reader.base_reader import BinaryReader

server = Server("binary_reader")

@server.list_tools()
async def handle_list_tools() -> list[types.Tool]:
    """
    List available tools for binary file analysis.
    Each tool specifies its arguments using JSON Schema validation.
    """
    return [
        types.Tool(
            name="read-unreal-asset",
            description="Read and analyze Unreal Engine .uasset file structure",
            inputSchema={
                "type": "object",
                "properties": {
                    "file_path": {
                        "type": "string",
                        "description": "Path to the .uasset file",
                    },
                },
                "required": ["file_path"],
            },
        ),
        types.Tool(
            name="read-binary-metadata",
            description="Read generic binary file metadata and structure",
            inputSchema={
                "type": "object",
                "properties": {
                    "file_path": {
                        "type": "string",
                        "description": "Path to the binary file",
                    },
                    "format": {
                        "type": "string",
                        "description": "File format specification (if known)",
                        "enum": ["auto", "unreal", "custom"],
                        "default": "auto"
                    }
                },
                "required": ["file_path"],
            },
        )
    ]

def format_unreal_asset_data(data: dict) -> str:
    """Format Unreal asset data into a readable string."""
    try:
        return (
            f"Unreal Asset Analysis:\n\n"
            f"Header Information:\n"
            f"Magic: {data.get('header', {}).get('magic', 'N/A')}\n"
            f"Legacy Version: {data.get('header', {}).get('legacy_version', 'N/A')}\n"
            f"UE4 Version: {data.get('header', {}).get('file_version_ue4', 'N/A')}\n"
            f"File Size: {data.get('header', {}).get('file_size', 'N/A')} bytes\n\n"
            f"Metadata:\n"
            f"Flags: {data.get('metadata', {}).get('flags', 'N/A')}\n"
            f"Element Count: {data.get('metadata', {}).get('element_count', 'N/A')}\n"
            f"Bulk Data Size: {data.get('metadata', {}).get('bulk_data_size', 'N/A')} bytes\n"
            "---"
        )
    except Exception as e:
        return f"Error formatting asset data: {str(e)}"

@server.call_tool()
async def handle_call_tool(
    name: str, arguments: dict | None
) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
    """
    Handle tool execution requests.
    Tools can read and analyze binary files.
    """
    if not arguments:
        return [types.TextContent(type="text", text="Missing arguments for the request")]
    
    if name == "read-unreal-asset":
        file_path = arguments.get("file_path")
        if not file_path:
            return [types.TextContent(type="text", text="Missing file_path parameter")]

        try:
            with UnrealAssetReader(file_path) as reader:
                data = {
                    "header": reader.read_header(),
                    "metadata": reader.read_metadata(),
                }
                
                formatted_data = format_unreal_asset_data(data)
                return [types.TextContent(type="text", text=formatted_data)]
                
        except FileNotFoundError:
            return [types.TextContent(type="text", text=f"Error: File not found - {file_path}")]
        except ValueError as e:
            return [types.TextContent(type="text", text=f"Error: {str(e)}")]
        except Exception as e:
            return [types.TextContent(type="text", text=f"Unexpected error: {str(e)}")]

    elif name == "read-binary-metadata":
        file_path = arguments.get("file_path")
        if not file_path:
            return [types.TextContent(type="text", text="Missing file_path parameter")]

        format_type = arguments.get("format", "auto")
        
        try:
            # Auto-detect format or use specified format
            if format_type == "unreal" or (format_type == "auto" and file_path.endswith(".uasset")):
                reader_class = UnrealAssetReader
            else:
                reader_class = BinaryReader

            with reader_class(file_path) as reader:
                metadata = {
                    "file_size": os.path.getsize(file_path),
                    "header": reader.read_header() if hasattr(reader, "read_header") else None,
                    "metadata": reader.read_metadata() if hasattr(reader, "read_metadata") else None
                }

                formatted_text = (
                    f"Binary File Analysis:\n\n"
                    f"File Path: {file_path}\n"
                    f"File Size: {metadata['file_size']} bytes\n"
                    f"Format: {format_type}\n\n"
                )

                if metadata["header"]:
                    formatted_text += f"Header Information:\n{metadata['header']}\n\n"
                if metadata["metadata"]:
                    formatted_text += f"Metadata Information:\n{metadata['metadata']}"

                return [types.TextContent(type="text", text=formatted_text)]

        except FileNotFoundError:
            return [types.TextContent(type="text", text=f"Error: File not found - {file_path}")]
        except Exception as e:
            return [types.TextContent(type="text", text=f"Error analyzing file: {str(e)}")]
    else:
        return [types.TextContent(type="text", text=f"Unknown tool: {name}")]

async def main():
    async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            InitializationOptions(
                server_name="binary_reader",
                server_version="0.1.0",
                capabilities=server.get_capabilities(
                    notification_options=NotificationOptions(),
                    experimental_capabilities={},
                ),
            ),
        )

if __name__ == "__main__":
    asyncio.run(main())
```