#
tokens: 2374/50000 6/6 files
lines: off (toggle) GitHub
raw markdown copy
# Directory Structure

```
├── __init__.py
├── .env.template
├── .gitignore
├── Dockerfile
├── LICENSE
├── pyproject.toml
├── README.md
├── requirements.txt
├── server.py
└── smithery.yaml
```

# Files

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
# Sensitive files
.env
*.env
!.env.template

# Python
__pycache__/
*.py[cod]
*$py.class
venv/
*.egg-info/

# IDE
.idea/
.vscode/

# Logs
*.log
```

--------------------------------------------------------------------------------
/__init__.py:
--------------------------------------------------------------------------------

```python
# Re-export the server's async entry point so it can be imported from the package root.
from .server import main

# `main` is the only name this package exports.
__all__ = ["main"]
```

--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------

```
snowflake-connector-python
python-dotenv
mcp

```

--------------------------------------------------------------------------------
/Dockerfile:
--------------------------------------------------------------------------------

```dockerfile
# Generated by https://smithery.ai. See: https://smithery.ai/docs/config#dockerfile
FROM python:3.10-alpine

# Set working directory
WORKDIR /app

# Install system dependencies: add gcc, g++, musl-dev, linux-headers
# (build toolchain for Python packages that have to be compiled during pip install)
RUN apk add --no-cache gcc g++ musl-dev linux-headers

# Install Python dependencies before copying the sources, so this layer is
# reused from cache whenever only the application code changes
COPY requirements.txt /app/requirements.txt
RUN pip install --no-cache-dir --upgrade pip \
    && pip install --no-cache-dir -r requirements.txt

# Copy project files
COPY . /app

# Connection settings are read from the environment at runtime and must be
# supplied when the container is started (e.g. `docker run -e NAME=value`):
#   SNOWFLAKE_USER, SNOWFLAKE_PASSWORD, SNOWFLAKE_ACCOUNT,
#   SNOWFLAKE_DATABASE, SNOWFLAKE_WAREHOUSE
# No placeholder defaults are baked into the image: they would hide a missing
# setting behind a login attempt with bogus credentials.

# Command to run the MCP server
CMD ["python", "server.py"]

```

--------------------------------------------------------------------------------
/smithery.yaml:
--------------------------------------------------------------------------------

```yaml
# Smithery configuration file: https://smithery.ai/docs/config#smitheryyaml

# How the MCP server process is launched and which settings it needs.
startCommand:
  type: stdio
  configSchema:
    # JSON Schema defining the configuration options for the MCP.
    type: object
    # All five Snowflake settings are mandatory.
    required:
      - snowflakeUser
      - snowflakePassword
      - snowflakeAccount
      - snowflakeDatabase
      - snowflakeWarehouse
    # Each property below is passed to the server as the matching SNOWFLAKE_* environment variable.
    properties:
      snowflakeUser:
        type: string
        description: Snowflake username
      snowflakePassword:
        type: string
        description: Snowflake password
      snowflakeAccount:
        type: string
        description: Snowflake account identifier
      snowflakeDatabase:
        type: string
        description: Snowflake database name
      snowflakeWarehouse:
        type: string
        description: Snowflake warehouse name
  commandFunction:
    # A JS function that produces the CLI command based on the given config to start the MCP on stdio.
    |-
    (config) => ({
      command: 'python',
      args: ['server.py'],
      env: {
        SNOWFLAKE_USER: config.snowflakeUser,
        SNOWFLAKE_PASSWORD: config.snowflakePassword,
        SNOWFLAKE_ACCOUNT: config.snowflakeAccount,
        SNOWFLAKE_DATABASE: config.snowflakeDatabase,
        SNOWFLAKE_WAREHOUSE: config.snowflakeWarehouse
      }
    })
  exampleConfig:
    snowflakeUser: example_user
    snowflakePassword: example_password
    snowflakeAccount: EXAMPLE12345.US-WEST-2
    snowflakeDatabase: EXAMPLE_DB
    snowflakeWarehouse: EXAMPLE_WH

```

--------------------------------------------------------------------------------
/server.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python
import os
import asyncio
import logging
import json
import time
import snowflake.connector
from dotenv import load_dotenv
import mcp.server.stdio
from mcp.server import Server
from mcp.types import Tool, ServerResult, TextContent
from contextlib import closing
from typing import Optional, Any

# Configure logging: root logger at DEBUG level, with timestamp, logger name
# and severity on every record.
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Module-wide logger shared by the classes and functions below.
logger = logging.getLogger('snowflake_server')

# Load settings from a .env file (if present) before any SNOWFLAKE_* variable is read.
load_dotenv()

class SnowflakeConnection:
    """
    Manage a single, lazily opened Snowflake connection.

    Connection settings are read from the SNOWFLAKE_* environment variables
    when the object is constructed. The connection itself is opened on first
    use and is re-opened once if the existing one no longer answers.
    """

    # Statements starting with one of these keywords are run through the
    # explicit BEGIN / commit / rollback path in execute_query.
    _WRITE_KEYWORDS = ('INSERT', 'UPDATE', 'DELETE', 'CREATE', 'DROP', 'ALTER')

    def __init__(self):
        # Initialize configuration from the environment
        self.config = {
            "user": os.getenv("SNOWFLAKE_USER"),
            "password": os.getenv("SNOWFLAKE_PASSWORD"),
            "account": os.getenv("SNOWFLAKE_ACCOUNT"),
            "database": os.getenv("SNOWFLAKE_DATABASE"),
            "warehouse": os.getenv("SNOWFLAKE_WAREHOUSE"),
        }
        # Opened lazily by ensure_connection()
        self.conn: Optional[snowflake.connector.SnowflakeConnection] = None
        logger.info(f"Initialized with config (excluding password): {json.dumps({k:v for k,v in self.config.items() if k != 'password'})}")

    def _open_connection(self) -> None:
        """Open a new connection, pin its session time zone to UTC and store it in self.conn."""
        logger.info("Creating new Snowflake connection...")
        self.conn = snowflake.connector.connect(
            **self.config,
            client_session_keep_alive=True,
            network_timeout=15,
            login_timeout=15
        )
        try:
            with self.conn.cursor() as cursor:
                cursor.execute("ALTER SESSION SET TIMEZONE = 'UTC'")
        except Exception:
            # Never keep a half-configured session around: a later call would
            # reuse it without the UTC setting.
            self.close()
            raise
        logger.info("New connection established and configured")

    def _is_alive(self) -> bool:
        """Return True if the current connection answers a trivial query."""
        try:
            with self.conn.cursor() as cursor:
                cursor.execute("SELECT 1")
        except Exception:
            # Only ordinary errors count as a lost connection; KeyboardInterrupt
            # and SystemExit propagate to the caller.
            return False
        return True

    def ensure_connection(self) -> snowflake.connector.SnowflakeConnection:
        """
        Return a usable connection, opening or re-opening it when necessary.

        An existing connection is probed with ``SELECT 1``. If the probe fails,
        the connection is closed and replaced by a fresh one. Only one
        reconnect is attempted per call, so a persistent outage surfaces as an
        exception instead of unbounded recursion.

        Returns:
            The open Snowflake connection.

        Raises:
            Exception: whatever the connector raises when the connection
                cannot be established or configured (logged, then re-raised).
        """
        try:
            if self.conn is not None and not self._is_alive():
                logger.info("Connection lost, reconnecting...")
                self.close()

            if self.conn is None:
                # A freshly opened connection has already executed a statement
                # successfully, so it needs no additional probe.
                self._open_connection()

            return self.conn
        except Exception as e:
            logger.error(f"Connection error: {str(e)}")
            raise

    def execute_query(self, query: str) -> list[dict[str, Any]]:
        """
        Execute a SQL statement and return its result.

        Args:
            query (str): SQL statement to execute.

        Returns:
            list[dict[str, Any]]: For statements starting with a write keyword
            (INSERT, UPDATE, DELETE, CREATE, DROP, ALTER) a single-item list
            ``[{"affected_rows": n}]``. Otherwise one dict per result row keyed
            by column name, or an empty list when the statement produces no
            result set.

        Raises:
            ValueError: if ``query`` is not a non-empty string.
            snowflake.connector.errors.ProgrammingError: on SQL errors
                (logged with the error code, then re-raised).
            Exception: any other connector error (logged, then re-raised).
        """
        if not isinstance(query, str) or not query.strip():
            raise ValueError("query must be a non-empty string")

        start_time = time.time()
        logger.info(f"Executing query: {query[:200]}...")  # Log only the first 200 characters

        try:
            conn = self.ensure_connection()
            # str.startswith accepts a tuple, so the query is upper-cased once
            is_write = query.lstrip().upper().startswith(self._WRITE_KEYWORDS)

            with conn.cursor() as cursor:
                if is_write:
                    # Use an explicit transaction for write operations
                    cursor.execute("BEGIN")
                    try:
                        cursor.execute(query)
                        conn.commit()
                    except Exception:
                        conn.rollback()
                        raise
                    logger.info(f"Write query executed in {time.time() - start_time:.2f}s")
                    return [{"affected_rows": cursor.rowcount}]

                # Read operations
                cursor.execute(query)
                if not cursor.description:
                    # Statement produced no result set
                    return []
                columns = [col[0] for col in cursor.description]
                results = [dict(zip(columns, row)) for row in cursor.fetchall()]
                logger.info(f"Read query returned {len(results)} rows in {time.time() - start_time:.2f}s")
                return results

        except snowflake.connector.errors.ProgrammingError as e:
            logger.error(f"SQL Error: {str(e)}")
            logger.error(f"Error Code: {getattr(e, 'errno', 'unknown')}")
            raise
        except Exception as e:
            logger.error(f"Query error: {str(e)}")
            logger.error(f"Error type: {type(e).__name__}")
            raise

    def close(self):
        """
        Close the database connection, if one is open.

        Errors raised while closing are logged and swallowed; the stored
        connection is always reset to None so the next use reconnects.
        """
        if self.conn:
            try:
                self.conn.close()
                logger.info("Connection closed")
            except Exception as e:
                logger.error(f"Error closing connection: {str(e)}")
            finally:
                self.conn = None

class SnowflakeServer(Server):
    """
    Snowflake MCP server: exposes a single ``execute_query`` tool to clients.

    The tool handlers are registered in ``__init__`` and run SQL through the
    server's own SnowflakeConnection instance (``self.db``).
    """
    def __init__(self):
        super().__init__(name="snowflake-server")
        self.db = SnowflakeConnection()
        logger.info("SnowflakeServer initialized")

        @self.list_tools()
        async def handle_tools():
            """Return the list of available tools."""
            return [
                Tool(
                    name="execute_query",
                    description="Execute a SQL query on Snowflake",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "query": {
                                "type": "string",
                                "description": "SQL query to execute"
                            }
                        },
                        "required": ["query"]
                    }
                )
            ]

        @self.call_tool()
        async def handle_call_tool(name: str, arguments: dict):
            """
            Handle a tool call request.

            Args:
                name (str): Tool name; only "execute_query" is supported.
                arguments (dict): Tool arguments; must contain a non-empty
                    string under "query".

            Returns:
                list[TextContent]: The query result, or an error message when
                the arguments are invalid or the query fails.

            Raises:
                ValueError: if ``name`` is not a known tool, so the caller gets
                    an explicit error instead of an implicit ``None`` result.
            """
            if name != "execute_query":
                raise ValueError(f"Unknown tool: {name}")

            # Validate up front so a missing argument yields a readable message
            # rather than the bare KeyError text "'query'".
            query = (arguments or {}).get("query")
            if not isinstance(query, str) or not query.strip():
                error_message = "Error executing query: 'query' argument must be a non-empty string"
                logger.error(error_message)
                return [TextContent(
                    type="text",
                    text=error_message
                )]

            start_time = time.time()
            try:
                result = self.db.execute_query(query)
                execution_time = time.time() - start_time

                return [TextContent(
                    type="text",
                    text=f"Results (execution time: {execution_time:.2f}s):\n{result}"
                )]
            except Exception as e:
                # Report query failures to the client as text instead of raising
                error_message = f"Error executing query: {str(e)}"
                logger.error(error_message)
                return [TextContent(
                    type="text",
                    text=error_message
                )]

    def __del__(self):
        """Clean up resources: close the database connection if it was created."""
        # `db` is missing when __init__ failed before assigning it
        if hasattr(self, 'db'):
            self.db.close()

async def main():
    """
    Start the MCP server on stdio and serve requests until the streams close.

    The Snowflake connection is closed explicitly on the way out instead of
    relying on the server object's finalizer.

    Raises:
        Exception: any error from server construction or the serve loop is
            logged at CRITICAL level with its traceback and re-raised.
    """
    server = None
    try:
        server = SnowflakeServer()
        initialization_options = server.create_initialization_options()
        logger.info("Starting server")

        async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
            await server.run(
                read_stream,
                write_stream,
                initialization_options
            )
    except Exception as e:
        logger.critical(f"Server failed: {str(e)}", exc_info=True)
        raise
    finally:
        logger.info("Server shutting down")
        # `server` is still None when SnowflakeServer() itself failed
        if server is not None:
            server.db.close()


if __name__ == "__main__":
    asyncio.run(main())
```