# Directory Structure

```
├── __init__.py
├── .gitignore
├── claude-desktop-ss.png
├── Dockerfile
├── pyproject.toml
├── README.md
├── requirements.txt
├── smithery.yaml
├── src
│   └── mcp_server_iceberg
│       └── server.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
# Sensitive files
.env
*.env
!.env.template

# Python
__pycache__/
*.py[cod]
*$py.class
venv/
*.egg-info/

# IDE
.idea/
.vscode/

# Logs
*.log
```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# MCP Iceberg Catalog

[![smithery badge](https://smithery.ai/badge/@ahodroj/mcp-iceberg-service)](https://smithery.ai/server/@ahodroj/mcp-iceberg-service)

An MCP (Model Context Protocol) server implementation for interacting with Apache Iceberg. This server provides a SQL interface for querying and managing Iceberg tables through Claude Desktop.

## Claude Desktop as your Iceberg Data Lake Catalog
![image](claude-desktop-ss.png)

## How to Install in Claude Desktop

### Installing via Smithery

To install MCP Iceberg Catalog for Claude Desktop automatically via [Smithery](https://smithery.ai/server/@ahodroj/mcp-iceberg-service):

```bash
npx -y @smithery/cli install @ahodroj/mcp-iceberg-service --client claude
```

1. **Prerequisites**
   - Python 3.10 or higher
   - UV package installer (recommended) or pip
   - Access to an Iceberg REST catalog and S3-compatible storage

2. **Manual installation**
   Add the following configuration to `claude_desktop_config.json`:

```json
{
  "mcpServers": {
    "iceberg": {
      "command": "uv",
      "args": [
        "--directory",
        "PATH_TO_/mcp-iceberg-service",
        "run",
        "mcp-server-iceberg"
      ],
      "env": {
        "ICEBERG_CATALOG_URI" : "http://localhost:8181",
        "ICEBERG_WAREHOUSE" : "YOUR ICEBERG WAREHOUSE NAME",
        "S3_ENDPOINT" : "OPTIONAL IF USING S3",
        "AWS_ACCESS_KEY_ID" : "YOUR S3 ACCESS KEY",
        "AWS_SECRET_ACCESS_KEY" : "YOUR S3 SECRET KEY"
      }
    }
  }
}
```
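
For local development you can also run the server directly (e.g. `uv run mcp-server-iceberg`) and supply the same settings through a `.env` file, which the server loads via `python-dotenv`. The values below are placeholders, not working credentials:

```bash
ICEBERG_CATALOG_URI=http://localhost:8181
ICEBERG_WAREHOUSE=my_warehouse
S3_ENDPOINT=http://localhost:9000
AWS_ACCESS_KEY_ID=placeholder-access-key
AWS_SECRET_ACCESS_KEY=placeholder-secret-key
```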

## Design

### Architecture

The MCP server is built on three main components:

1. **MCP Protocol Handler**
   - Implements the Model Context Protocol for communication with Claude
   - Handles request/response cycles through stdio
   - Manages server lifecycle and initialization

2. **Query Processor**
   - Parses SQL queries using `sqlparse`
   - Supported operations (see the examples after this list):
     - LIST TABLES
     - DESCRIBE TABLE
     - SELECT
     - INSERT
     - CREATE TABLE (basic)

3. **Iceberg Integration**
   - Uses `pyiceberg` for table operations
   - Integrates with PyArrow for efficient data handling
   - Manages catalog connections and table operations
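
As an illustration, the query shapes the parser currently accepts look like the following (table and column names are hypothetical):

```sql
LIST TABLES
DESCRIBE TABLE demo.users
SELECT id, name FROM demo.users
INSERT INTO demo.users VALUES (1, 'alice', true)
CREATE TABLE demo.users (id int, name string)
```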

### PyIceberg Integration

The server utilizes PyIceberg in several ways (a sketch of the SELECT path follows this list):

1. **Catalog Management**
   - Connects to REST catalogs
   - Manages table metadata
   - Handles namespace operations

2. **Data Operations**
   - Converts between PyIceberg and PyArrow types
   - Handles data insertion through PyArrow tables
   - Manages table schemas and field types

3. **Query Execution**
   - Translates SQL to PyIceberg operations
   - Handles data scanning and filtering
   - Manages result set conversion
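
As a rough sketch of how the SELECT path maps onto PyIceberg (the catalog URI, warehouse, table, and column names below are illustrative, not part of this repo):

```python
from pyiceberg.catalog import load_catalog

# Connect to a REST catalog, as the server does via ICEBERG_CATALOG_URI / ICEBERG_WAREHOUSE
catalog = load_catalog("iceberg", uri="http://localhost:8181", warehouse="my_warehouse")

# Load a table, scan it with a column projection, and materialize rows as dicts
table = catalog.load_table("demo.users")
rows = table.scan().select("id", "name").to_arrow().to_pylist()
```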

## Further Implementation Needed

1. **Query Operations**
   - [ ] Implement UPDATE operations
   - [ ] Add DELETE support
   - [ ] Support for CREATE TABLE with schema definition
   - [ ] Add ALTER TABLE operations
   - [ ] Implement table partitioning support

2. **Data Types**
   - [ ] Support for complex types (arrays, maps, structs)
   - [ ] Add timestamp with timezone handling
   - [ ] Support for decimal types
   - [ ] Add nested field support

3. **Performance Improvements**
   - [ ] Implement batch inserts
   - [ ] Add query optimization
   - [ ] Support for parallel scans
   - [ ] Add caching layer for frequently accessed data

4. **Security Features**
   - [ ] Add authentication mechanisms
   - [ ] Implement role-based access control
   - [ ] Add row-level security
   - [ ] Support for encrypted connections

5. **Monitoring and Management**
   - [ ] Add metrics collection
   - [ ] Implement query logging
   - [ ] Add performance monitoring
   - [ ] Support for table maintenance operations

6. **Error Handling**
   - [ ] Improve error messages
   - [ ] Add retry mechanisms for transient failures
   - [ ] Implement transaction support
   - [ ] Add data validation


```

--------------------------------------------------------------------------------
/__init__.py:
--------------------------------------------------------------------------------

```python
from .src.mcp_server_iceberg.server import main

__all__ = ["main"]
```

--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------

```
pyiceberg
python-dotenv
mcp
pyarrow  # Required for PyIceberg data handling
sqlparse  # Required for SQL parsing 
```

--------------------------------------------------------------------------------
/Dockerfile:
--------------------------------------------------------------------------------

```dockerfile
# Generated by https://smithery.ai. See: https://smithery.ai/docs/config#dockerfile
FROM python:3.10-slim

# Set working directory
WORKDIR /app

# Copy project files
COPY . /app

# Install pip dependencies and the package itself
RUN pip install --upgrade pip \
    && pip install --no-cache-dir .

# Expose port if needed (not strictly required for mcp using stdio)
# EXPOSE 8080

# Command to run the MCP server
CMD ["mcp-server-iceberg"]
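
# Example usage (hypothetical image tag and endpoints; the server speaks MCP
# over stdio, so run the container interactively):
#   docker build -t mcp-server-iceberg .
#   docker run -i --rm \
#     -e ICEBERG_CATALOG_URI=http://host.docker.internal:8181 \
#     -e ICEBERG_WAREHOUSE=my_warehouse \
#     mcp-server-iceberg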

```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
[project]
name = "mcp-server-iceberg"
version = "0.1.0"
description = "MCP server for interacting with an Apache Iceberg catalog and data lake"
readme = "README.md"
requires-python = ">=3.10"  
dependencies = [  
    "mcp>=1.0.0",                    
    "pyiceberg>=0.5.1",             # Apache Iceberg Python SDK
    "pyarrow>=14.0.1",              # PyArrow for data handling
    "sqlparse>=0.4.4",              # SQL parsing
    "python-dotenv",                # Environment variable management
    "requests",                     # HTTP requests for REST catalog
    "fsspec",                       # Filesystem abstraction
    "s3fs",                        # S3 filesystem support
]

# Build system configuration 
[build-system]
requires = ["hatchling"]  
build-backend = "hatchling.build" 

# Build configuration 
[tool.hatch.build.targets.wheel]
packages = ["src/mcp_server_iceberg"] 


[project.scripts]
mcp-server-iceberg = "mcp_server_iceberg.server:run_server"  
```

--------------------------------------------------------------------------------
/smithery.yaml:
--------------------------------------------------------------------------------

```yaml
# Smithery configuration file: https://smithery.ai/docs/config#smitheryyaml

startCommand:
  type: stdio
  configSchema:
    # JSON Schema defining the configuration options for the MCP.
    type: object
    required:
      - icebergCatalogUri
      - icebergWarehouse
    properties:
      icebergCatalogUri:
        type: string
        default: http://localhost:8181
        description: URI to the Iceberg REST catalog endpoint
      icebergWarehouse:
        type: string
        default: default_warehouse
        description: Name of the Iceberg warehouse
      s3Endpoint:
        type: string
        default: ""
        description: Optional S3 endpoint if using S3 for storage
      awsAccessKeyId:
        type: string
        default: ""
        description: AWS access key ID for S3
      awsSecretAccessKey:
        type: string
        default: ""
        description: AWS secret access key for S3
  # A JS function that produces the CLI command based on the given config to start the MCP on stdio.
  commandFunction: |-
    (config) => ({
      command: 'mcp-server-iceberg',
      args: [],
      env: {
        ICEBERG_CATALOG_URI: config.icebergCatalogUri,
        ICEBERG_WAREHOUSE: config.icebergWarehouse,
        S3_ENDPOINT: config.s3Endpoint || '',
        AWS_ACCESS_KEY_ID: config.awsAccessKeyId || '',
        AWS_SECRET_ACCESS_KEY: config.awsSecretAccessKey || ''
      }
    })
  exampleConfig:
    icebergCatalogUri: http://localhost:8181
    icebergWarehouse: my_iceberg_warehouse
    s3Endpoint: http://localhost:9000
    awsAccessKeyId: exampleAccessKeyId
    awsSecretAccessKey: exampleSecretAccessKey

```

--------------------------------------------------------------------------------
/src/mcp_server_iceberg/server.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python
import os
import asyncio
import logging
import json
import time
from typing import Any, Dict

from dotenv import load_dotenv
import mcp.server.stdio
from mcp.server import Server
from mcp.types import Tool, TextContent
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import (
    BooleanType,
    DoubleType,
    IntegerType,
    NestedField,
    StringType,
    TimestampType,
)
import sqlparse
import sqlparse.sql
from sqlparse.tokens import DDL, DML
import pyarrow as pa

# Configure logging
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('iceberg_server')

load_dotenv()
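# Configuration is read from the process environment, optionally seeded from a
# local .env file by load_dotenv() above: ICEBERG_CATALOG_URI,
# ICEBERG_WAREHOUSE, and the optional S3_ENDPOINT, AWS_ACCESS_KEY_ID and
# AWS_SECRET_ACCESS_KEY.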

class IcebergConnection:
    """
    Iceberg catalog connection management class
    """
    def __init__(self):
        # Initialize configuration
        self.config = {
            "uri": os.getenv("ICEBERG_CATALOG_URI"),
            "warehouse": os.getenv("ICEBERG_WAREHOUSE"),
            "s3.endpoint": os.getenv("S3_ENDPOINT", ""),
            "s3.access-key-id": os.getenv("AWS_ACCESS_KEY_ID", ""),
            "s3.secret-access-key": os.getenv("AWS_SECRET_ACCESS_KEY", ""),
        }
        self.catalog = None
        logger.info(f"Initialized with config (excluding credentials): {json.dumps({k:v for k,v in self.config.items() if not 'key' in k})}")
    
    def ensure_connection(self):
        """
        Ensure catalog connection is available
        """
        try:
            if self.catalog is None:
                logger.info("Creating new Iceberg catalog connection...")
                self.catalog = load_catalog(
                    "iceberg",
                    **{k: v for k, v in self.config.items() if v}
                )
                logger.info("New catalog connection established")
            return self.catalog
        except Exception as e:
            logger.error(f"Connection error: {str(e)}")
            raise

    def parse_sql(self, query: str) -> Dict:
        """
        Parse SQL query and extract relevant information
        
        Args:
            query (str): SQL query to parse
            
        Returns:
            Dict: Parsed query information
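
        Example (names are illustrative):
            parse_sql("SELECT id, name FROM db.tbl") returns
            {"type": "SELECT", "table": "db.tbl", "columns": ["id", "name"], ...}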
        """
        parsed = sqlparse.parse(query)[0]
        tokens = [token for token in parsed.tokens if not token.is_whitespace]
        
        result = {
            "type": None,
            "table": None,
            "columns": None,
            "where": None,
            "order_by": None,
            "limit": None
        }
        
        # Determine query type (DML covers SELECT/INSERT; DDL covers CREATE)
        for token in tokens:
            if token.ttype in (DML, DDL):
                result["type"] = token.value.upper()
                break
        
        # Extract table name
        for i, token in enumerate(tokens):
            if token.value.upper() == "FROM":
                if i + 1 < len(tokens):
                    result["table"] = tokens[i + 1].value
                break
        
        # Extract columns for SELECT
        if result["type"] == "SELECT":
            for i, token in enumerate(tokens):
                if token.value.upper() == "SELECT":
                    if i + 1 < len(tokens):
                        cols = tokens[i + 1].value
                        result["columns"] = [c.strip() for c in cols.split(",")]
                    break
        
        return result

    def execute_query(self, query: str) -> list[dict[str, Any]]:
        """
        Execute query on Iceberg tables
        
        Args:
            query (str): Query to execute
            
        Returns:
            list[dict[str, Any]]: Query results
        """
        start_time = time.time()
        logger.info(f"Executing query: {query[:200]}...")
        
        try:
            catalog = self.ensure_connection()
            query_upper = query.strip().upper()
            
            # Handle special commands
            if query_upper.startswith("LIST TABLES"):
                results = []
                for namespace in catalog.list_namespaces():
                    for table in catalog.list_tables(namespace):
                        results.append({
                            "namespace": ".".join(namespace),
                            "table": table
                        })
                logger.info(f"Listed {len(results)} tables in {time.time() - start_time:.2f}s")
                return results
            
            elif query_upper.startswith("DESCRIBE TABLE"):
                table_name = query[len("DESCRIBE TABLE"):].strip()
                table = catalog.load_table(table_name)
                schema_dict = {
                    "schema": str(table.schema()),
                    "partition_spec": [str(field) for field in (table.spec().fields if table.spec() else [])],
                    "sort_order": [str(field) for field in (table.sort_order().fields if table.sort_order() else [])],
                    "properties": table.properties
                }
                return [schema_dict]
            
            # Handle SQL queries
            parsed = self.parse_sql(query)
            
            if parsed["type"] == "SELECT":
                table = catalog.load_table(parsed["table"])
                scan = table.scan()
                
                # Apply column projection if specified
                if parsed["columns"] and "*" not in parsed["columns"]:
                    scan = scan.select(*parsed["columns"])
                
                # Convert results to dicts
                results = []
                arrow_table = scan.to_arrow()
                
                # Convert PyArrow Table to list of dicts
                for batch in arrow_table.to_batches():
                    for row_idx in range(len(batch)):
                        row_dict = {}
                        for col_name in batch.schema.names:
                            val = batch[col_name][row_idx].as_py()
                            row_dict[col_name] = val
                        results.append(row_dict)
                
                logger.info(f"Query returned {len(results)} rows in {time.time() - start_time:.2f}s")
                return results
            
            elif parsed["type"] == "INSERT":
                # Extract table name and values
                table_name = None
                values = []
                
                # Parse INSERT INTO table_name VALUES (...) syntax
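                # Note: this parser is deliberately naive; it handles a single
                # VALUES tuple and splits on bare commas, so quoted strings that
                # themselves contain commas are not supported.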
                parsed_stmt = sqlparse.parse(query)[0]
                logger.info(f"Parsed statement: {parsed_stmt}")
                
                # Find the table identifier and the VALUES clause. Depending on
                # the sqlparse version, VALUES and its parenthesised values may
                # arrive as a single grouped token or as separate tokens.
                values_token = None
                table_identifier = None

                for token in parsed_stmt.tokens:
                    logger.info(f"Token: {token}, Type: {token.ttype}, Value: {token.value}")
                    if isinstance(token, sqlparse.sql.Identifier):
                        table_identifier = token
                    elif token.value.upper().startswith('VALUES'):
                        values_token = token
                        break

                if table_identifier:
                    # Handle multi-part identifiers (e.g., schema.table)
                    table_name = str(table_identifier)
                    logger.info(f"Found table name: {table_name}")

                # Locate the parenthesised value list: first inside a grouped
                # VALUES token, then as the next non-whitespace token after a
                # bare VALUES keyword
                paren = None
                if values_token is not None:
                    for sub_token in getattr(values_token, 'tokens', []):
                        if isinstance(sub_token, sqlparse.sql.Parenthesis):
                            paren = sub_token
                            break
                    if paren is None:
                        idx = parsed_stmt.tokens.index(values_token)
                        for candidate in parsed_stmt.tokens[idx + 1:]:
                            if candidate.is_whitespace:
                                continue
                            if isinstance(candidate, sqlparse.sql.Parenthesis):
                                paren = candidate
                            break

                if paren is not None:
                    values_str = paren.value.strip('()').split(',')
                    values = []
                    for v in values_str:
                        v = v.strip()
                        if v.startswith("'") and v.endswith("'"):
                            values.append(v.strip("'"))
                        elif v.lower() == 'true':
                            values.append(True)
                        elif v.lower() == 'false':
                            values.append(False)
                        elif v.lower() == 'null':
                            values.append(None)
                        else:
                            try:
                                values.append(int(v))
                            except ValueError:
                                try:
                                    values.append(float(v))
                                except ValueError:
                                    values.append(v)
                    logger.info(f"Extracted values: {values}")
                
                if not table_name or not values:
                    raise ValueError(f"Invalid INSERT statement format. Table: {table_name}, Values: {values}")
                
                logger.info(f"Inserting into table: {table_name}")
                logger.info(f"Values: {values}")
                
                # Load table and schema
                table = catalog.load_table(table_name)
                schema = table.schema()
                
                # Create PyArrow arrays for each field
                arrays = []
                names = []
                for i, field in enumerate(schema.fields):
                    names.append(field.name)
                    value = values[i] if i < len(values) else None
                    if isinstance(field.field_type, IntegerType):
                        arrays.append(pa.array([value], type=pa.int32()))
                    elif isinstance(field.field_type, StringType):
                        arrays.append(pa.array([value], type=pa.string()))
                    elif isinstance(field.field_type, BooleanType):
                        arrays.append(pa.array([value], type=pa.bool_()))
                    elif isinstance(field.field_type, DoubleType):
                        arrays.append(pa.array([value], type=pa.float64()))
                    elif isinstance(field.field_type, TimestampType):
                        arrays.append(pa.array([value], type=pa.timestamp('us')))
                    else:
                        arrays.append(pa.array([value], type=pa.string()))
                
                # Create PyArrow table
                pa_table = pa.Table.from_arrays(arrays, names=names)
                
                # Append the PyArrow table directly to the Iceberg table
                table.append(pa_table)
                
                return [{"status": "Inserted 1 row successfully"}]
            
            elif parsed["type"] == "CREATE":
                # Basic CREATE TABLE support
                if "CREATE TABLE" in query_upper:
                    # Extract table name and schema
                    parts = query.split("(", 1)
                    table_name = parts[0].replace("CREATE TABLE", "").strip()
                    schema_str = parts[1].strip()[:-1]  # Remove trailing )
                    
                    # Parse schema definition
                    schema_fields = []
                    for field in schema_str.split(","):
                        name, type_str = field.strip().split(" ", 1)
                        type_str = type_str.upper()
                        if "STRING" in type_str:
                            field_type = StringType()
                        elif "INT" in type_str:
                            field_type = IntegerType()
                        elif "DOUBLE" in type_str:
                            field_type = DoubleType()
                        elif "TIMESTAMP" in type_str:
                            field_type = TimestampType()
                        else:
                            field_type = StringType()
                        # Iceberg field IDs are 1-based; the catalog assigns final IDs at create time
                        schema_fields.append(NestedField(len(schema_fields) + 1, name, field_type, required=False))
                    
                    schema = Schema(*schema_fields)
                    catalog.create_table(table_name, schema)
                    return [{"status": "Table created successfully"}]
            
            else:
                raise ValueError(f"Unsupported query type: {parsed['type']}")
                
        except Exception as e:
            logger.error(f"Query error: {str(e)}")
            logger.error(f"Error type: {type(e).__name__}")
            raise

    def close(self):
        """
        Clean up resources
        """
        if self.catalog:
            logger.info("Cleaning up catalog resources")
            self.catalog = None

class IcebergServer(Server):
    """
    Iceberg MCP server class, handles client interactions
    """
    def __init__(self):
        super().__init__(name="iceberg-server")
        self.db = IcebergConnection()
        logger.info("IcebergServer initialized")

        @self.list_tools()
        async def handle_tools():
            """
            Return list of available tools
            """
            return [
                Tool(
                    name="execute_query",
                    description="Execute a query on Iceberg tables",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "query": {
                                "type": "string",
                                "description": "Query to execute (supports: LIST TABLES, DESCRIBE TABLE, SELECT, CREATE TABLE)"
                            }
                        },
                        "required": ["query"]
                    }
                )
            ]

        @self.call_tool()
        async def handle_call_tool(name: str, arguments: dict):
            """
            Handle tool call requests
            
            Args:
                name (str): Tool name
                arguments (dict): Tool arguments
                
            Returns:
                list[TextContent]: Execution results
            """
            if name == "execute_query":
                start_time = time.time()
                try:
                    result = self.db.execute_query(arguments["query"])
                    execution_time = time.time() - start_time
                    
                    return [TextContent(
                        type="text",
                        text=f"Results (execution time: {execution_time:.2f}s):\n{json.dumps(result, indent=2)}"
                    )]
                except Exception as e:
                    error_message = f"Error executing query: {str(e)}"
                    logger.error(error_message)
                    return [TextContent(
                        type="text",
                        text=error_message
                    )]
            raise ValueError(f"Unknown tool: {name}")

    def __del__(self):
        """
        Clean up resources
        """
        if hasattr(self, 'db'):
            self.db.close()

async def main():
    """
    Main function, starts server and handles requests
    """
    try:
        server = IcebergServer()
        initialization_options = server.create_initialization_options()
        logger.info("Starting server")
        
        async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
            await server.run(
                read_stream,
                write_stream,
                initialization_options
            )
    except Exception as e:
        logger.critical(f"Server failed: {str(e)}", exc_info=True)
        raise
    finally:
        logger.info("Server shutting down")

def run_server():
    """
    Entry point for running the server
    """
    asyncio.run(main())

if __name__ == "__main__":
    run_server()
```