# Directory Structure

```
├── __init__.py
├── .gitignore
├── claude-desktop-ss.png
├── Dockerfile
├── pyproject.toml
├── README.md
├── requirements.txt
├── smithery.yaml
├── src
│   └── mcp_server_iceberg
│       └── server.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
# Sensitive files
.env
*.env
!.env.template

# Python
__pycache__/
*.py[cod]
*$py.class
venv/
*.egg-info/

# IDE
.idea/
.vscode/

# Logs
*.log
```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# MCP Iceberg Catalog

[Install via Smithery](https://smithery.ai/server/@ahodroj/mcp-iceberg-service)

An MCP (Model Context Protocol) server implementation for interacting with Apache Iceberg. This server provides a SQL interface for querying and managing Iceberg tables through Claude Desktop.

## Claude Desktop as your Iceberg Data Lake Catalog

![MCP Iceberg Catalog in Claude Desktop](claude-desktop-ss.png)

## How to Install in Claude Desktop

### Installing via Smithery

To install MCP Iceberg Catalog for Claude Desktop automatically via [Smithery](https://smithery.ai/server/@ahodroj/mcp-iceberg-service):

```bash
npx -y @smithery/cli install @ahodroj/mcp-iceberg-service --client claude
```

### Installing manually

1. **Prerequisites**
   - Python 3.10 or higher
   - UV package installer (recommended) or pip
   - Access to an Iceberg REST catalog and S3-compatible storage

2. **Configuration**
   Add the following to `claude_desktop_config.json`:

```json
{
  "mcpServers": {
    "iceberg": {
      "command": "uv",
      "args": [
        "--directory",
        "PATH_TO_/mcp-iceberg-service",
        "run",
        "mcp-server-iceberg"
      ],
      "env": {
        "ICEBERG_CATALOG_URI": "http://localhost:8181",
        "ICEBERG_WAREHOUSE": "YOUR ICEBERG WAREHOUSE NAME",
        "S3_ENDPOINT": "OPTIONAL IF USING S3",
        "AWS_ACCESS_KEY_ID": "YOUR S3 ACCESS KEY",
        "AWS_SECRET_ACCESS_KEY": "YOUR S3 SECRET KEY"
      }
    }
  }
}
```

## Design

### Architecture

The MCP server is built on three main components:

1. **MCP Protocol Handler**
   - Implements the Model Context Protocol for communication with Claude
   - Handles request/response cycles over stdio
   - Manages server lifecycle and initialization

2. **Query Processor**
   - Parses SQL queries using `sqlparse`
   - Supported operations:
     - LIST TABLES
     - DESCRIBE TABLE
     - SELECT
     - INSERT

3. **Iceberg Integration**
   - Uses `pyiceberg` for table operations
   - Integrates with PyArrow for efficient data handling
   - Manages catalog connections and table operations

### PyIceberg Integration

The server utilizes PyIceberg in several ways (a usage sketch follows this list):

1. **Catalog Management**
   - Connects to REST catalogs
   - Manages table metadata
   - Handles namespace operations

2. **Data Operations**
   - Converts between PyIceberg and PyArrow types
   - Handles data insertion through PyArrow tables
   - Manages table schemas and field types

3. **Query Execution**
   - Translates SQL to PyIceberg operations
   - Handles data scanning and filtering
   - Manages result set conversion
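
For orientation, here is a minimal sketch of the PyIceberg calls the server builds on. This is illustrative rather than the server's actual code; the URI, warehouse, and table names are placeholders:

```python
from pyiceberg.catalog import load_catalog

# Connect to a REST catalog, mirroring the server's env-driven configuration
catalog = load_catalog(
    "iceberg",
    uri="http://localhost:8181",  # ICEBERG_CATALOG_URI
    warehouse="my_warehouse",     # ICEBERG_WAREHOUSE
)

# LIST TABLES: enumerate namespaces and their tables
for namespace in catalog.list_namespaces():
    print(namespace, catalog.list_tables(namespace))

# SELECT: scan a table and materialize the rows via PyArrow
table = catalog.load_table("my_namespace.my_table")
rows = table.scan().to_arrow().to_pylist()
```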

## Further Implementation Needed

1. **Query Operations**
   - [ ] Implement UPDATE operations
   - [ ] Add DELETE support
   - [ ] Support for CREATE TABLE with schema definition
   - [ ] Add ALTER TABLE operations
   - [ ] Implement table partitioning support

2. **Data Types**
   - [ ] Support for complex types (arrays, maps, structs)
   - [ ] Add timestamp with timezone handling
   - [ ] Support for decimal types
   - [ ] Add nested field support

3. **Performance Improvements**
   - [ ] Implement batch inserts
   - [ ] Add query optimization
   - [ ] Support for parallel scans
   - [ ] Add caching layer for frequently accessed data

4. **Security Features**
   - [ ] Add authentication mechanisms
   - [ ] Implement role-based access control
   - [ ] Add row-level security
   - [ ] Support for encrypted connections

5. **Monitoring and Management**
   - [ ] Add metrics collection
   - [ ] Implement query logging
   - [ ] Add performance monitoring
   - [ ] Support for table maintenance operations

6. **Error Handling**
   - [ ] Improve error messages
   - [ ] Add retry mechanisms for transient failures
   - [ ] Implement transaction support
   - [ ] Add data validation
```

--------------------------------------------------------------------------------
/__init__.py:
--------------------------------------------------------------------------------

```python
from .src.mcp_server_iceberg.server import main

__all__ = ["main"]
```

--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------

```
pyiceberg
python-dotenv
mcp
pyarrow   # Required for PyIceberg data handling
sqlparse  # Required for SQL parsing
```

--------------------------------------------------------------------------------
/Dockerfile:
--------------------------------------------------------------------------------

```dockerfile
# Generated by https://smithery.ai. See: https://smithery.ai/docs/config#dockerfile
FROM python:3.10-slim

# Set working directory
WORKDIR /app

# Copy project files
COPY . /app

# Install pip dependencies and the package itself
RUN pip install --upgrade pip \
    && pip install --no-cache-dir .
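
# Illustrative usage (not part of the original file): build the image and run
# the stdio server with catalog settings passed as environment variables.
#   docker build -t mcp-server-iceberg .
#   docker run -i --rm \
#     -e ICEBERG_CATALOG_URI=http://localhost:8181 \
#     -e ICEBERG_WAREHOUSE=my_warehouse \
#     mcp-server-iceberg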

# Expose port if needed (not strictly required for mcp using stdio)
# EXPOSE 8080

# Command to run the MCP server
CMD ["mcp-server-iceberg"]
```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
[project]
name = "mcp-server-iceberg"
version = "0.1.0"
description = "MCP server for interacting with an Apache Iceberg catalog and data lake"
readme = "README.md"
requires-python = ">=3.10"
dependencies = [
    "mcp>=1.0.0",
    "pyiceberg>=0.5.1",   # Apache Iceberg Python SDK
    "pyarrow>=14.0.1",    # PyArrow for data handling
    "sqlparse>=0.4.4",    # SQL parsing
    "python-dotenv",      # Environment variable management
    "requests",           # HTTP requests for REST catalog
    "fsspec",             # Filesystem abstraction
    "s3fs",               # S3 filesystem support
]

# Build system configuration
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

# Build configuration
[tool.hatch.build.targets.wheel]
packages = ["src/mcp_server_iceberg"]

[project.scripts]
mcp-server-iceberg = "mcp_server_iceberg.server:run_server"
```

--------------------------------------------------------------------------------
/smithery.yaml:
--------------------------------------------------------------------------------

```yaml
# Smithery configuration file: https://smithery.ai/docs/config#smitheryyaml

startCommand:
  type: stdio
  configSchema:
    # JSON Schema defining the configuration options for the MCP.
    type: object
    required:
      - icebergCatalogUri
      - icebergWarehouse
    properties:
      icebergCatalogUri:
        type: string
        default: http://localhost:8181
        description: URI to the Iceberg REST catalog endpoint
      icebergWarehouse:
        type: string
        default: default_warehouse
        description: Name of the Iceberg warehouse
      s3Endpoint:
        type: string
        default: ""
        description: Optional S3 endpoint if using S3 for storage
      awsAccessKeyId:
        type: string
        default: ""
        description: AWS access key ID for S3
      awsSecretAccessKey:
        type: string
        default: ""
        description: AWS secret access key for S3
  commandFunction:
    # A JS function that produces the CLI command based on the given config to start the MCP on stdio.
    |-
    (config) => ({
      command: 'mcp-server-iceberg',
      args: [],
      env: {
        ICEBERG_CATALOG_URI: config.icebergCatalogUri,
        ICEBERG_WAREHOUSE: config.icebergWarehouse,
        S3_ENDPOINT: config.s3Endpoint || '',
        AWS_ACCESS_KEY_ID: config.awsAccessKeyId || '',
        AWS_SECRET_ACCESS_KEY: config.awsSecretAccessKey || ''
      }
    })
  exampleConfig:
    icebergCatalogUri: http://localhost:8181
    icebergWarehouse: my_iceberg_warehouse
    s3Endpoint: http://localhost:9000
    awsAccessKeyId: exampleAccessKeyId
    awsSecretAccessKey: exampleSecretAccessKey
```

--------------------------------------------------------------------------------
/src/mcp_server_iceberg/server.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python
import os
import asyncio
import logging
import json
import time
from typing import Any, Dict

from dotenv import load_dotenv
import mcp.server.stdio
from mcp.server import Server
from mcp.types import Tool, TextContent
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import (
    BooleanType,
    DoubleType,
    IntegerType,
    NestedField,
    StringType,
    TimestampType,
)
import sqlparse
from sqlparse.tokens import DDL, DML
import pyarrow as pa

# Configure logging
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('iceberg_server')

load_dotenv()

class IcebergConnection:
    """
    Iceberg catalog connection management class
    """
    def __init__(self):
        # Initialize configuration
        self.config = {
            "uri": os.getenv("ICEBERG_CATALOG_URI"),
            "warehouse": os.getenv("ICEBERG_WAREHOUSE"),
            "s3.endpoint": os.getenv("S3_ENDPOINT", ""),
            "s3.access-key-id": os.getenv("AWS_ACCESS_KEY_ID", ""),
            "s3.secret-access-key": os.getenv("AWS_SECRET_ACCESS_KEY", ""),
        }
        self.catalog = None
        logger.info(f"Initialized with config (excluding credentials): {json.dumps({k: v for k, v in self.config.items() if 'key' not in k})}")

    def ensure_connection(self):
        """
        Ensure catalog connection is available
        """
        try:
            if self.catalog is None:
                logger.info("Creating new Iceberg catalog connection...")
                self.catalog = load_catalog(
                    "iceberg",
                    **{k: v for k, v in self.config.items() if v}
                )
                logger.info("New catalog connection established")
            return self.catalog
        except Exception as e:
            logger.error(f"Connection error: {str(e)}")
            raise

    def parse_sql(self, query: str) -> Dict:
        """
        Parse SQL query and extract relevant information

        Args:
            query (str): SQL query to parse

        Returns:
            Dict: Parsed query information
        """
        parsed = sqlparse.parse(query)[0]
        tokens = [token for token in parsed.tokens if not token.is_whitespace]

        result = {
            "type": None,
            "table": None,
            "columns": None,
            "where": None,
            "order_by": None,
            "limit": None
        }

        # Determine query type. DML covers SELECT/INSERT; DDL is needed as well
        # because sqlparse tags CREATE as a DDL keyword, not DML.
        for token in tokens:
            if token.ttype in (DML, DDL):
                result["type"] = token.value.upper()
                break

        # Extract table name
        for i, token in enumerate(tokens):
            if token.value.upper() == "FROM":
                if i + 1 < len(tokens):
                    result["table"] = tokens[i + 1].value
                break

        # Extract columns for SELECT
        if result["type"] == "SELECT":
            for i, token in enumerate(tokens):
                if token.value.upper() == "SELECT":
                    if i + 1 < len(tokens):
                        cols = tokens[i + 1].value
                        result["columns"] = [c.strip() for c in cols.split(",")]
                    break

        return result
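
    # Illustrative example (comment only): parse_sql("SELECT id, name FROM db.users")
    # returns:
    #   {"type": "SELECT", "table": "db.users", "columns": ["id", "name"],
    #    "where": None, "order_by": None, "limit": None}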

    def execute_query(self, query: str) -> list[dict[str, Any]]:
        """
        Execute query on Iceberg tables

        Args:
            query (str): Query to execute

        Returns:
            list[dict[str, Any]]: Query results
        """
        start_time = time.time()
        logger.info(f"Executing query: {query[:200]}...")

        try:
            catalog = self.ensure_connection()
            query_upper = query.strip().upper()

            # Handle special commands
            if query_upper.startswith("LIST TABLES"):
                results = []
                for namespace in catalog.list_namespaces():
                    for table in catalog.list_tables(namespace):
                        results.append({
                            "namespace": ".".join(namespace),
                            # Table identifiers come back as tuples; join for readability
                            "table": ".".join(table)
                        })
                logger.info(f"Listed {len(results)} tables in {time.time() - start_time:.2f}s")
                return results

            elif query_upper.startswith("DESCRIBE TABLE"):
                table_name = query[len("DESCRIBE TABLE"):].strip()
                table = catalog.load_table(table_name)
                schema_dict = {
                    "schema": str(table.schema()),
                    "partition_spec": [str(field) for field in (table.spec().fields if table.spec() else [])],
                    "sort_order": [str(field) for field in (table.sort_order().fields if table.sort_order() else [])],
                    "properties": table.properties
                }
                return [schema_dict]

            # Handle SQL queries
            parsed = self.parse_sql(query)

            if parsed["type"] == "SELECT":
                table = catalog.load_table(parsed["table"])
                scan = table.scan()

                # Apply column projection if specified
                if parsed["columns"] and "*" not in parsed["columns"]:
                    scan = scan.select(*parsed["columns"])

                # Convert results to dicts
                results = []
                arrow_table = scan.to_arrow()

                # Convert PyArrow Table to list of dicts
                for batch in arrow_table.to_batches():
                    for row_idx in range(len(batch)):
                        row_dict = {}
                        for col_name in batch.schema.names:
                            val = batch[col_name][row_idx].as_py()
                            row_dict[col_name] = val
                        results.append(row_dict)

                logger.info(f"Query returned {len(results)} rows in {time.time() - start_time:.2f}s")
                return results
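
            # Illustrative input (comment only): the branch below handles
            # single-row statements of the form
            #   INSERT INTO namespace.table VALUES (1, 'alice', true, null)
            # Literals are coerced in order: quoted string, boolean, null,
            # int, float, and finally the raw string as a fallback.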
            elif parsed["type"] == "INSERT":
                # Extract table name and values
                table_name = None
                values = []

                # Parse INSERT INTO table_name VALUES (...) syntax
                parsed_stmt = sqlparse.parse(query)[0]
                logger.debug(f"Parsed statement: {parsed_stmt}")

                # Find the VALUES token and extract values
                values_token = None
                table_identifier = None

                for token in parsed_stmt.tokens:
                    logger.debug(f"Token: {token}, Type: {token.ttype}, Value: {token.value}")
                    if isinstance(token, sqlparse.sql.Identifier):
                        table_identifier = token
                    elif token.value.upper() == 'VALUES':
                        values_token = token
                        break

                if table_identifier:
                    # Handle multi-part identifiers (e.g., schema.table)
                    table_name = str(table_identifier)
                    logger.info(f"Found table name: {table_name}")

                if values_token and len(parsed_stmt.tokens) > parsed_stmt.tokens.index(values_token) + 1:
                    next_token = parsed_stmt.tokens[parsed_stmt.tokens.index(values_token) + 1]
                    if isinstance(next_token, sqlparse.sql.Parenthesis):
                        values_str = next_token.value.strip('()').split(',')
                        values = []
                        for v in values_str:
                            v = v.strip()
                            if v.startswith("'") and v.endswith("'"):
                                values.append(v.strip("'"))
                            elif v.lower() == 'true':
                                values.append(True)
                            elif v.lower() == 'false':
                                values.append(False)
                            elif v.lower() == 'null':
                                values.append(None)
                            else:
                                try:
                                    values.append(int(v))
                                except ValueError:
                                    try:
                                        values.append(float(v))
                                    except ValueError:
                                        values.append(v)
                        logger.info(f"Extracted values: {values}")

                # values defaults to [], so check for emptiness rather than None
                if not table_name or not values:
                    raise ValueError(f"Invalid INSERT statement format. Table: {table_name}, Values: {values}")

                logger.info(f"Inserting into table: {table_name}")
                logger.info(f"Values: {values}")

                # Load table and schema
                table = catalog.load_table(table_name)
                schema = table.schema()

                # Create PyArrow arrays for each field, mapping Iceberg types to Arrow types
                arrays = []
                names = []
                for i, field in enumerate(schema.fields):
                    names.append(field.name)
                    value = values[i] if i < len(values) else None
                    if isinstance(field.field_type, IntegerType):
                        arrays.append(pa.array([value], type=pa.int32()))
                    elif isinstance(field.field_type, StringType):
                        arrays.append(pa.array([value], type=pa.string()))
                    elif isinstance(field.field_type, BooleanType):
                        arrays.append(pa.array([value], type=pa.bool_()))
                    elif isinstance(field.field_type, DoubleType):
                        arrays.append(pa.array([value], type=pa.float64()))
                    elif isinstance(field.field_type, TimestampType):
                        arrays.append(pa.array([value], type=pa.timestamp('us')))
                    else:
                        arrays.append(pa.array([value], type=pa.string()))

                # Create PyArrow table
                pa_table = pa.Table.from_arrays(arrays, names=names)

                # Append the PyArrow table directly to the Iceberg table
                table.append(pa_table)

                return [{"status": "Inserted 1 row successfully"}]
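
            # Illustrative input (comment only): the branch below accepts a flat
            # column list, e.g.
            #   CREATE TABLE db.events (id INT, name STRING, ts TIMESTAMP)
            # Complex/nested types are not parsed yet (see the README roadmap).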
            elif parsed["type"] == "CREATE":
                # Basic CREATE TABLE support
                if "CREATE TABLE" in query_upper:
                    # Extract table name and schema
                    parts = query.split("(", 1)
                    table_name = parts[0].replace("CREATE TABLE", "").strip()
                    schema_str = parts[1].strip()[:-1]  # Remove trailing )

                    # Parse schema definition
                    schema_fields = []
                    for field in schema_str.split(","):
                        name, type_str = field.strip().split(" ", 1)
                        type_str = type_str.upper()
                        if "STRING" in type_str:
                            field_type = StringType()
                        elif "INT" in type_str:
                            field_type = IntegerType()
                        elif "DOUBLE" in type_str:
                            field_type = DoubleType()
                        elif "TIMESTAMP" in type_str:
                            field_type = TimestampType()
                        else:
                            field_type = StringType()
                        # Iceberg field IDs are 1-based
                        schema_fields.append(NestedField(len(schema_fields) + 1, name, field_type, required=False))

                    schema = Schema(*schema_fields)
                    catalog.create_table(table_name, schema)
                    return [{"status": "Table created successfully"}]

            else:
                raise ValueError(f"Unsupported query type: {parsed['type']}")

        except Exception as e:
            logger.error(f"Query error: {str(e)}")
            logger.error(f"Error type: {type(e).__name__}")
            raise

    def close(self):
        """
        Clean up resources
        """
        if self.catalog:
            logger.info("Cleaning up catalog resources")
            self.catalog = None

class IcebergServer(Server):
    """
    Iceberg MCP server class, handles client interactions
    """
    def __init__(self):
        super().__init__(name="iceberg-server")
        self.db = IcebergConnection()
        logger.info("IcebergServer initialized")

        @self.list_tools()
        async def handle_tools():
            """
            Return list of available tools
            """
            return [
                Tool(
                    name="execute_query",
                    description="Execute a query on Iceberg tables",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "query": {
                                "type": "string",
                                "description": "Query to execute (supports: LIST TABLES, DESCRIBE TABLE, SELECT, INSERT, CREATE TABLE)"
                            }
                        },
                        "required": ["query"]
                    }
                )
            ]

        @self.call_tool()
        async def handle_call_tool(name: str, arguments: dict):
            """
            Handle tool call requests

            Args:
                name (str): Tool name
                arguments (dict): Tool arguments

            Returns:
                list[TextContent]: Execution results
            """
            if name == "execute_query":
                start_time = time.time()
                try:
                    result = self.db.execute_query(arguments["query"])
                    execution_time = time.time() - start_time

                    # default=str keeps json.dumps from failing on datetimes
                    return [TextContent(
                        type="text",
                        text=f"Results (execution time: {execution_time:.2f}s):\n{json.dumps(result, indent=2, default=str)}"
                    )]
                except Exception as e:
                    error_message = f"Error executing query: {str(e)}"
                    logger.error(error_message)
                    return [TextContent(
                        type="text",
                        text=error_message
                    )]
            raise ValueError(f"Unknown tool: {name}")

    def __del__(self):
        """
        Clean up resources
        """
        if hasattr(self, 'db'):
            self.db.close()

async def main():
    """
    Main function, starts server and handles requests
    """
    try:
        server = IcebergServer()
        initialization_options = server.create_initialization_options()
        logger.info("Starting server")

        async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
            await server.run(
                read_stream,
                write_stream,
                initialization_options
            )
    except Exception as e:
        logger.critical(f"Server failed: {str(e)}", exc_info=True)
        raise
    finally:
        logger.info("Server shutting down")

def run_server():
    """
    Entry point for running the server
    """
    asyncio.run(main())

if __name__ == "__main__":
    run_server()
```