#
tokens: 3065/50000 8/8 files
lines: off (toggle) GitHub
raw markdown copy
# Directory Structure

```
├── .github
│   └── workflows
│       └── publish.yml
├── .python-version
├── LICENSE
├── mcp_doris
│   ├── __init__.py
│   ├── __pycache__
│   │   ├── __init__.cpython-313.pyc
│   │   └── mcp_env.cpython-313.pyc
│   ├── main.py
│   ├── mcp_env.py
│   └── mcp_server.py
├── mcp-doris-demo.gif
├── pyproject.toml
├── README.md
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------

```
3.13

```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# Apache Doris MCP Server

[![smithery badge](https://smithery.ai/badge/@morningman/mcp-doris)](https://smithery.ai/server/@morningman/mcp-doris)

An [MCP server](https://modelcontextprotocol.io/introduction) for [Apache Doris](https://doris.apache.org/).

![Demo](mcp-doris-demo.gif)

## Usage

### Cursor

```
Name: doris
Type: command
Command: DORIS_HOST=<doris-host> DORIS_PORT=<port> DORIS_USER=<doris-user> DORIS_PASSWORD=<doris-pwd> uv run --with mcp-doris --python 3.13 mcp-doris
```

## Development

### Prerequest

- install [uv](https://docs.astral.sh/uv)

### Run MCP Inspector

```sql
cd /path/to/mcp-doris
uv sync
source .venv/bin/activate
export PYTHONPATH=/path/to/mcp-doris:$PYTHONPATH
env DORIS_HOST=<doris-host> DORIS_PORT=<port> DORIS_USER=<doris-user> DORIS_PASSWORD=<doris-pwd> mcp dev mcp_doris/mcp_server.py
```

Then visit `http://localhost:5173` in web browser.

## Publish

```
uv build
uv publish
```

```

--------------------------------------------------------------------------------
/mcp_doris/main.py:
--------------------------------------------------------------------------------

```python
from .mcp_server import mcp

def main():
    mcp.run()

if __name__ == "__main__":
    main()

```

--------------------------------------------------------------------------------
/mcp_doris/__init__.py:
--------------------------------------------------------------------------------

```python
from .mcp_server import (
    create_doris_client,
    show_databases,
    show_tables,
    execute_query,
)

__all__ = [
    "show_databases",
    "show_tables",
    "execute_query",
    "create_doris_client",
] 

```

--------------------------------------------------------------------------------
/.github/workflows/publish.yml:
--------------------------------------------------------------------------------

```yaml
on:
  workflow_dispatch:

jobs:
  publish:
    name: Upload release to PyPI
    runs-on: ubuntu-latest
    environment:
      name: pypi
      url: "https://pypi.org/p/mcp-doris"
    permissions:
      id-token: write
    steps:
      - uses: actions/checkout@v4
      - uses: astral-sh/setup-uv@v5
      - run: uv python install
      - run: uv build
      - uses: pypa/gh-action-pypi-publish@release/v1

```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
[project]
name = "mcp-doris"
version = "0.1.1"
description = "An MCP server for Apache Doris."
readme = "README.md"
license = "Apache-2.0"
license-files = ["LICENSE"]
requires-python = ">=3.13"
dependencies = [
     "mcp[cli]>=1.3.0",
     "python-dotenv>=1.0.1",
     "uvicorn>=0.34.0",
     "pip-system-certs>=4.0",
     "mysql-connector-python>=9.2.0",
]

[project.scripts]
mcp-doris = "mcp_doris.main:main"

[project.urls]
Home = "https://github.com/morningman/mcp-doris"

[project.optional-dependencies]
dev = [
    "ruff",
    "pytest"
]

[tool.hatch.build.targets.wheel]
packages = ["mcp_doris"]

[tool.ruff]
line-length = 100

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

```

--------------------------------------------------------------------------------
/mcp_doris/mcp_env.py:
--------------------------------------------------------------------------------

```python
"""Environment configuration for the MCP Doris server.

This module handles all environment variable configuration with sensible defaults
and type conversion.
"""

from dataclasses import dataclass
import os
from typing import Optional


@dataclass
class DorisConfig:
    """Configuration for Apache Doris connection settings.

    This class handles all environment variable configuration with sensible defaults
    and type conversion. It provides typed methods for accessing each configuration value.

    Required environment variables:
        DORIS_HOST: The hostname of the Doris server
        DORIS_PORT: The port number (default: 9030)
        DORIS_USER: The username for authentication
        DORIS_PASSWORD: The password for authentication

    Optional environment variables (with defaults):
        DORIS_DATABASE: Default database to use (default: None)
        DORIS_CONNECT_TIMEOUT: Connection timeout in seconds (default: 30)
        DORIS_READ_TIMEOUT: Read timeout in seconds (default: 300)
    """

    def __init__(self):
        """Initialize the configuration from environment variables."""
        self._validate_required_vars()

    @property
    def host(self) -> str:
        """Get the Doris host."""
        return os.environ["DORIS_HOST"]

    @property
    def port(self) -> int:
        """Get the Doris port.

        Defaults to 9030 (Doris MySQL protocol port).
        Can be overridden by DORIS_PORT environment variable.
        """
        return int(os.getenv("DORIS_PORT", "9030"))

    @property
    def username(self) -> str:
        """Get the Doris username."""
        return os.environ["DORIS_USER"]

    @property
    def password(self) -> str:
        """Get the Doris password."""
        return os.environ["DORIS_PASSWORD"]

    @property
    def database(self) -> Optional[str]:
        """Get the default database name if set."""
        return os.getenv("DORIS_DATABASE")

    @property
    def connect_timeout(self) -> int:
        """Get the connection timeout in seconds.

        Default: 30
        """
        return int(os.getenv("DORIS_CONNECT_TIMEOUT", "30"))

    @property
    def read_timeout(self) -> int:
        """Get the read timeout in seconds.

        Default: 300
        """
        return int(os.getenv("DORIS_READ_TIMEOUT", "300"))

    def get_client_config(self) -> dict:
        """Get the configuration dictionary for MySQL client.

        Returns:
            dict: Configuration ready to be passed to mysql.connector.connect()
        """
        config = {
            "host": self.host,
            "port": self.port,
            "user": self.username,
            "password": self.password,
            "connect_timeout": self.connect_timeout,
            "connection_timeout": self.read_timeout,
        }

        # Add optional database if set
        if self.database:
            config["database"] = self.database

        return config

    def _validate_required_vars(self) -> None:
        """Validate that all required environment variables are set.

        Raises:
            ValueError: If any required environment variable is missing.
        """
        missing_vars = []
        for var in ["DORIS_HOST", "DORIS_PORT", "DORIS_USER", "DORIS_PASSWORD"]:
            if var not in os.environ:
                missing_vars.append(var)

        if missing_vars:
            raise ValueError(
                f"Missing required environment variables: {', '.join(missing_vars)}"
            )


# Global instance for easy access
config = DorisConfig() 

```

--------------------------------------------------------------------------------
/mcp_doris/mcp_server.py:
--------------------------------------------------------------------------------

```python
import logging
from typing import List, Dict, Any
import concurrent.futures
import atexit
import mysql.connector
from mysql.connector import errorcode

from dotenv import load_dotenv
from mcp.server.fastmcp import FastMCP

from mcp_doris.mcp_env import config

MCP_SERVER_NAME = "mcp-doris"

# Configure logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(MCP_SERVER_NAME)

QUERY_EXECUTOR = concurrent.futures.ThreadPoolExecutor(max_workers=10)
atexit.register(lambda: QUERY_EXECUTOR.shutdown(wait=True))
SELECT_QUERY_TIMEOUT_SECS = 30

load_dotenv()

deps = [
    "mysql-connector-python",
    "python-dotenv",
    "uvicorn",
    "pip-system-certs",
]

mcp = FastMCP(MCP_SERVER_NAME, dependencies=deps)


def create_doris_client():
    """Create a MySQL connection to Apache Doris.
    
    Returns:
        mysql.connector.connection.MySQLConnection: A connection to the Doris database
    """
    client_config = config.get_client_config()
    logger.info(
        f"Creating Doris client connection to {client_config['host']}:{client_config['port']} "
        f"as {client_config['user']} "
        f"(connect_timeout={client_config['connect_timeout']}s, "
        f"connection_timeout={client_config['connection_timeout']}s)"
    )

    try:
        conn = mysql.connector.connect(**client_config)
        # Test the connection
        cursor = conn.cursor()
        cursor.execute("SELECT VERSION()")
        version = cursor.fetchone()[0]
        cursor.close()
        logger.info(f"Successfully connected to Apache Doris version {version}")
        return conn
    except mysql.connector.Error as err:
        if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
            logger.error("Invalid username or password")
        elif err.errno == errorcode.ER_BAD_DB_ERROR:
            logger.error("Database does not exist")
        else:
            logger.error(f"Failed to connect to Doris: {err}")
        raise


@mcp.tool()
def show_databases():
    """List all databases in the Doris instance.
    
    Returns:
        List[str]: A list of database names
    """
    logger.info("Listing all databases")
    conn = create_doris_client()
    cursor = conn.cursor()
    
    try:
        cursor.execute("SHOW DATABASES")
        databases = [row[0] for row in cursor.fetchall()]
        logger.info(f"Found {len(databases)} databases")
        return databases
    finally:
        cursor.close()
        conn.close()


@mcp.tool()
def show_tables(database: str, like: str = None):
    """List all tables in the specified database.
    
    Args:
        database: The database name
        like: Optional pattern to filter table names
        
    Returns:
        List[Dict]: A list of table information dictionaries
    """
    logger.info(f"Listing tables in database '{database}'")
    conn = create_doris_client()
    cursor = conn.cursor(dictionary=True)
    
    try:
        # Use the specified database
        cursor.execute(f"USE `{database}`")
        
        # Get tables
        query = "SHOW TABLES"
        if like:
            query += f" LIKE '{like}'"
        cursor.execute(query)
        table_names = [row['Tables_in_' + database] for row in cursor.fetchall()]
        
        tables = []
        for table in table_names:
            logger.info(f"Getting schema info for table {database}.{table}")
            
            # Get table schema
            cursor.execute(f"DESCRIBE `{table}`")
            columns = cursor.fetchall()
            
            # Get create table statement
            cursor.execute(f"SHOW CREATE TABLE `{table}`")
            create_table_result = cursor.fetchone()['Create Table']
            
            # Get table comment if available (extracted from create table statement)
            table_comment = None
            if "COMMENT=" in create_table_result:
                comment_parts = create_table_result.split("COMMENT=")
                if len(comment_parts) > 1:
                    table_comment = comment_parts[1].split("'")[1]
            
            tables.append({
                "database": database,
                "name": table,
                "comment": table_comment,
                "columns": columns,
                "create_table_query": create_table_result,
            })
        
        logger.info(f"Found {len(tables)} tables")
        return tables
    finally:
        cursor.close()
        conn.close()


def execute_query_impl(query: str) -> List[Dict[str, Any]]:
    """Execute a SELECT query against Doris.
    
    Args:
        query: The SQL query to execute
        
    Returns:
        List[Dict]: The query results as a list of dictionaries
    """
    conn = create_doris_client()
    cursor = conn.cursor(dictionary=True)
    
    try:
        cursor.execute(query)
        rows = cursor.fetchall()
        logger.info(f"Query returned {len(rows)} rows")
        return rows
    except Exception as err:
        logger.error(f"Error executing query: {err}")
        return [{"error": f"Error running query: {err}"}]
    finally:
        cursor.close()
        conn.close()


@mcp.tool()
def execute_query(query: str):
    """Run a SELECT query against Doris with timeout protection.
    
    Args:
        query: The SQL query to execute
        
    Returns:
        List[Dict]: The query results
    """
    logger.info(f"Executing SELECT query: {query}")
    
    # Basic validation to ensure it's a SELECT query
    if not query.strip().upper().startswith("SELECT"):
        return {"error": "Only SELECT queries are allowed for security reasons"}
    
    future = QUERY_EXECUTOR.submit(execute_query_impl, query)
    try:
        result = future.result(timeout=SELECT_QUERY_TIMEOUT_SECS)
        return result
    except concurrent.futures.TimeoutError:
        logger.warning(f"Query timed out after {SELECT_QUERY_TIMEOUT_SECS} seconds: {query}")
        future.cancel()
        return {"error": f"Queries taking longer than {SELECT_QUERY_TIMEOUT_SECS} seconds are currently not supported."} 

```