# Directory Structure
```
├── .dockerignore
├── .editorconfig
├── .github
│ └── workflows
│ ├── ci.yaml
│ └── publish.yml
├── .gitignore
├── .python-version
├── Dockerfile
├── fastmcp.json
├── LICENSE
├── mcp_clickhouse
│ ├── __init__.py
│ ├── chdb_prompt.py
│ ├── main.py
│ ├── mcp_env.py
│ └── mcp_server.py
├── pyproject.toml
├── README.md
├── test-services
│ └── docker-compose.yaml
├── tests
│ ├── test_chdb_tool.py
│ ├── test_config_interface.py
│ ├── test_mcp_server.py
│ └── test_tool.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------
```
3.10
```
--------------------------------------------------------------------------------
/.editorconfig:
--------------------------------------------------------------------------------
```
[*]
trim_trailing_whitespace = true
end_of_line = lf
```
--------------------------------------------------------------------------------
/.dockerignore:
--------------------------------------------------------------------------------
```
# Git
.git/
.gitignore
.github/
# Documentation
LICENSE
# README.md is needed for package build
# *.md files may be needed for package metadata
# Development environment
.venv/
.env
.envrc
.python-version
# IDE and editor files
.vscode/
.idea/
*.swp
*.swo
*~
# Cache directories
.ruff_cache/
.pytest_cache/
__pycache__/
*.pyc
*.pyo
*.pyd
.mypy_cache/
.dmypy.json
dmypy.json
# Test files and directories
tests/
test-services/
.coverage
.coverage.*
htmlcov/
.tox/
.nox/
coverage.xml
*.cover
*.py,cover
.hypothesis/
cover/
# Build artifacts
build/
dist/
*.egg-info/
.eggs/
*.egg
MANIFEST
# Temporary files
*.tmp
*.temp
*.log
.DS_Store
Thumbs.db
# Docker files (avoid copying Dockerfile into itself)
Dockerfile
.dockerignore
# Editor config (not needed at runtime)
.editorconfig
# Claude AI files
.claude/
```
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
.envrc
.ruff_cache/
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# UV
# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
#uv.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
# PyPI configuration file
.pypirc
```
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
# ClickHouse MCP Server
[PyPI - mcp-clickhouse](https://pypi.org/project/mcp-clickhouse)
An MCP server for ClickHouse.
<a href="https://glama.ai/mcp/servers/yvjy4csvo1"><img width="380" height="200" src="https://glama.ai/mcp/servers/yvjy4csvo1/badge" alt="mcp-clickhouse MCP server" /></a>
## Features
### ClickHouse Tools
* `run_select_query`
* Execute SQL queries on your ClickHouse cluster.
* Input: `sql` (string): The SQL query to execute.
* All ClickHouse queries are run with `readonly = 1` to ensure they are safe.
* `list_databases`
* List all databases on your ClickHouse cluster.
* `list_tables`
* List all tables in a database.
* Input: `database` (string): The name of the database.
### chDB Tools
* `run_chdb_select_query`
* Execute SQL queries using [chDB](https://github.com/chdb-io/chdb)'s embedded ClickHouse engine.
* Input: `sql` (string): The SQL query to execute.
* Query data directly from various sources (files, URLs, databases) without ETL processes.
### Health Check Endpoint
When running with HTTP or SSE transport, a health check endpoint is available at `/health`. This endpoint:
- Returns `200 OK` with the ClickHouse version if the server is healthy and can connect to ClickHouse
- Returns `503 Service Unavailable` if the server cannot connect to ClickHouse
Example:
```bash
curl http://localhost:8000/health
# Response: OK - Connected to ClickHouse 24.3.1
```
## Configuration
This MCP server supports both ClickHouse and chDB. You can enable either or both depending on your needs.
1. Open the Claude Desktop configuration file located at:
* On macOS: `~/Library/Application Support/Claude/claude_desktop_config.json`
* On Windows: `%APPDATA%/Claude/claude_desktop_config.json`
2. Add the following:
```json
{
"mcpServers": {
"mcp-clickhouse": {
"command": "uv",
"args": [
"run",
"--with",
"mcp-clickhouse",
"--python",
"3.10",
"mcp-clickhouse"
],
"env": {
"CLICKHOUSE_HOST": "<clickhouse-host>",
"CLICKHOUSE_PORT": "<clickhouse-port>",
"CLICKHOUSE_USER": "<clickhouse-user>",
"CLICKHOUSE_PASSWORD": "<clickhouse-password>",
"CLICKHOUSE_SECURE": "true",
"CLICKHOUSE_VERIFY": "true",
"CLICKHOUSE_CONNECT_TIMEOUT": "30",
"CLICKHOUSE_SEND_RECEIVE_TIMEOUT": "30"
}
}
}
}
```
Update the environment variables to point to your own ClickHouse service.
Or, if you'd like to try it out with the [ClickHouse SQL Playground](https://sql.clickhouse.com/), you can use the following config:
```json
{
"mcpServers": {
"mcp-clickhouse": {
"command": "uv",
"args": [
"run",
"--with",
"mcp-clickhouse",
"--python",
"3.10",
"mcp-clickhouse"
],
"env": {
"CLICKHOUSE_HOST": "sql-clickhouse.clickhouse.com",
"CLICKHOUSE_PORT": "8443",
"CLICKHOUSE_USER": "demo",
"CLICKHOUSE_PASSWORD": "",
"CLICKHOUSE_SECURE": "true",
"CLICKHOUSE_VERIFY": "true",
"CLICKHOUSE_CONNECT_TIMEOUT": "30",
"CLICKHOUSE_SEND_RECEIVE_TIMEOUT": "30"
}
}
}
}
```
For chDB (embedded ClickHouse engine), add the following configuration:
```json
{
"mcpServers": {
"mcp-clickhouse": {
"command": "uv",
"args": [
"run",
"--with",
"mcp-clickhouse",
"--python",
"3.10",
"mcp-clickhouse"
],
"env": {
"CHDB_ENABLED": "true",
"CLICKHOUSE_ENABLED": "false",
"CHDB_DATA_PATH": "/path/to/chdb/data"
}
}
}
}
```
You can also enable both ClickHouse and chDB simultaneously:
```json
{
"mcpServers": {
"mcp-clickhouse": {
"command": "uv",
"args": [
"run",
"--with",
"mcp-clickhouse",
"--python",
"3.10",
"mcp-clickhouse"
],
"env": {
"CLICKHOUSE_HOST": "<clickhouse-host>",
"CLICKHOUSE_PORT": "<clickhouse-port>",
"CLICKHOUSE_USER": "<clickhouse-user>",
"CLICKHOUSE_PASSWORD": "<clickhouse-password>",
"CLICKHOUSE_SECURE": "true",
"CLICKHOUSE_VERIFY": "true",
"CLICKHOUSE_CONNECT_TIMEOUT": "30",
"CLICKHOUSE_SEND_RECEIVE_TIMEOUT": "30",
"CHDB_ENABLED": "true",
"CHDB_DATA_PATH": "/path/to/chdb/data"
}
}
}
}
```
3. Locate the command entry for `uv` and replace it with the absolute path to the `uv` executable. This ensures that the correct version of `uv` is used when starting the server. On a mac, you can find this path using `which uv`.
4. Restart Claude Desktop to apply the changes.
### Running Without uv (Using System Python)
If you prefer to use the system Python installation instead of uv, you can install the package from PyPI and run it directly:
1. Install the package using pip:
```bash
python3 -m pip install mcp-clickhouse
```
To upgrade to the latest version:
```bash
python3 -m pip install --upgrade mcp-clickhouse
```
2. Update your Claude Desktop configuration to use Python directly:
```json
{
"mcpServers": {
"mcp-clickhouse": {
"command": "python3",
"args": [
"-m",
"mcp_clickhouse.main"
],
"env": {
"CLICKHOUSE_HOST": "<clickhouse-host>",
"CLICKHOUSE_PORT": "<clickhouse-port>",
"CLICKHOUSE_USER": "<clickhouse-user>",
"CLICKHOUSE_PASSWORD": "<clickhouse-password>",
"CLICKHOUSE_SECURE": "true",
"CLICKHOUSE_VERIFY": "true",
"CLICKHOUSE_CONNECT_TIMEOUT": "30",
"CLICKHOUSE_SEND_RECEIVE_TIMEOUT": "30"
}
}
}
}
```
Alternatively, you can use the installed script directly:
```json
{
"mcpServers": {
"mcp-clickhouse": {
"command": "mcp-clickhouse",
"env": {
"CLICKHOUSE_HOST": "<clickhouse-host>",
"CLICKHOUSE_PORT": "<clickhouse-port>",
"CLICKHOUSE_USER": "<clickhouse-user>",
"CLICKHOUSE_PASSWORD": "<clickhouse-password>",
"CLICKHOUSE_SECURE": "true",
"CLICKHOUSE_VERIFY": "true",
"CLICKHOUSE_CONNECT_TIMEOUT": "30",
"CLICKHOUSE_SEND_RECEIVE_TIMEOUT": "30"
}
}
}
}
```
Note: Make sure to use the full path to the Python executable or the `mcp-clickhouse` script if they are not in your system PATH. You can find the paths using:
- `which python3` for the Python executable
- `which mcp-clickhouse` for the installed script
## Development
1. In `test-services` directory run `docker compose up -d` to start the ClickHouse cluster.
2. Add the following variables to a `.env` file in the root of the repository.
*Note: The use of the `default` user in this context is intended solely for local development purposes.*
```bash
CLICKHOUSE_HOST=localhost
CLICKHOUSE_PORT=8123
CLICKHOUSE_USER=default
CLICKHOUSE_PASSWORD=clickhouse
```
3. Run `uv sync` to install the dependencies. To install `uv` follow the instructions [here](https://docs.astral.sh/uv/). Then do `source .venv/bin/activate`.
4. For easy testing with the MCP Inspector, run `fastmcp dev mcp_clickhouse/mcp_server.py` to start the MCP server.
5. To test with HTTP transport and the health check endpoint:
```bash
# Using default port 8000
CLICKHOUSE_MCP_SERVER_TRANSPORT=http python -m mcp_clickhouse.main
# Or with a custom port
CLICKHOUSE_MCP_SERVER_TRANSPORT=http CLICKHOUSE_MCP_BIND_PORT=4200 python -m mcp_clickhouse.main
# Then in another terminal:
curl http://localhost:8000/health # or http://localhost:4200/health for custom port
```
### Environment Variables
The following environment variables are used to configure the ClickHouse and chDB connections:
#### ClickHouse Variables
##### Required Variables
* `CLICKHOUSE_HOST`: The hostname of your ClickHouse server
* `CLICKHOUSE_USER`: The username for authentication
* `CLICKHOUSE_PASSWORD`: The password for authentication
> [!CAUTION]
> It is important to treat your MCP database user as you would any external client connecting to your database, granting only the minimum necessary privileges required for its operation. The use of default or administrative users should be strictly avoided at all times.
##### Optional Variables
* `CLICKHOUSE_PORT`: The port number of your ClickHouse server
* Default: `8443` if HTTPS is enabled, `8123` if disabled
* Usually doesn't need to be set unless using a non-standard port
* `CLICKHOUSE_SECURE`: Enable/disable HTTPS connection
* Default: `"true"`
* Set to `"false"` for non-secure connections
* `CLICKHOUSE_VERIFY`: Enable/disable SSL certificate verification
* Default: `"true"`
* Set to `"false"` to disable certificate verification (not recommended for production)
* TLS certificates: The package uses your operating system trust store for TLS certificate verification via `truststore`. We call `truststore.inject_into_ssl()` at startup to ensure proper certificate handling. Python’s default SSL behavior is used as a fallback only if an unexpected error occurs.
* `CLICKHOUSE_CONNECT_TIMEOUT`: Connection timeout in seconds
* Default: `"30"`
* Increase this value if you experience connection timeouts
* `CLICKHOUSE_SEND_RECEIVE_TIMEOUT`: Send/receive timeout in seconds
* Default: `"300"`
* Increase this value for long-running queries
* `CLICKHOUSE_DATABASE`: Default database to use
* Default: None (uses server default)
* Set this to automatically connect to a specific database
* `CLICKHOUSE_MCP_SERVER_TRANSPORT`: Sets the transport method for the MCP server.
* Default: `"stdio"`
* Valid options: `"stdio"`, `"http"`, `"sse"`. This is useful for local development with tools like MCP Inspector.
* `CLICKHOUSE_MCP_BIND_HOST`: Host to bind the MCP server to when using HTTP or SSE transport
* Default: `"127.0.0.1"`
* Set to `"0.0.0.0"` to bind to all network interfaces (useful for Docker or remote access)
* Only used when transport is `"http"` or `"sse"`
* `CLICKHOUSE_MCP_BIND_PORT`: Port to bind the MCP server to when using HTTP or SSE transport
* Default: `"8000"`
* Only used when transport is `"http"` or `"sse"`
* `CLICKHOUSE_MCP_QUERY_TIMEOUT`: Timeout in seconds for SELECT tools
* Default: `"30"`
* Increase this if you see `Query timed out after ...` errors for heavy queries
* `CLICKHOUSE_ENABLED`: Enable/disable ClickHouse functionality
* Default: `"true"`
* Set to `"false"` to disable ClickHouse tools when using chDB only
#### chDB Variables
* `CHDB_ENABLED`: Enable/disable chDB functionality
* Default: `"false"`
* Set to `"true"` to enable chDB tools
* `CHDB_DATA_PATH`: The path to the chDB data directory
* Default: `":memory:"` (in-memory database)
* Use `:memory:` for in-memory database
* Use a file path for persistent storage (e.g., `/path/to/chdb/data`)
#### Example Configurations
For local development with Docker:
```env
# Required variables
CLICKHOUSE_HOST=localhost
CLICKHOUSE_USER=default
CLICKHOUSE_PASSWORD=clickhouse
# Optional: Override defaults for local development
CLICKHOUSE_SECURE=false # Uses port 8123 automatically
CLICKHOUSE_VERIFY=false
```
For ClickHouse Cloud:
```env
# Required variables
CLICKHOUSE_HOST=your-instance.clickhouse.cloud
CLICKHOUSE_USER=default
CLICKHOUSE_PASSWORD=your-password
# Optional: These use secure defaults
# CLICKHOUSE_SECURE=true # Uses port 8443 automatically
# CLICKHOUSE_DATABASE=your_database
```
For ClickHouse SQL Playground:
```env
CLICKHOUSE_HOST=sql-clickhouse.clickhouse.com
CLICKHOUSE_USER=demo
CLICKHOUSE_PASSWORD=
# Uses secure defaults (HTTPS on port 8443)
```
For chDB only (in-memory):
```env
# chDB configuration
CHDB_ENABLED=true
CLICKHOUSE_ENABLED=false
# CHDB_DATA_PATH defaults to :memory:
```
For chDB with persistent storage:
```env
# chDB configuration
CHDB_ENABLED=true
CLICKHOUSE_ENABLED=false
CHDB_DATA_PATH=/path/to/chdb/data
```
For MCP Inspector or remote access with HTTP transport:
```env
CLICKHOUSE_HOST=localhost
CLICKHOUSE_USER=default
CLICKHOUSE_PASSWORD=clickhouse
CLICKHOUSE_MCP_SERVER_TRANSPORT=http
CLICKHOUSE_MCP_BIND_HOST=0.0.0.0 # Bind to all interfaces
CLICKHOUSE_MCP_BIND_PORT=4200 # Custom port (default: 8000)
```
When using HTTP transport, the server will run on the configured port (default 8000). For example, with the above configuration:
- MCP endpoint: `http://localhost:4200/mcp`
- Health check: `http://localhost:4200/health`
You can set these variables in your environment, in a `.env` file, or in the Claude Desktop configuration:
```json
{
"mcpServers": {
"mcp-clickhouse": {
"command": "uv",
"args": [
"run",
"--with",
"mcp-clickhouse",
"--python",
"3.10",
"mcp-clickhouse"
],
"env": {
"CLICKHOUSE_HOST": "<clickhouse-host>",
"CLICKHOUSE_USER": "<clickhouse-user>",
"CLICKHOUSE_PASSWORD": "<clickhouse-password>",
"CLICKHOUSE_DATABASE": "<optional-database>",
"CLICKHOUSE_MCP_SERVER_TRANSPORT": "stdio",
"CLICKHOUSE_MCP_BIND_HOST": "127.0.0.1",
"CLICKHOUSE_MCP_BIND_PORT": "8000"
}
}
}
}
```
Note: The bind host and port settings are only used when transport is set to "http" or "sse".
### Running tests
```bash
uv sync --all-extras --dev # install dev dependencies
uv run ruff check . # run linting
docker compose -f test-services/docker-compose.yaml up -d  # start ClickHouse
uv run pytest -v tests
uv run pytest -v tests/test_tool.py # ClickHouse only
uv run pytest -v tests/test_chdb_tool.py # chDB only
```
## YouTube Overview
[Watch the mcp-clickhouse overview on YouTube](https://www.youtube.com/watch?v=y9biAm_Fkqw)
```
--------------------------------------------------------------------------------
/fastmcp.json:
--------------------------------------------------------------------------------
```json
{
"$schema": "https://gofastmcp.com/public/schemas/fastmcp.json/v1.json",
"source": {
"path": "mcp_clickhouse/mcp_server.py",
"entrypoint": "mcp"
},
"environment": {
"dependencies": [
"clickhouse-connect",
"python-dotenv",
"truststore",
"chdb"
]
}
}
```
--------------------------------------------------------------------------------
/test-services/docker-compose.yaml:
--------------------------------------------------------------------------------
```yaml
services:
clickhouse:
image: clickhouse/clickhouse-server:latest
ports:
- "8123:8123"
- "9000:9000"
volumes:
- clickhouse-data:/var/lib/clickhouse
environment:
- CLICKHOUSE_USER=default
- CLICKHOUSE_PASSWORD=clickhouse
- CLICKHOUSE_DB=default
volumes:
clickhouse-data:
```
--------------------------------------------------------------------------------
/.github/workflows/publish.yml:
--------------------------------------------------------------------------------
```yaml
on:
workflow_dispatch:
jobs:
publish:
name: Upload release to PyPI
runs-on: ubuntu-latest
environment:
name: pypi
url: "https://pypi.org/p/mcp-clickhouse"
permissions:
id-token: write
steps:
- uses: actions/checkout@v4
- uses: astral-sh/setup-uv@v5
- run: uv python install
- run: uv build
- uses: pypa/gh-action-pypi-publish@release/v1
```
--------------------------------------------------------------------------------
/mcp_clickhouse/__init__.py:
--------------------------------------------------------------------------------
```python
"""Public API for the mcp-clickhouse package.

Importing this package also injects the operating-system trust store into
Python's SSL machinery (via ``truststore``) unless the user opts out by
setting ``MCP_CLICKHOUSE_TRUSTSTORE_DISABLE=1``.
"""

import os

from .mcp_server import (
    chdb_initial_prompt,
    create_chdb_client,
    create_clickhouse_client,
    list_databases,
    list_tables,
    run_chdb_select_query,
    run_select_query,
)

_truststore_disabled = os.getenv("MCP_CLICKHOUSE_TRUSTSTORE_DISABLE", None) == "1"
if not _truststore_disabled:
    try:
        import truststore

        truststore.inject_into_ssl()
    except Exception:
        # Best effort only: on any failure we fall back to Python's
        # default SSL certificate handling.
        pass

__all__ = [
    "list_databases",
    "list_tables",
    "run_select_query",
    "create_clickhouse_client",
    "create_chdb_client",
    "run_chdb_select_query",
    "chdb_initial_prompt",
]
```
--------------------------------------------------------------------------------
/mcp_clickhouse/main.py:
--------------------------------------------------------------------------------
```python
"""Command-line entry point for the mcp-clickhouse server."""

from .mcp_env import TransportType, get_mcp_config
from .mcp_server import mcp


def main():
    """Start the MCP server using the transport selected via configuration.

    stdio needs no network binding; HTTP and SSE transports require an
    explicit bind host (default 127.0.0.1, may be 0.0.0.0) and port
    (default 8000).
    """
    config = get_mcp_config()
    transport = config.server_transport

    if transport not in (TransportType.HTTP.value, TransportType.SSE.value):
        # stdio transport: no host/port arguments are needed.
        mcp.run(transport=transport)
    else:
        mcp.run(transport=transport, host=config.bind_host, port=config.bind_port)


if __name__ == "__main__":
    main()
```
--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------
```toml
[project]
name = "mcp-clickhouse"
version = "0.1.12"
description = "An MCP server for ClickHouse."
readme = "README.md"
license = "Apache-2.0"
license-files = ["LICENSE"]
requires-python = ">=3.10"
dependencies = [
"fastmcp>=2.0.0",
"python-dotenv>=1.0.1",
"clickhouse-connect>=0.8.16",
"truststore>=0.10",
"chdb>=3.3.0",
]
[project.scripts]
mcp-clickhouse = "mcp_clickhouse.main:main"
[project.urls]
Home = "https://github.com/ClickHouse/mcp-clickhouse"
[project.optional-dependencies]
dev = [
"ruff",
"pytest",
"pytest-asyncio"
]
[tool.hatch.build.targets.wheel]
packages = ["mcp_clickhouse"]
[tool.ruff]
line-length = 100
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
```
--------------------------------------------------------------------------------
/Dockerfile:
--------------------------------------------------------------------------------
```dockerfile
# Build stage - Use a Python image with uv pre-installed
FROM ghcr.io/astral-sh/uv:python3.13-bookworm-slim AS builder
# Install the project into `/app`
WORKDIR /app
# Enable bytecode compilation
ENV UV_COMPILE_BYTECODE=1
# Copy from the cache instead of linking since it's a mounted volume
ENV UV_LINK_MODE=copy
# Install git and build dependencies for ClickHouse client
RUN --mount=type=cache,target=/var/cache/apt,sharing=locked \
--mount=type=cache,target=/var/lib/apt,sharing=locked \
apt-get update && apt-get install -y --no-install-recommends git build-essential
# Install the project's dependencies using the lockfile and settings
RUN --mount=type=cache,target=/root/.cache/uv \
--mount=type=bind,source=uv.lock,target=uv.lock \
--mount=type=bind,source=pyproject.toml,target=pyproject.toml \
--mount=type=bind,source=README.md,target=README.md \
uv sync --locked --no-install-project --no-dev
# Then, add the rest of the project source code and install it
# Installing separately from its dependencies allows optimal layer caching
COPY . /app
RUN --mount=type=cache,target=/root/.cache/uv \
uv sync --locked --no-dev --no-editable
# Production stage - Use minimal Python image
FROM python:3.13-slim-bookworm
# Set the working directory
WORKDIR /app
# Copy the virtual environment from the builder stage
COPY --from=builder /app/.venv /app/.venv
# Place executables in the environment at the front of the path
ENV PATH="/app/.venv/bin:$PATH"
# Run the MCP ClickHouse server by default
CMD ["python", "-m", "mcp_clickhouse.main"]
```
--------------------------------------------------------------------------------
/tests/test_chdb_tool.py:
--------------------------------------------------------------------------------
```python
import unittest

from dotenv import load_dotenv

from mcp_clickhouse import create_chdb_client, run_chdb_select_query

load_dotenv()


class TestChDBTools(unittest.TestCase):
    """Exercise the chDB-backed SELECT tool end to end."""

    @classmethod
    def setUpClass(cls):
        """Create a shared chDB client once for every test in the class."""
        cls.client = create_chdb_client()

    def test_run_chdb_select_query_simple(self):
        """A trivial SELECT returns a list whose rendering names the column."""
        outcome = run_chdb_select_query("SELECT 1 as test_value")
        self.assertIsInstance(outcome, list)
        self.assertIn("test_value", str(outcome))

    def test_run_chdb_select_query_with_url_table_function(self):
        """A SELECT through the url() table function returns the row count."""
        outcome = run_chdb_select_query(
            "SELECT COUNT(1) FROM url('https://datasets.clickhouse.com/hits_compatible/athena_partitioned/hits_0.parquet', 'Parquet')"
        )
        print(outcome)
        self.assertIsInstance(outcome, list)
        self.assertIn("1000000", str(outcome))

    def test_run_chdb_select_query_failure(self):
        """Querying a missing table yields an error dict, not an exception."""
        outcome = run_chdb_select_query("SELECT * FROM non_existent_table_chDB")
        print(outcome)
        self.assertIsInstance(outcome, dict)
        self.assertEqual(outcome["status"], "error")
        self.assertIn("message", outcome)

    def test_run_chdb_select_query_empty_result(self):
        """A SELECT matching no rows yields an empty list."""
        outcome = run_chdb_select_query("SELECT 1 WHERE 1 = 0")
        print(outcome)
        self.assertIsInstance(outcome, list)
        self.assertEqual(len(outcome), 0)


if __name__ == "__main__":
    unittest.main()
```
--------------------------------------------------------------------------------
/.github/workflows/ci.yaml:
--------------------------------------------------------------------------------
```yaml
name: CI
on:
push:
branches:
- main
pull_request:
jobs:
test:
runs-on: ubuntu-latest
services:
clickhouse:
image: clickhouse/clickhouse-server:24.10
ports:
- 9000:9000
- 8123:8123
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Install uv
uses: astral-sh/setup-uv@v4
- name: Setup Python
run: uv python install 3.13
- name: Install Project
run: uv sync --all-extras --dev
- name: Run tests
env:
CLICKHOUSE_HOST: "localhost"
CLICKHOUSE_PORT: "8123"
CLICKHOUSE_USER: "default"
CLICKHOUSE_PASSWORD: ""
CLICKHOUSE_SECURE: "false"
CLICKHOUSE_VERIFY: "false"
CHDB_ENABLED: "true"
run: |
uv run pytest tests
- name: Lint with Ruff
run: uv run ruff check .
docker-build:
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Build Docker image
uses: docker/build-push-action@v5
with:
context: .
push: false
load: true
tags: mcp-clickhouse:test
cache-from: type=gha
cache-to: type=gha,mode=max
- name: Test Docker image import
run: |
docker run --rm mcp-clickhouse:test python -c "import mcp_clickhouse; print('✅ MCP ClickHouse Docker image works!')"
- name: Test Docker image default command
run: |
timeout 10s docker run --rm \
-e CLICKHOUSE_HOST=localhost \
-e CLICKHOUSE_USER=default \
-e CLICKHOUSE_PASSWORD="" \
mcp-clickhouse:test || [ $? = 124 ] && echo "✅ Docker container starts successfully"
```
--------------------------------------------------------------------------------
/tests/test_config_interface.py:
--------------------------------------------------------------------------------
```python
import pytest
from mcp_clickhouse.mcp_env import ClickHouseConfig
def test_interface_http_when_secure_false(monkeypatch: pytest.MonkeyPatch):
    """Interface must be plain 'http' when CLICKHOUSE_SECURE=false."""
    for name, value in {
        "CLICKHOUSE_HOST": "localhost",
        "CLICKHOUSE_USER": "test",
        "CLICKHOUSE_PASSWORD": "test",
        "CLICKHOUSE_SECURE": "false",
        "CLICKHOUSE_PORT": "8123",
    }.items():
        monkeypatch.setenv(name, value)

    client_config = ClickHouseConfig().get_client_config()

    assert client_config["interface"] == "http"
    assert client_config["secure"] is False
    assert client_config["port"] == 8123
def test_interface_https_when_secure_true(monkeypatch: pytest.MonkeyPatch):
    """Interface must be 'https' when CLICKHOUSE_SECURE=true."""
    for name, value in {
        "CLICKHOUSE_HOST": "example.com",
        "CLICKHOUSE_USER": "test",
        "CLICKHOUSE_PASSWORD": "test",
        "CLICKHOUSE_SECURE": "true",
        "CLICKHOUSE_PORT": "8443",
    }.items():
        monkeypatch.setenv(name, value)

    client_config = ClickHouseConfig().get_client_config()

    assert client_config["interface"] == "https"
    assert client_config["secure"] is True
    assert client_config["port"] == 8443
def test_interface_https_by_default(monkeypatch: pytest.MonkeyPatch):
    """With CLICKHOUSE_SECURE unset, the config falls back to HTTPS on 8443."""
    for name, value in {
        "CLICKHOUSE_HOST": "example.com",
        "CLICKHOUSE_USER": "test",
        "CLICKHOUSE_PASSWORD": "test",
    }.items():
        monkeypatch.setenv(name, value)
    # Remove the optional variables so the defaults actually apply.
    for name in ("CLICKHOUSE_SECURE", "CLICKHOUSE_PORT"):
        monkeypatch.delenv(name, raising=False)

    client_config = ClickHouseConfig().get_client_config()

    assert client_config["interface"] == "https"
    assert client_config["secure"] is True
    assert client_config["port"] == 8443
def test_interface_http_with_custom_port(monkeypatch: pytest.MonkeyPatch):
    """A custom port is honored alongside 'http' when CLICKHOUSE_SECURE=false."""
    for name, value in {
        "CLICKHOUSE_HOST": "localhost",
        "CLICKHOUSE_USER": "test",
        "CLICKHOUSE_PASSWORD": "test",
        "CLICKHOUSE_SECURE": "false",
        "CLICKHOUSE_PORT": "9000",
    }.items():
        monkeypatch.setenv(name, value)

    client_config = ClickHouseConfig().get_client_config()

    assert client_config["interface"] == "http"
    assert client_config["secure"] is False
    assert client_config["port"] == 9000
def test_interface_https_with_custom_port(monkeypatch: pytest.MonkeyPatch):
    """A custom port is honored alongside 'https' when CLICKHOUSE_SECURE=true."""
    for name, value in {
        "CLICKHOUSE_HOST": "example.com",
        "CLICKHOUSE_USER": "test",
        "CLICKHOUSE_PASSWORD": "test",
        "CLICKHOUSE_SECURE": "true",
        "CLICKHOUSE_PORT": "9443",
    }.items():
        monkeypatch.setenv(name, value)

    client_config = ClickHouseConfig().get_client_config()

    assert client_config["interface"] == "https"
    assert client_config["secure"] is True
    assert client_config["port"] == 9443
```
--------------------------------------------------------------------------------
/tests/test_tool.py:
--------------------------------------------------------------------------------
```python
import unittest
import json

from dotenv import load_dotenv
from fastmcp.exceptions import ToolError

from mcp_clickhouse import create_clickhouse_client, list_databases, list_tables, run_select_query

load_dotenv()


class TestClickhouseTools(unittest.TestCase):
    """Integration tests for the ClickHouse MCP tools.

    NOTE(review): these tests require a reachable ClickHouse server configured
    via environment variables / a `.env` file — they are integration tests,
    not isolated unit tests.
    """

    @classmethod
    def setUpClass(cls):
        """Set up the environment before tests."""
        cls.client = create_clickhouse_client()

        # Prepare test database and table
        cls.test_db = "test_tool_db"
        cls.test_table = "test_table"
        cls.client.command(f"CREATE DATABASE IF NOT EXISTS {cls.test_db}")

        # Drop table if exists to ensure clean state
        cls.client.command(f"DROP TABLE IF EXISTS {cls.test_db}.{cls.test_table}")

        # Create table with comments on the table and on both columns so
        # test_table_and_column_comments has known metadata to verify.
        cls.client.command(f"""
            CREATE TABLE {cls.test_db}.{cls.test_table} (
                id UInt32 COMMENT 'Primary identifier',
                name String COMMENT 'User name field'
            ) ENGINE = MergeTree()
            ORDER BY id
            COMMENT 'Test table for unit testing'
        """)

        # Seed two rows used by the SELECT tests below.
        cls.client.command(f"""
            INSERT INTO {cls.test_db}.{cls.test_table} (id, name) VALUES (1, 'Alice'), (2, 'Bob')
        """)

    @classmethod
    def tearDownClass(cls):
        """Clean up the environment after tests."""
        cls.client.command(f"DROP DATABASE IF EXISTS {cls.test_db}")

    def test_list_databases(self):
        """Test listing databases."""
        result = list_databases()
        # Parse JSON response (list_databases returns a JSON string)
        databases = json.loads(result)
        self.assertIn(self.test_db, databases)

    def test_list_tables_without_like(self):
        """Test listing tables without a 'LIKE' filter."""
        result = list_tables(self.test_db)
        self.assertIsInstance(result, list)
        self.assertEqual(len(result), 1)
        self.assertEqual(result[0]["name"], self.test_table)

    def test_list_tables_with_like(self):
        """Test listing tables with a 'LIKE' filter."""
        result = list_tables(self.test_db, like=f"{self.test_table}%")
        self.assertIsInstance(result, list)
        self.assertEqual(len(result), 1)
        self.assertEqual(result[0]["name"], self.test_table)

    def test_run_select_query_success(self):
        """Test running a SELECT query successfully."""
        query = f"SELECT * FROM {self.test_db}.{self.test_table}"
        result = run_select_query(query)
        self.assertIsInstance(result, dict)
        # Expect exactly the two rows inserted in setUpClass, ordered by id.
        self.assertEqual(len(result["rows"]), 2)
        self.assertEqual(result["rows"][0][0], 1)
        self.assertEqual(result["rows"][0][1], "Alice")

    def test_run_select_query_failure(self):
        """Test running a SELECT query with an error."""
        query = f"SELECT * FROM {self.test_db}.non_existent_table"
        # Should raise ToolError rather than returning an error payload.
        with self.assertRaises(ToolError) as context:
            run_select_query(query)
        self.assertIn("Query execution failed", str(context.exception))

    def test_table_and_column_comments(self):
        """Test that table and column comments are correctly retrieved."""
        result = list_tables(self.test_db)
        self.assertIsInstance(result, list)
        self.assertEqual(len(result), 1)
        table_info = result[0]

        # Verify table comment
        self.assertEqual(table_info["comment"], "Test table for unit testing")

        # Get columns by name for easier testing
        columns = {col["name"]: col for col in table_info["columns"]}

        # Verify column comments
        self.assertEqual(columns["id"]["comment"], "Primary identifier")
        self.assertEqual(columns["name"]["comment"], "User name field")


if __name__ == "__main__":
    unittest.main()
```
--------------------------------------------------------------------------------
/mcp_clickhouse/chdb_prompt.py:
--------------------------------------------------------------------------------
```python
"""chDB prompts for MCP server."""
CHDB_PROMPT = """
# chDB MCP System Prompt
## Available Tools
- **run_chdb_select_query**: Execute SELECT queries using chDB's table functions
## Core Principles
You are a chDB assistant, specialized in helping users query data sources directly through table functions, **avoiding data imports**.
### 🚨 Important Constraints
#### Data Processing Constraints
- **No large data display**: Don't show more than 10 rows of raw data in responses
- **Use analysis tool**: All data processing must be completed in the analysis tool
- **Result-oriented output**: Only provide query results and key insights, not intermediate processing data
- **Avoid context explosion**: Don't paste large amounts of raw data or complete tables
#### Query Strategy Constraints
- **Prioritize table functions**: When users mention import/load/insert, immediately recommend table functions
- **Direct querying**: All data should be queried in place through table functions
- **Fallback option**: When no suitable table function exists, use Python to download temporary files then process with file()
- **Concise responses**: Avoid lengthy explanations, provide executable SQL directly
## Table Functions
### File Types
```sql
-- Local files (auto format detection)
file('path/to/file.csv')
file('data.parquet', 'Parquet')
-- Remote files
url('https://example.com/data.csv', 'CSV')
url('https://example.com/data.parquet')
-- S3 storage
s3('s3://bucket/path/file.csv', 'CSV')
s3('s3://bucket/path/*.parquet', 'access_key', 'secret_key', 'Parquet')
-- HDFS
hdfs('hdfs://namenode:9000/path/file.parquet')
```
### Database Types
```sql
-- PostgreSQL
postgresql('host:port', 'database', 'table', 'user', 'password')
-- MySQL
mysql('host:port', 'database', 'table', 'user', 'password')
-- SQLite
sqlite('path/to/database.db', 'table')
```
### Common Formats
- `CSV`, `CSVWithNames`, `TSV`, `TSVWithNames`
- `JSON`, `JSONEachRow`, `JSONCompact`
- `Parquet`, `ORC`, `Avro`
## Workflow
### 1. Identify Data Source
- User mentions URL → `url()`
- User mentions S3 → `s3()`
- User mentions local file → `file()`
- User mentions database → corresponding database function
- **No suitable table function** → Use Python to download as temporary file
### 2. Fallback: Python Download
When no suitable table function exists:
```python
# Execute in analysis tool
import requests
import tempfile
import os
# Download data to temporary file
response = requests.get('your_data_url')
with tempfile.NamedTemporaryFile(mode='w', delete=False) as f:
f.write(response.text)
temp_file = f.name
# Execute chDB query immediately within the block
try:
# Use run_chdb_select_query to execute query
result = run_chdb_select_query(f"SELECT * FROM file('{temp_file}', 'CSV') LIMIT 10")
print(result)
finally:
# Ensure temporary file deletion
if os.path.exists(temp_file):
os.unlink(temp_file)
```
### 3. Quick Testing
```sql
-- Test connection (default LIMIT 10)
SELECT * FROM table_function(...) LIMIT 10;
-- View structure
DESCRIBE table_function(...);
```
### 4. Build Queries
```sql
-- Basic query (default LIMIT 10)
SELECT column1, column2 FROM table_function(...) WHERE condition LIMIT 10;
-- Aggregation analysis
SELECT category, COUNT(*), AVG(price)
FROM table_function(...)
GROUP BY category
LIMIT 10;
-- Multi-source join
SELECT a.id, b.name
FROM file('data1.csv') a
JOIN url('https://example.com/data2.csv', 'CSV') b ON a.id = b.id
LIMIT 10;
```
## Response Patterns
### When Users Ask About Data Import
1. **Immediate stop**: "No need to import data, chDB can query directly"
2. **Recommend solution**: Provide corresponding table function based on data source type
3. **Fallback option**: If no suitable table function, explain using Python to download temporary file
4. **Provide examples**: Give specific SQL statements
5. **Follow constraints**: Complete all data processing in analysis tool, only output key results
### Example Dialogues
```
User: "How to import this CSV file into chDB?"
Assistant: "No need to import! Query directly:
SELECT * FROM file('your_file.csv') LIMIT 10;
What analysis do you want?"
User: "This API endpoint doesn't have direct table function support"
Assistant: "I'll use Python to download data to a temporary file, then query with file().
Let me process the data in the analysis tool first..."
```
## Output Constraints
- **Avoid**: Displaying large amounts of raw data, complete tables, intermediate processing steps
- **Recommend**: Concise statistical summaries, key insights, executable SQL
- **Interaction**: Provide overview first, ask for specific needs before deep analysis
## Optimization Tips
- Use WHERE filtering to reduce data transfer
- SELECT specific columns to avoid full table scans
- **Default use LIMIT 10** to prevent large data output
- Test connection with LIMIT 1 for large datasets first
"""
```
--------------------------------------------------------------------------------
/mcp_clickhouse/mcp_env.py:
--------------------------------------------------------------------------------
```python
"""Environment configuration for the MCP ClickHouse server.
This module handles all environment variable configuration with sensible defaults
and type conversion.
"""
from dataclasses import dataclass
import os
from typing import Optional
from enum import Enum
class TransportType(str, Enum):
    """Transport mechanisms the MCP server can run on."""

    STDIO = "stdio"
    HTTP = "http"
    SSE = "sse"

    @classmethod
    def values(cls) -> list[str]:
        """Return every valid transport value as a plain list of strings."""
        return [member.value for member in cls]
@dataclass
class ClickHouseConfig:
    """Configuration for ClickHouse connection settings.

    This class handles all environment variable configuration with sensible defaults
    and type conversion. It provides typed methods for accessing each configuration value.

    Required environment variables (only when CLICKHOUSE_ENABLED=true):
        CLICKHOUSE_HOST: The hostname of the ClickHouse server
        CLICKHOUSE_USER: The username for authentication
        CLICKHOUSE_PASSWORD: The password for authentication

    Optional environment variables (with defaults):
        CLICKHOUSE_PORT: The port number (default: 8443 if secure=True, 8123 if secure=False)
        CLICKHOUSE_SECURE: Enable HTTPS (default: true)
        CLICKHOUSE_VERIFY: Verify SSL certificates (default: true)
        CLICKHOUSE_CONNECT_TIMEOUT: Connection timeout in seconds (default: 30)
        CLICKHOUSE_SEND_RECEIVE_TIMEOUT: Send/receive timeout in seconds (default: 300)
        CLICKHOUSE_DATABASE: Default database to use (default: None)
        CLICKHOUSE_PROXY_PATH: Path to be added to the host URL. For instance, for servers behind an HTTP proxy (default: None)
        CLICKHOUSE_ENABLED: Enable ClickHouse server (default: true)
    """

    def __init__(self):
        """Initialize the configuration from environment variables.

        Raises:
            ValueError: If ClickHouse is enabled but a required variable is missing.
        """
        if self.enabled:
            self._validate_required_vars()

    @property
    def enabled(self) -> bool:
        """Get whether ClickHouse server is enabled.

        Default: True
        """
        return os.getenv("CLICKHOUSE_ENABLED", "true").lower() == "true"

    @property
    def host(self) -> str:
        """Get the ClickHouse host."""
        return os.environ["CLICKHOUSE_HOST"]

    @property
    def port(self) -> int:
        """Get the ClickHouse port.

        Defaults to 8443 if secure=True, 8123 if secure=False.
        Can be overridden by CLICKHOUSE_PORT environment variable.
        """
        if "CLICKHOUSE_PORT" in os.environ:
            return int(os.environ["CLICKHOUSE_PORT"])
        return 8443 if self.secure else 8123

    @property
    def username(self) -> str:
        """Get the ClickHouse username."""
        return os.environ["CLICKHOUSE_USER"]

    @property
    def password(self) -> str:
        """Get the ClickHouse password."""
        return os.environ["CLICKHOUSE_PASSWORD"]

    @property
    def database(self) -> Optional[str]:
        """Get the default database name if set."""
        return os.getenv("CLICKHOUSE_DATABASE")

    @property
    def secure(self) -> bool:
        """Get whether HTTPS is enabled.

        Default: True
        """
        return os.getenv("CLICKHOUSE_SECURE", "true").lower() == "true"

    @property
    def verify(self) -> bool:
        """Get whether SSL certificate verification is enabled.

        Default: True
        """
        return os.getenv("CLICKHOUSE_VERIFY", "true").lower() == "true"

    @property
    def connect_timeout(self) -> int:
        """Get the connection timeout in seconds.

        Default: 30
        """
        return int(os.getenv("CLICKHOUSE_CONNECT_TIMEOUT", "30"))

    @property
    def send_receive_timeout(self) -> int:
        """Get the send/receive timeout in seconds.

        Default: 300 (ClickHouse default)
        """
        return int(os.getenv("CLICKHOUSE_SEND_RECEIVE_TIMEOUT", "300"))

    @property
    def proxy_path(self) -> Optional[str]:
        """Get the proxy path appended to the host URL, if configured.

        Default: None (os.getenv returns None when the variable is unset,
        so the previous `-> str` annotation was wrong).
        """
        return os.getenv("CLICKHOUSE_PROXY_PATH")

    def get_client_config(self) -> dict:
        """Get the configuration dictionary for clickhouse_connect client.

        Returns:
            dict: Configuration ready to be passed to clickhouse_connect.get_client()
        """
        config = {
            "host": self.host,
            "port": self.port,
            "username": self.username,
            "password": self.password,
            "interface": "https" if self.secure else "http",
            "secure": self.secure,
            "verify": self.verify,
            "connect_timeout": self.connect_timeout,
            "send_receive_timeout": self.send_receive_timeout,
            "client_name": "mcp_clickhouse",
        }

        # Add optional settings only when configured
        if self.database:
            config["database"] = self.database
        if self.proxy_path:
            config["proxy_path"] = self.proxy_path

        return config

    def _validate_required_vars(self) -> None:
        """Validate that all required environment variables are set.

        Raises:
            ValueError: If any required environment variable is missing.
        """
        missing_vars = []
        for var in ["CLICKHOUSE_HOST", "CLICKHOUSE_USER", "CLICKHOUSE_PASSWORD"]:
            if var not in os.environ:
                missing_vars.append(var)

        if missing_vars:
            raise ValueError(f"Missing required environment variables: {', '.join(missing_vars)}")
@dataclass
class ChDBConfig:
    """Configuration for chDB connection settings.

    Every value is read from the environment at access time, so changes to
    environment variables are picked up without re-instantiating the object.

    Required environment variables:
        CHDB_DATA_PATH: The path to the chDB data directory (only required if CHDB_ENABLED=true)
    """

    def __init__(self):
        """Eagerly validate the configuration when chDB is enabled."""
        if self.enabled:
            self._validate_required_vars()

    @property
    def enabled(self) -> bool:
        """Whether chDB support is turned on (default: False)."""
        flag = os.getenv("CHDB_ENABLED", "false")
        return flag.lower() == "true"

    @property
    def data_path(self) -> str:
        """Filesystem path for chDB data; ':memory:' keeps everything in RAM."""
        return os.getenv("CHDB_DATA_PATH", ":memory:")

    def get_client_config(self) -> dict:
        """Build the keyword arguments used to construct a chDB client.

        Returns:
            dict: Configuration ready to be passed to the chDB client.
        """
        return {"data_path": self.data_path}

    def _validate_required_vars(self) -> None:
        """Hook for required-variable checks.

        Raises:
            ValueError: If any required environment variable is missing.
        """
        # CHDB_DATA_PATH has a default, so nothing is enforced currently.
        pass
# Global instance placeholders for the singleton pattern
_CONFIG_INSTANCE = None
_CHDB_CONFIG_INSTANCE = None


def get_config():
    """Return the process-wide ClickHouseConfig, creating it on first use.

    Construction is deferred so that load_dotenv() has had a chance to run
    before any environment variables are validated.
    """
    global _CONFIG_INSTANCE
    if _CONFIG_INSTANCE is None:
        _CONFIG_INSTANCE = ClickHouseConfig()
    return _CONFIG_INSTANCE


def get_chdb_config() -> ChDBConfig:
    """Return the process-wide ChDBConfig, creating it on first use.

    Returns:
        ChDBConfig: The chDB configuration instance.
    """
    global _CHDB_CONFIG_INSTANCE
    if _CHDB_CONFIG_INSTANCE is None:
        _CHDB_CONFIG_INSTANCE = ChDBConfig()
    return _CHDB_CONFIG_INSTANCE
@dataclass
class MCPServerConfig:
    """Configuration for MCP server-level settings.

    Transport and tool-behaviour knobs, deliberately decoupled from the
    ClickHouse connection validation.

    Optional environment variables (with defaults):
        CLICKHOUSE_MCP_SERVER_TRANSPORT: "stdio", "http", or "sse" (default: stdio)
        CLICKHOUSE_MCP_BIND_HOST: Bind host for HTTP/SSE (default: 127.0.0.1)
        CLICKHOUSE_MCP_BIND_PORT: Bind port for HTTP/SSE (default: 8000)
        CLICKHOUSE_MCP_QUERY_TIMEOUT: SELECT tool timeout in seconds (default: 30)
    """

    @property
    def server_transport(self) -> str:
        """Validated, lower-cased transport name; raises ValueError when unknown."""
        raw = os.getenv("CLICKHOUSE_MCP_SERVER_TRANSPORT", TransportType.STDIO.value)
        transport = raw.lower()
        if transport not in TransportType.values():
            valid_options = ", ".join(f'"{t}"' for t in TransportType.values())
            raise ValueError(f"Invalid transport '{transport}'. Valid options: {valid_options}")
        return transport

    @property
    def bind_host(self) -> str:
        """Host interface to bind when serving HTTP/SSE."""
        return os.getenv("CLICKHOUSE_MCP_BIND_HOST", "127.0.0.1")

    @property
    def bind_port(self) -> int:
        """TCP port to bind when serving HTTP/SSE."""
        return int(os.getenv("CLICKHOUSE_MCP_BIND_PORT", "8000"))

    @property
    def query_timeout(self) -> int:
        """Timeout in seconds applied to the SELECT tool."""
        return int(os.getenv("CLICKHOUSE_MCP_QUERY_TIMEOUT", "30"))
_MCP_CONFIG_INSTANCE = None


def get_mcp_config() -> MCPServerConfig:
    """Return the process-wide MCPServerConfig, creating it on first use."""
    global _MCP_CONFIG_INSTANCE
    if _MCP_CONFIG_INSTANCE is None:
        _MCP_CONFIG_INSTANCE = MCPServerConfig()
    return _MCP_CONFIG_INSTANCE
```
--------------------------------------------------------------------------------
/tests/test_mcp_server.py:
--------------------------------------------------------------------------------
```python
import pytest
import pytest_asyncio
from fastmcp import Client
from fastmcp.exceptions import ToolError
import asyncio
from mcp_clickhouse.mcp_server import mcp, create_clickhouse_client
from dotenv import load_dotenv
import json
# Load environment variables
load_dotenv()
@pytest.fixture(scope="module")
def event_loop():
"""Create an instance of the default event loop for the test session."""
loop = asyncio.get_event_loop_policy().new_event_loop()
yield loop
loop.close()
@pytest_asyncio.fixture(scope="module")
async def setup_test_database():
"""Set up test database and tables before running tests."""
client = create_clickhouse_client()
# Test database and table names
test_db = "test_mcp_db"
test_table = "test_table"
test_table2 = "another_test_table"
# Create test database
client.command(f"CREATE DATABASE IF NOT EXISTS {test_db}")
# Drop tables if they exist
client.command(f"DROP TABLE IF EXISTS {test_db}.{test_table}")
client.command(f"DROP TABLE IF EXISTS {test_db}.{test_table2}")
# Create first test table with comments
client.command(f"""
CREATE TABLE {test_db}.{test_table} (
id UInt32 COMMENT 'Primary identifier',
name String COMMENT 'User name field',
age UInt8 COMMENT 'User age',
created_at DateTime DEFAULT now() COMMENT 'Record creation timestamp'
) ENGINE = MergeTree()
ORDER BY id
COMMENT 'Test table for MCP server testing'
""")
# Create second test table
client.command(f"""
CREATE TABLE {test_db}.{test_table2} (
event_id UInt64,
event_type String,
timestamp DateTime
) ENGINE = MergeTree()
ORDER BY (event_type, timestamp)
COMMENT 'Event tracking table'
""")
# Insert test data
client.command(f"""
INSERT INTO {test_db}.{test_table} (id, name, age) VALUES
(1, 'Alice', 30),
(2, 'Bob', 25),
(3, 'Charlie', 35),
(4, 'Diana', 28)
""")
client.command(f"""
INSERT INTO {test_db}.{test_table2} (event_id, event_type, timestamp) VALUES
(1001, 'login', '2024-01-01 10:00:00'),
(1002, 'logout', '2024-01-01 11:00:00'),
(1003, 'login', '2024-01-01 12:00:00')
""")
yield test_db, test_table, test_table2
# Cleanup after tests
client.command(f"DROP DATABASE IF EXISTS {test_db}")
@pytest.fixture
def mcp_server():
    """Return the MCP server instance for testing."""
    # `mcp` is the module-level FastMCP instance shared by all tests.
    return mcp
@pytest.mark.asyncio
async def test_list_databases(mcp_server, setup_test_database):
    """Test the list_databases tool."""
    test_db, _, _ = setup_test_database
    async with Client(mcp_server) as client:
        result = await client.call_tool("list_databases", {})

        # The result should be a list containing at least one item
        assert len(result.content) >= 1
        assert isinstance(result.content[0].text, str)

        # Parse the result text (it's a JSON list of database names)
        databases = json.loads(result.content[0].text)
        assert test_db in databases
        assert "system" in databases  # System database should always exist


@pytest.mark.asyncio
async def test_list_tables_basic(mcp_server, setup_test_database):
    """Test the list_tables tool without filters."""
    test_db, test_table, test_table2 = setup_test_database
    async with Client(mcp_server) as client:
        result = await client.call_tool("list_tables", {"database": test_db})

        assert len(result.content) >= 1
        tables = json.loads(result.content[0].text)

        # Should have exactly 2 tables (created by the fixture)
        assert len(tables) == 2

        # Get table names
        table_names = [table["name"] for table in tables]
        assert test_table in table_names
        assert test_table2 in table_names

        # Check table details: every table dict carries schema metadata
        for table in tables:
            assert table["database"] == test_db
            assert "columns" in table
            assert "total_rows" in table
            assert "engine" in table
            assert "comment" in table

            # Verify column information exists
            assert len(table["columns"]) > 0
            for column in table["columns"]:
                assert "name" in column
                assert "column_type" in column
                assert "comment" in column


@pytest.mark.asyncio
async def test_list_tables_with_like_filter(mcp_server, setup_test_database):
    """Test the list_tables tool with LIKE filter."""
    test_db, test_table, _ = setup_test_database
    async with Client(mcp_server) as client:
        # Test with LIKE filter (matches only 'test_table')
        result = await client.call_tool("list_tables", {"database": test_db, "like": "test_%"})

        tables_data = json.loads(result.content[0].text)

        # Handle both single dict and list of dicts
        if isinstance(tables_data, dict):
            tables = [tables_data]
        else:
            tables = tables_data

        assert len(tables) == 1
        assert tables[0]["name"] == test_table


@pytest.mark.asyncio
async def test_list_tables_with_not_like_filter(mcp_server, setup_test_database):
    """Test the list_tables tool with NOT LIKE filter."""
    test_db, _, test_table2 = setup_test_database
    async with Client(mcp_server) as client:
        # Test with NOT LIKE filter (excludes 'test_table', keeps the other)
        result = await client.call_tool("list_tables", {"database": test_db, "not_like": "test_%"})

        tables_data = json.loads(result.content[0].text)

        # Handle both single dict and list of dicts
        if isinstance(tables_data, dict):
            tables = [tables_data]
        else:
            tables = tables_data

        assert len(tables) == 1
        assert tables[0]["name"] == test_table2
@pytest.mark.asyncio
async def test_run_select_query_success(mcp_server, setup_test_database):
    """Test running a successful SELECT query."""
    test_db, test_table, _ = setup_test_database
    async with Client(mcp_server) as client:
        query = f"SELECT id, name, age FROM {test_db}.{test_table} ORDER BY id"
        result = await client.call_tool("run_select_query", {"query": query})

        query_result = json.loads(result.content[0].text)

        # Check structure
        assert "columns" in query_result
        assert "rows" in query_result

        # Check columns
        assert query_result["columns"] == ["id", "name", "age"]

        # Check rows (the four fixture rows, in id order)
        assert len(query_result["rows"]) == 4
        assert query_result["rows"][0] == [1, "Alice", 30]
        assert query_result["rows"][1] == [2, "Bob", 25]
        assert query_result["rows"][2] == [3, "Charlie", 35]
        assert query_result["rows"][3] == [4, "Diana", 28]


@pytest.mark.asyncio
async def test_run_select_query_with_aggregation(mcp_server, setup_test_database):
    """Test running a SELECT query with aggregation."""
    test_db, test_table, _ = setup_test_database
    async with Client(mcp_server) as client:
        query = f"SELECT COUNT(*) as count, AVG(age) as avg_age FROM {test_db}.{test_table}"
        result = await client.call_tool("run_select_query", {"query": query})

        query_result = json.loads(result.content[0].text)

        assert query_result["columns"] == ["count", "avg_age"]
        assert len(query_result["rows"]) == 1
        assert query_result["rows"][0][0] == 4  # count
        assert query_result["rows"][0][1] == 29.5  # average age


@pytest.mark.asyncio
async def test_run_select_query_with_join(mcp_server, setup_test_database):
    """Test running a SELECT query with JOIN."""
    test_db, test_table, test_table2 = setup_test_database
    async with Client(mcp_server) as client:
        # Insert related data for join (direct client, bypasses the MCP tool)
        client_direct = create_clickhouse_client()
        client_direct.command(f"""
            INSERT INTO {test_db}.{test_table2} (event_id, event_type, timestamp) VALUES
            (2001, 'purchase', '2024-01-01 14:00:00')
        """)

        query = f"""
        SELECT
            COUNT(DISTINCT event_type) as event_types_count
        FROM {test_db}.{test_table2}
        """
        result = await client.call_tool("run_select_query", {"query": query})

        query_result = json.loads(result.content[0].text)
        assert query_result["rows"][0][0] == 3  # login, logout, purchase


@pytest.mark.asyncio
async def test_run_select_query_error(mcp_server, setup_test_database):
    """Test running a SELECT query that results in an error."""
    test_db, _, _ = setup_test_database
    async with Client(mcp_server) as client:
        # Query non-existent table
        query = f"SELECT * FROM {test_db}.non_existent_table"

        # Should raise ToolError
        with pytest.raises(ToolError) as exc_info:
            await client.call_tool("run_select_query", {"query": query})

        assert "Query execution failed" in str(exc_info.value)


@pytest.mark.asyncio
async def test_run_select_query_syntax_error(mcp_server):
    """Test running a SELECT query with syntax error."""
    async with Client(mcp_server) as client:
        # Invalid SQL syntax
        query = "SELECT FROM WHERE"

        # Should raise ToolError
        with pytest.raises(ToolError) as exc_info:
            await client.call_tool("run_select_query", {"query": query})

        assert "Query execution failed" in str(exc_info.value)
@pytest.mark.asyncio
async def test_table_metadata_details(mcp_server, setup_test_database):
    """Test that table metadata is correctly retrieved."""
    test_db, test_table, _ = setup_test_database
    async with Client(mcp_server) as client:
        result = await client.call_tool("list_tables", {"database": test_db})

        tables = json.loads(result.content[0].text)

        # Find our test table
        test_table_info = next(t for t in tables if t["name"] == test_table)

        # Check table comment
        assert test_table_info["comment"] == "Test table for MCP server testing"

        # Check engine info
        assert test_table_info["engine"] == "MergeTree"
        assert "MergeTree" in test_table_info["engine_full"]

        # Check row count
        assert test_table_info["total_rows"] == 4

        # Check columns and their comments (as defined in the fixture DDL)
        columns_by_name = {col["name"]: col for col in test_table_info["columns"]}

        assert columns_by_name["id"]["comment"] == "Primary identifier"
        assert columns_by_name["id"]["column_type"] == "UInt32"
        assert columns_by_name["name"]["comment"] == "User name field"
        assert columns_by_name["name"]["column_type"] == "String"
        assert columns_by_name["age"]["comment"] == "User age"
        assert columns_by_name["age"]["column_type"] == "UInt8"
        assert columns_by_name["created_at"]["comment"] == "Record creation timestamp"
        assert columns_by_name["created_at"]["column_type"] == "DateTime"
        assert columns_by_name["created_at"]["default_expression"] == "now()"


@pytest.mark.asyncio
async def test_system_database_access(mcp_server):
    """Test that we can access system databases."""
    async with Client(mcp_server) as client:
        # List tables in system database
        result = await client.call_tool("list_tables", {"database": "system"})

        tables = json.loads(result.content[0].text)

        # System database should have many tables
        assert len(tables) > 10

        # Check for some common system tables
        table_names = [t["name"] for t in tables]
        assert "tables" in table_names
        assert "columns" in table_names
        assert "databases" in table_names


@pytest.mark.asyncio
async def test_concurrent_queries(mcp_server, setup_test_database):
    """Test running multiple queries concurrently."""
    test_db, test_table, test_table2 = setup_test_database
    async with Client(mcp_server) as client:
        # Run multiple queries concurrently
        queries = [
            f"SELECT COUNT(*) FROM {test_db}.{test_table}",
            f"SELECT COUNT(*) FROM {test_db}.{test_table2}",
            f"SELECT MAX(id) FROM {test_db}.{test_table}",
            f"SELECT MIN(event_id) FROM {test_db}.{test_table2}",
        ]

        # Execute all queries concurrently
        results = await asyncio.gather(
            *[client.call_tool("run_select_query", {"query": query}) for query in queries]
        )

        # Verify all queries succeeded
        assert len(results) == 4

        # Check each result (each query is an aggregate, so exactly one row)
        for i, result in enumerate(results):
            query_result = json.loads(result.content[0].text)
            assert "rows" in query_result
            assert len(query_result["rows"]) == 1
```
--------------------------------------------------------------------------------
/mcp_clickhouse/mcp_server.py:
--------------------------------------------------------------------------------
```python
import logging
import json
from typing import Optional, List, Any
import concurrent.futures
import atexit
import os
import clickhouse_connect
import chdb.session as chs
from clickhouse_connect.driver.binding import format_query_value
from dotenv import load_dotenv
from fastmcp import FastMCP
from fastmcp.tools import Tool
from fastmcp.prompts import Prompt
from fastmcp.exceptions import ToolError
from dataclasses import dataclass, field, asdict, is_dataclass
from starlette.requests import Request
from starlette.responses import PlainTextResponse
from mcp_clickhouse.mcp_env import get_config, get_chdb_config, get_mcp_config
from mcp_clickhouse.chdb_prompt import CHDB_PROMPT
@dataclass
class Column:
    """One row of system.columns, describing a single table column."""

    database: str  # owning database name
    table: str  # owning table name
    name: str  # column name
    column_type: str  # ClickHouse type string (system.columns.type)
    default_kind: Optional[str]  # e.g. DEFAULT/MATERIALIZED; None when absent
    default_expression: Optional[str]  # default expression text, if any
    comment: Optional[str]  # column-level COMMENT, if set
@dataclass
class Table:
    """One row of system.tables plus its columns, as returned by list_tables."""

    database: str
    name: str
    engine: str  # short engine name, e.g. 'MergeTree'
    create_table_query: str
    dependencies_database: str
    dependencies_table: str
    engine_full: str  # engine name with full parameters
    sorting_key: str
    primary_key: str
    total_rows: int
    total_bytes: int
    total_bytes_uncompressed: int
    parts: int
    active_parts: int
    total_marks: int
    comment: Optional[str] = None  # table-level COMMENT, if set
    columns: List[Column] = field(default_factory=list)  # filled in by list_tables
MCP_SERVER_NAME = "mcp-clickhouse"
# Configure logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(MCP_SERVER_NAME)
QUERY_EXECUTOR = concurrent.futures.ThreadPoolExecutor(max_workers=10)
atexit.register(lambda: QUERY_EXECUTOR.shutdown(wait=True))
load_dotenv()
mcp = FastMCP(name=MCP_SERVER_NAME)
@mcp.custom_route("/health", methods=["GET"])
async def health_check(request: Request) -> PlainTextResponse:
"""Health check endpoint for monitoring server status.
Returns OK if the server is running and can connect to ClickHouse.
"""
try:
# Check if ClickHouse is enabled by trying to create config
# If ClickHouse is disabled, this will succeed but connection will fail
clickhouse_enabled = os.getenv("CLICKHOUSE_ENABLED", "true").lower() == "true"
if not clickhouse_enabled:
# If ClickHouse is disabled, check chDB status
chdb_config = get_chdb_config()
if chdb_config.enabled:
return PlainTextResponse("OK - MCP server running with chDB enabled")
else:
# Both ClickHouse and chDB are disabled - this is an error
return PlainTextResponse(
"ERROR - Both ClickHouse and chDB are disabled. At least one must be enabled.",
status_code=503,
)
# Try to create a client connection to verify ClickHouse connectivity
client = create_clickhouse_client()
version = client.server_version
return PlainTextResponse(f"OK - Connected to ClickHouse {version}")
except Exception as e:
# Return 503 Service Unavailable if we can't connect to ClickHouse
return PlainTextResponse(f"ERROR - Cannot connect to ClickHouse: {str(e)}", status_code=503)
def result_to_table(query_columns, result) -> List[Table]:
    """Zip column names with each row and materialize Table instances."""
    records = (dict(zip(query_columns, row)) for row in result)
    return [Table(**record) for record in records]


def result_to_column(query_columns, result) -> List[Column]:
    """Zip column names with each row and materialize Column instances."""
    records = (dict(zip(query_columns, row)) for row in result)
    return [Column(**record) for record in records]
def _json_default(obj: Any) -> Any:
    """json.dumps fallback: expand dataclasses, stringify everything else.

    The previous code passed ``to_json`` itself as ``default``; for a value
    json cannot serialize (e.g. a datetime inside a dataclass field) it
    returned the object unchanged, making ``json.dumps`` fail/loop instead
    of producing output.
    """
    if is_dataclass(obj):
        return asdict(obj)
    return str(obj)


def to_json(obj: Any) -> Any:
    """Recursively prepare *obj* for MCP serialization.

    Dataclass instances are rendered as JSON strings; lists and dicts are
    converted element-wise (container type preserved); anything else is
    returned unchanged.  (The old ``-> str`` annotation was wrong for the
    list/dict branches.)
    """
    if is_dataclass(obj):
        return json.dumps(asdict(obj), default=_json_default)
    elif isinstance(obj, list):
        return [to_json(item) for item in obj]
    elif isinstance(obj, dict):
        return {key: to_json(value) for key, value in obj.items()}
    return obj
def list_databases():
    """List available ClickHouse databases"""
    logger.info("Listing all databases")
    client = create_clickhouse_client()
    raw = client.command("SHOW DATABASES")

    # `command` returns a newline-separated string for multi-row results;
    # normalize both shapes into a list of trimmed database names.
    if isinstance(raw, str):
        databases = [name.strip() for name in raw.strip().split("\n")]
    else:
        databases = [raw]

    logger.info(f"Found {len(databases)} databases")
    return json.dumps(databases)
def list_tables(database: str, like: Optional[str] = None, not_like: Optional[str] = None):
    """List available ClickHouse tables in a database, including schema, comment,
    row count, and column count.

    Args:
        database: Database whose tables should be listed.
        like: Optional SQL LIKE pattern the table name must match.
        not_like: Optional SQL LIKE pattern the table name must NOT match.

    Returns:
        list[dict]: One dict per table (serialized Table dataclass), each with
        a ``columns`` entry describing every column.
    """
    logger.info(f"Listing tables in database '{database}'")
    client = create_clickhouse_client()
    # format_query_value escapes user-supplied values, preventing SQL injection.
    query = f"SELECT database, name, engine, create_table_query, dependencies_database, dependencies_table, engine_full, sorting_key, primary_key, total_rows, total_bytes, total_bytes_uncompressed, parts, active_parts, total_marks, comment FROM system.tables WHERE database = {format_query_value(database)}"
    if like:
        query += f" AND name LIKE {format_query_value(like)}"
    if not_like:
        query += f" AND name NOT LIKE {format_query_value(not_like)}"
    result = client.query(query)

    # Deserialize result as Table dataclass instances
    tables = result_to_table(result.column_names, result.result_rows)
    for table in tables:
        column_data_query = f"SELECT database, table, name, type AS column_type, default_kind, default_expression, comment FROM system.columns WHERE database = {format_query_value(database)} AND table = {format_query_value(table.name)}"
        column_data_query_result = client.query(column_data_query)
        # result_to_column already returns a list; the previous pass-through
        # comprehension copied it for no benefit.
        table.columns = result_to_column(
            column_data_query_result.column_names,
            column_data_query_result.result_rows,
        )

    logger.info(f"Found {len(tables)} tables")
    return [asdict(table) for table in tables]
def execute_query(query: str):
    """Execute *query* synchronously, enforcing the server's readonly setting.

    Returns:
        dict: {"columns": [...], "rows": [...]} on success.

    Raises:
        ToolError: wrapping any driver or server failure.
    """
    client = create_clickhouse_client()
    try:
        readonly_level = get_readonly_setting(client)
        response = client.query(query, settings={"readonly": readonly_level})
        logger.info(f"Query returned {len(response.result_rows)} rows")
        return {"columns": response.column_names, "rows": response.result_rows}
    except Exception as err:
        logger.error(f"Error executing query: {err}")
        # Wrap driver errors so MCP clients receive a structured tool failure.
        raise ToolError(f"Query execution failed: {str(err)}")
def run_select_query(query: str):
    """Run a SELECT query in a ClickHouse database"""
    logger.info(f"Executing SELECT query: {query}")
    try:
        future = QUERY_EXECUTOR.submit(execute_query, query)
        timeout_secs = get_mcp_config().query_timeout
        try:
            result = future.result(timeout=timeout_secs)
        except concurrent.futures.TimeoutError:
            logger.warning(f"Query timed out after {timeout_secs} seconds: {query}")
            future.cancel()
            raise ToolError(f"Query timed out after {timeout_secs} seconds")
        # execute_query may hand back an error structure; keep the response
        # structured, since plain string errors can break MCP serialization
        # (BrokenResourceError).
        if isinstance(result, dict) and "error" in result:
            logger.warning(f"Query failed: {result['error']}")
            return {
                "status": "error",
                "message": f"Query failed: {result['error']}",
            }
        return result
    except ToolError:
        # Timeouts (and query failures raised as ToolError) pass through as-is.
        raise
    except Exception as e:
        logger.error(f"Unexpected error in run_select_query: {str(e)}")
        raise RuntimeError(f"Unexpected error during query execution: {str(e)}")
def create_clickhouse_client():
    """Open a fresh ClickHouse connection using the configured client settings."""
    cfg = get_config().get_client_config()
    logger.info(
        f"Creating ClickHouse client connection to {cfg['host']}:{cfg['port']} "
        f"as {cfg['username']} "
        f"(secure={cfg['secure']}, verify={cfg['verify']}, "
        f"connect_timeout={cfg['connect_timeout']}s, "
        f"send_receive_timeout={cfg['send_receive_timeout']}s)"
    )
    try:
        client = clickhouse_connect.get_client(**cfg)
        # Touch server_version eagerly so a bad connection fails right here
        # instead of on the first query.
        version = client.server_version
        logger.info(f"Successfully connected to ClickHouse server version {version}")
        return client
    except Exception as e:
        logger.error(f"Failed to connect to ClickHouse: {str(e)}")
        raise
def get_readonly_setting(client) -> str:
    """Get the appropriate readonly setting value to use for queries.

    This function handles potential conflicts between server and client readonly settings:
    - readonly=0: No read-only restrictions
    - readonly=1: Only read queries allowed, settings cannot be changed
    - readonly=2: Only read queries allowed, settings can be changed (except readonly itself)

    If server has readonly=2 and client tries to set readonly=1, it would cause:
    "Setting readonly is unknown or readonly" error

    This function preserves the server's readonly setting unless it's 0, in which case
    we enforce readonly=1 to ensure queries are read-only.

    Args:
        client: ClickHouse client connection

    Returns:
        String value of readonly setting to use
    """
    read_only = client.server_settings.get("readonly")
    if not read_only:
        return "1"  # Default to basic read-only mode if setting isn't present
    # clickhouse-connect exposes server settings as objects carrying a .value
    # attribute, but tolerate a plain string too. The previous code compared
    # the wrapper itself to "0" (never true for a wrapper) and then read
    # .value unconditionally (AttributeError for a plain string) — so a
    # server readonly=0 leaked through and disabled read-only enforcement.
    value = getattr(read_only, "value", read_only)
    if value == "0":
        return "1"  # Force read-only mode if server has it disabled
    return value  # Respect server's readonly setting (likely 2)
def create_chdb_client():
    """Return the shared chDB client, failing fast when chDB is disabled."""
    if get_chdb_config().enabled:
        return _chdb_client
    raise ValueError("chDB is not enabled. Set CHDB_ENABLED=true to enable it.")
def execute_chdb_query(query: str):
    """Execute a query using chDB client."""
    client = create_chdb_client()
    try:
        res = client.query(query, "JSON")
        if res.has_error():
            msg = res.error_message()
            logger.error(f"Error executing chDB query: {msg}")
            return {"error": msg}
        payload = res.data()
        # An empty payload means no rows; otherwise unwrap the JSON "data" key.
        return json.loads(payload).get("data", []) if payload else []
    except Exception as err:
        logger.error(f"Error executing chDB query: {err}")
        return {"error": str(err)}
def run_chdb_select_query(query: str):
    """Run SQL in chDB, an in-process ClickHouse engine"""
    logger.info(f"Executing chDB SELECT query: {query}")
    try:
        future = QUERY_EXECUTOR.submit(execute_chdb_query, query)
        timeout_secs = get_mcp_config().query_timeout
        try:
            result = future.result(timeout=timeout_secs)
        except concurrent.futures.TimeoutError:
            logger.warning(
                f"chDB query timed out after {timeout_secs} seconds: {query}"
            )
            future.cancel()
            return {
                "status": "error",
                "message": f"chDB query timed out after {timeout_secs} seconds",
            }
        # execute_chdb_query signals failure via an {"error": ...} dict;
        # translate that into the structured status/message shape.
        if isinstance(result, dict) and "error" in result:
            logger.warning(f"chDB query failed: {result['error']}")
            return {
                "status": "error",
                "message": f"chDB query failed: {result['error']}",
            }
        return result
    except Exception as e:
        logger.error(f"Unexpected error in run_chdb_select_query: {e}")
        return {"status": "error", "message": f"Unexpected error: {e}"}
def chdb_initial_prompt() -> str:
    """This prompt helps users understand how to interact and perform common operations in chDB"""
    # CHDB_PROMPT is a module-level constant (presumably from chdb_prompt.py —
    # confirm); returned verbatim so MCP clients can surface it as a prompt.
    return CHDB_PROMPT
def _init_chdb_client():
    """Initialize the global chDB client instance.

    Returns a chdb Session on success, or None when chDB is disabled or the
    session cannot be created (failures are logged, never raised).
    """
    try:
        if not get_chdb_config().enabled:
            logger.info("chDB is disabled, skipping client initialization")
            return None
        path = get_chdb_config().get_client_config()["data_path"]
        logger.info(f"Creating chDB client with data_path={path}")
        session = chs.Session(path=path)
        logger.info(f"Successfully connected to chDB with data_path={path}")
        return session
    except Exception as e:
        logger.error(f"Failed to initialize chDB client: {e}")
        return None
# Register tools based on configuration.
# NOTE(review): these env-var checks duplicate the parsing done by the config
# helpers (get_config / get_chdb_config); if their accepted truthy values ever
# diverge, registration and runtime checks could disagree — confirm.
if os.getenv("CLICKHOUSE_ENABLED", "true").lower() == "true":
    # ClickHouse tools are on by default; set CLICKHOUSE_ENABLED=false to opt out.
    mcp.add_tool(Tool.from_function(list_databases))
    mcp.add_tool(Tool.from_function(list_tables))
    mcp.add_tool(Tool.from_function(run_select_query))
    logger.info("ClickHouse tools registered")
if os.getenv("CHDB_ENABLED", "false").lower() == "true":
    # chDB is opt-in. This (re)binds the module-level _chdb_client global that
    # create_chdb_client() reads; if init failed it stays None and chDB queries
    # will fail at call time.
    _chdb_client = _init_chdb_client()
    if _chdb_client:
        # Close the in-process chDB session cleanly at interpreter exit.
        atexit.register(lambda: _chdb_client.close())
    mcp.add_tool(Tool.from_function(run_chdb_select_query))
    chdb_prompt = Prompt.from_function(
        chdb_initial_prompt,
        name="chdb_initial_prompt",
        description="This prompt helps users understand how to interact and perform common operations in chDB",
    )
    mcp.add_prompt(chdb_prompt)
    logger.info("chDB tools and prompts registered")
```