# Directory Structure
```
├── .github
│ └── workflows
│ └── publish.yml
├── .python-version
├── LICENSE
├── mcp_doris
│ ├── __init__.py
│ ├── __pycache__
│ │ ├── __init__.cpython-313.pyc
│ │ └── mcp_env.cpython-313.pyc
│ ├── main.py
│ ├── mcp_env.py
│ └── mcp_server.py
├── mcp-doris-demo.gif
├── pyproject.toml
├── README.md
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------
```
3.13
```
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
# Apache Doris MCP Server
[mcp-doris on Smithery](https://smithery.ai/server/@morningman/mcp-doris)
An [MCP server](https://modelcontextprotocol.io/introduction) for [Apache Doris](https://doris.apache.org/).

## Usage
### Cursor
```
Name: doris
Type: command
Command: DORIS_HOST=<doris-host> DORIS_PORT=<port> DORIS_USER=<doris-user> DORIS_PASSWORD=<doris-pwd> uv run --with mcp-doris --python 3.13 mcp-doris
```
## Development
### Prerequisites
- install [uv](https://docs.astral.sh/uv)
### Run MCP Inspector
```bash
cd /path/to/mcp-doris
uv sync
source .venv/bin/activate
export PYTHONPATH=/path/to/mcp-doris:$PYTHONPATH
env DORIS_HOST=<doris-host> DORIS_PORT=<port> DORIS_USER=<doris-user> DORIS_PASSWORD=<doris-pwd> mcp dev mcp_doris/mcp_server.py
```
Then visit `http://localhost:5173` in a web browser.
## Publish
```
uv build
uv publish
```
```
--------------------------------------------------------------------------------
/mcp_doris/main.py:
--------------------------------------------------------------------------------
```python
from .mcp_server import mcp
def main():
    """Run the FastMCP server instance imported from ``mcp_server``.

    This function is the target of the ``mcp-doris`` console script declared
    in ``pyproject.toml``.
    """
    mcp.run()


# Allow ``python -m mcp_doris.main`` style execution in addition to the console script.
if __name__ == "__main__":
    main()
```
--------------------------------------------------------------------------------
/mcp_doris/__init__.py:
--------------------------------------------------------------------------------
```python
from .mcp_server import (
create_doris_client,
show_databases,
show_tables,
execute_query,
)
# Public names of the package: the three MCP tool functions plus the connection
# factory, all re-exported from mcp_server by the import above.
__all__ = [
"show_databases",
"show_tables",
"execute_query",
"create_doris_client",
]
```
--------------------------------------------------------------------------------
/.github/workflows/publish.yml:
--------------------------------------------------------------------------------
```yaml
# Manually triggered workflow (workflow_dispatch only): builds the package with uv
# and uploads the result to PyPI.
on:
workflow_dispatch:
jobs:
publish:
name: Upload release to PyPI
runs-on: ubuntu-latest
environment:
name: pypi
url: "https://pypi.org/p/mcp-doris"
permissions:
# NOTE(review): no PyPI token is passed to the publish step, so this presumably relies on
# OIDC trusted publishing, which needs `id-token: write` - confirm in the PyPI project settings.
id-token: write
steps:
- uses: actions/checkout@v4
- uses: astral-sh/setup-uv@v5
# NOTE(review): with no version argument this presumably installs the interpreter
# pinned in .python-version - confirm against the uv documentation.
- run: uv python install
- run: uv build
- uses: pypa/gh-action-pypi-publish@release/v1
```
--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------
```toml
# Package metadata for the mcp-doris distribution.
[project]
name = "mcp-doris"
version = "0.1.1"
description = "An MCP server for Apache Doris."
readme = "README.md"
license = "Apache-2.0"
license-files = ["LICENSE"]
# Same minimum interpreter as the version pinned in .python-version.
requires-python = ">=3.13"
dependencies = [
"mcp[cli]>=1.3.0",
"python-dotenv>=1.0.1",
"uvicorn>=0.34.0",
"pip-system-certs>=4.0",
"mysql-connector-python>=9.2.0",
]
# Console entry point: the `mcp-doris` command calls main() in mcp_doris/main.py.
[project.scripts]
mcp-doris = "mcp_doris.main:main"
[project.urls]
Home = "https://github.com/morningman/mcp-doris"
# Extra tools for development (lint and tests); not needed at runtime.
[project.optional-dependencies]
dev = [
"ruff",
"pytest"
]
# Tells hatchling which package directory to ship in the wheel.
[tool.hatch.build.targets.wheel]
packages = ["mcp_doris"]
[tool.ruff]
line-length = 100
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
```
--------------------------------------------------------------------------------
/mcp_doris/mcp_env.py:
--------------------------------------------------------------------------------
```python
"""Environment configuration for the MCP Doris server.
This module handles all environment variable configuration with sensible defaults
and type conversion.
"""
from dataclasses import dataclass
import os
from typing import Optional
@dataclass
class DorisConfig:
    """Configuration for Apache Doris connection settings.

    Values are read from the process environment each time a property is
    accessed; nothing is cached on the instance. Construction only validates
    that the required variables are present.

    Required environment variables:
        DORIS_HOST: The hostname of the Doris server
        DORIS_USER: The username for authentication
        DORIS_PASSWORD: The password for authentication

    Optional environment variables (with defaults):
        DORIS_PORT: The port number (default: 9030)
        DORIS_DATABASE: Default database to use (default: None)
        DORIS_CONNECT_TIMEOUT: Connection timeout in seconds (default: 30)
        DORIS_READ_TIMEOUT: Read timeout in seconds (default: 300)
    """

    # Variables without a default; DORIS_PORT is deliberately absent because
    # the ``port`` property falls back to 9030.
    _REQUIRED_VARS = ("DORIS_HOST", "DORIS_USER", "DORIS_PASSWORD")

    def __init__(self):
        """Initialize the configuration from environment variables.

        Raises:
            ValueError: If any required environment variable is missing.
        """
        self._validate_required_vars()

    @property
    def host(self) -> str:
        """Get the Doris host (``DORIS_HOST``)."""
        return os.environ["DORIS_HOST"]

    @property
    def port(self) -> int:
        """Get the Doris port.

        Defaults to 9030 (Doris MySQL protocol port).
        Can be overridden by the DORIS_PORT environment variable.

        Raises:
            ValueError: If DORIS_PORT is set but is not an integer.
        """
        return int(os.getenv("DORIS_PORT", "9030"))

    @property
    def username(self) -> str:
        """Get the Doris username (``DORIS_USER``)."""
        return os.environ["DORIS_USER"]

    @property
    def password(self) -> str:
        """Get the Doris password (``DORIS_PASSWORD``)."""
        return os.environ["DORIS_PASSWORD"]

    @property
    def database(self) -> Optional[str]:
        """Get the default database name, or None when DORIS_DATABASE is unset."""
        return os.getenv("DORIS_DATABASE")

    @property
    def connect_timeout(self) -> int:
        """Get the connection timeout in seconds (``DORIS_CONNECT_TIMEOUT``).

        Default: 30
        """
        return int(os.getenv("DORIS_CONNECT_TIMEOUT", "30"))

    @property
    def read_timeout(self) -> int:
        """Get the read timeout in seconds (``DORIS_READ_TIMEOUT``).

        Default: 300
        """
        return int(os.getenv("DORIS_READ_TIMEOUT", "300"))

    def get_client_config(self) -> dict:
        """Get the configuration dictionary for the MySQL client.

        Returns:
            dict: Configuration ready to be passed to mysql.connector.connect().
            ``database`` is included only when DORIS_DATABASE is set to a
            non-empty value.
        """
        connect_timeout = self.connect_timeout
        client_config = {
            "host": self.host,
            "port": self.port,
            "user": self.username,
            "password": self.password,
            # mysql-connector-python treats ``connect_timeout`` as an alias of
            # ``connection_timeout`` and lets the latter win when both are given,
            # so both must carry the connect timeout. Both keys are kept because
            # existing callers read them from this dict.
            "connect_timeout": connect_timeout,
            "connection_timeout": connect_timeout,
            # The read timeout has its own connector option (available in the
            # 9.2.0+ releases this project requires).
            "read_timeout": self.read_timeout,
        }
        # Add optional database if set
        database = self.database
        if database:
            client_config["database"] = database
        return client_config

    def _validate_required_vars(self) -> None:
        """Validate that all required environment variables are set.

        Raises:
            ValueError: If any required environment variable is missing.
        """
        missing_vars = [var for var in self._REQUIRED_VARS if var not in os.environ]
        if missing_vars:
            raise ValueError(
                f"Missing required environment variables: {', '.join(missing_vars)}"
            )
# Global instance for easy access.
# It is created when this module is imported, and DorisConfig.__init__ validates
# the required environment variables, so importing this module raises ValueError
# if any of them is missing.
config = DorisConfig()
```
--------------------------------------------------------------------------------
/mcp_doris/mcp_server.py:
--------------------------------------------------------------------------------
```python
"""MCP server exposing Apache Doris through FastMCP tools."""
import logging
from typing import List, Dict, Any
import concurrent.futures
import atexit

import mysql.connector
from mysql.connector import errorcode
from dotenv import load_dotenv
from mcp.server.fastmcp import FastMCP

# Load .env BEFORE importing mcp_env: that module builds its global config at
# import time and raises ValueError when the required DORIS_* variables are
# missing, so variables supplied only through a .env file must already be in
# the environment at that point.
load_dotenv()

from mcp_doris.mcp_env import config  # noqa: E402

MCP_SERVER_NAME = "mcp-doris"

# Configure logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(MCP_SERVER_NAME)

# Shared worker pool used to run queries so callers can enforce a timeout;
# it is drained at interpreter exit.
QUERY_EXECUTOR = concurrent.futures.ThreadPoolExecutor(max_workers=10)
atexit.register(lambda: QUERY_EXECUTOR.shutdown(wait=True))

# Upper bound, in seconds, that a caller waits for a query result.
SELECT_QUERY_TIMEOUT_SECS = 30

# Package names handed to FastMCP as the server's declared dependencies.
deps = [
    "mysql-connector-python",
    "python-dotenv",
    "uvicorn",
    "pip-system-certs",
]
mcp = FastMCP(MCP_SERVER_NAME, dependencies=deps)
def create_doris_client():
    """Create a MySQL connection to Apache Doris and verify that it works.

    Connection parameters come from ``config.get_client_config()``. After
    connecting, a ``SELECT VERSION()`` probe is executed so a broken connection
    is detected here rather than in the caller.

    Returns:
        A mysql.connector connection to the Doris database. The caller owns it
        and is responsible for closing it.

    Raises:
        mysql.connector.Error: If connecting or the probe query fails. The
            failure is logged, any connection that was already opened is
            closed, and the original error is re-raised.
    """
    client_config = config.get_client_config()
    logger.info(
        f"Creating Doris client connection to {client_config['host']}:{client_config['port']} "
        f"as {client_config['user']} "
        f"(connect_timeout={client_config['connect_timeout']}s, "
        f"connection_timeout={client_config['connection_timeout']}s)"
    )
    conn = None
    try:
        conn = mysql.connector.connect(**client_config)
        # Test the connection
        cursor = conn.cursor()
        try:
            cursor.execute("SELECT VERSION()")
            row = cursor.fetchone()
        finally:
            cursor.close()
        # Guard against an empty result instead of failing with a TypeError.
        version = row[0] if row else "unknown"
        logger.info(f"Successfully connected to Apache Doris version {version}")
        return conn
    except mysql.connector.Error as err:
        if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
            logger.error("Invalid username or password")
        elif err.errno == errorcode.ER_BAD_DB_ERROR:
            logger.error("Database does not exist")
        else:
            logger.error(f"Failed to connect to Doris: {err}")
        # The connect may have succeeded before the probe failed; do not leak
        # that connection. A failure while closing must not mask the real error.
        if conn is not None:
            try:
                conn.close()
            except mysql.connector.Error:
                logger.debug("Ignoring error while closing failed connection", exc_info=True)
        raise
@mcp.tool()
def show_databases():
    """List all databases in the Doris instance.

    Returns:
        List[str]: A list of database names
    """
    # NOTE(review): FastMCP appears to publish the docstring above as the tool
    # description, so its wording is kept as-is - confirm before rewording.
    logger.info("Listing all databases")
    connection = create_doris_client()
    db_cursor = connection.cursor()
    try:
        db_cursor.execute("SHOW DATABASES")
        # Each result row is a one-element tuple holding the database name.
        names = []
        for record in db_cursor.fetchall():
            names.append(record[0])
        logger.info(f"Found {len(names)} databases")
        return names
    finally:
        # Every call uses its own connection; release it on success and failure alike.
        db_cursor.close()
        connection.close()
def _quote_identifier(name: str) -> str:
    """Return *name* as a backtick-quoted SQL identifier with embedded backticks doubled."""
    return "`" + name.replace("`", "``") + "`"


def _extract_table_comment(create_table_sql: str):
    """Return the first single-quoted string after ``COMMENT=``, or None if there is none."""
    _, marker, tail = create_table_sql.partition("COMMENT=")
    if not marker:
        return None
    pieces = tail.split("'")
    # pieces[1] is the text between the first pair of quotes; absent when unquoted.
    return pieces[1] if len(pieces) > 1 else None


@mcp.tool()
def show_tables(database: str, like: str = None):
    """List all tables in the specified database.

    Args:
        database: The database name
        like: Optional SQL LIKE pattern to filter table names

    Returns:
        List[Dict]: A list of table information dictionaries with the keys
        "database", "name", "comment", "columns" and "create_table_query"
    """
    logger.info(f"Listing tables in database '{database}'")
    conn = create_doris_client()
    try:
        cursor = conn.cursor(dictionary=True)
        try:
            # Use the specified database. Identifiers cannot be bound as query
            # parameters, so they are quoted and escaped explicitly.
            cursor.execute(f"USE {_quote_identifier(database)}")
            # Get tables; the LIKE pattern is caller-supplied, so bind it as a parameter.
            if like:
                cursor.execute("SHOW TABLES LIKE %s", (like,))
            else:
                cursor.execute("SHOW TABLES")
            # SHOW TABLES yields a single column; read it by position rather than
            # relying on the exact column label the server chooses.
            table_names = [next(iter(row.values())) for row in cursor.fetchall()]
            tables = []
            for table in table_names:
                logger.info(f"Getting schema info for table {database}.{table}")
                quoted_table = _quote_identifier(table)
                # Get table schema
                cursor.execute(f"DESCRIBE {quoted_table}")
                columns = cursor.fetchall()
                # Get create table statement
                cursor.execute(f"SHOW CREATE TABLE {quoted_table}")
                ddl_row = cursor.fetchone() or {}
                # NOTE(review): MySQL-compatible servers report view DDL under
                # "Create View" - presumably Doris does too; confirm. Falling back
                # keeps one unusual object from aborting the whole listing.
                create_table_result = (
                    ddl_row.get("Create Table") or ddl_row.get("Create View") or ""
                )
                tables.append({
                    "database": database,
                    "name": table,
                    # Table comment, if available (extracted from the DDL text).
                    "comment": _extract_table_comment(create_table_result),
                    "columns": columns,
                    "create_table_query": create_table_result,
                })
            logger.info(f"Found {len(tables)} tables")
            return tables
        finally:
            cursor.close()
    finally:
        conn.close()
def execute_query_impl(query: str) -> List[Dict[str, Any]]:
    """Run *query* on a fresh Doris connection and return all of its rows.

    Errors raised while executing or fetching are logged and reported as a
    single-element list containing an ``{"error": ...}`` dictionary instead of
    being raised. Failures from ``create_doris_client()`` happen before that
    handling starts and therefore propagate to the caller.

    Args:
        query: The SQL query to execute

    Returns:
        List[Dict]: One dictionary per result row, or the error entry
        described above
    """
    connection = create_doris_client()
    dict_cursor = connection.cursor(dictionary=True)
    try:
        dict_cursor.execute(query)
        outcome = dict_cursor.fetchall()
        logger.info(f"Query returned {len(outcome)} rows")
    except Exception as failure:
        logger.error(f"Error executing query: {failure}")
        outcome = [{"error": f"Error running query: {failure}"}]
    finally:
        # Always release the per-call cursor and connection.
        dict_cursor.close()
        connection.close()
    return outcome
@mcp.tool()
def execute_query(query: str):
    """Run a SELECT query against Doris with timeout protection.

    Args:
        query: The SQL query to execute

    Returns:
        List[Dict]: The query results
    """
    # NOTE(review): FastMCP appears to publish the docstring above as the tool
    # description, so its wording is kept as-is - confirm before rewording.
    # Rejections and timeouts are returned as a single {"error": ...} dict.
    logger.info(f"Executing SELECT query: {query}")

    # Basic validation: only statements whose first keyword is SELECT are accepted.
    is_select = query.strip().upper().startswith("SELECT")
    if not is_select:
        return {"error": "Only SELECT queries are allowed for security reasons"}

    # Run on the shared pool so this call can stop waiting after the timeout.
    pending = QUERY_EXECUTOR.submit(execute_query_impl, query)
    try:
        return pending.result(timeout=SELECT_QUERY_TIMEOUT_SECS)
    except concurrent.futures.TimeoutError:
        logger.warning(f"Query timed out after {SELECT_QUERY_TIMEOUT_SECS} seconds: {query}")
        # Future.cancel() only prevents a call that has not started yet; a query
        # that is already running keeps its worker thread until it finishes.
        pending.cancel()
        return {"error": f"Queries taking longer than {SELECT_QUERY_TIMEOUT_SECS} seconds are currently not supported."}
```