This is page 1 of 5. Use http://codebase.md/alexander-zuev/supabase-mcp-server?page={x} to view the full context. # Directory Structure ``` ├── .claude │ └── settings.local.json ├── .dockerignore ├── .env.example ├── .env.test.example ├── .github │ ├── FUNDING.yml │ ├── ISSUE_TEMPLATE │ │ ├── bug_report.md │ │ ├── feature_request.md │ │ └── roadmap_item.md │ ├── PULL_REQUEST_TEMPLATE.md │ └── workflows │ ├── ci.yaml │ ├── docs │ │ └── release-checklist.md │ └── publish.yaml ├── .gitignore ├── .pre-commit-config.yaml ├── .python-version ├── CHANGELOG.MD ├── codecov.yml ├── CONTRIBUTING.MD ├── Dockerfile ├── LICENSE ├── llms-full.txt ├── pyproject.toml ├── README.md ├── smithery.yaml ├── supabase_mcp │ ├── __init__.py │ ├── clients │ │ ├── api_client.py │ │ ├── base_http_client.py │ │ ├── management_client.py │ │ └── sdk_client.py │ ├── core │ │ ├── __init__.py │ │ ├── container.py │ │ └── feature_manager.py │ ├── exceptions.py │ ├── logger.py │ ├── main.py │ ├── services │ │ ├── __init__.py │ │ ├── api │ │ │ ├── __init__.py │ │ │ ├── api_manager.py │ │ │ ├── spec_manager.py │ │ │ └── specs │ │ │ └── api_spec.json │ │ ├── database │ │ │ ├── __init__.py │ │ │ ├── migration_manager.py │ │ │ ├── postgres_client.py │ │ │ ├── query_manager.py │ │ │ └── sql │ │ │ ├── loader.py │ │ │ ├── models.py │ │ │ ├── queries │ │ │ │ ├── create_migration.sql │ │ │ │ ├── get_migrations.sql │ │ │ │ ├── get_schemas.sql │ │ │ │ ├── get_table_schema.sql │ │ │ │ ├── get_tables.sql │ │ │ │ ├── init_migrations.sql │ │ │ │ └── logs │ │ │ │ ├── auth_logs.sql │ │ │ │ ├── cron_logs.sql │ │ │ │ ├── edge_logs.sql │ │ │ │ ├── function_edge_logs.sql │ │ │ │ ├── pgbouncer_logs.sql │ │ │ │ ├── postgres_logs.sql │ │ │ │ ├── postgrest_logs.sql │ │ │ │ ├── realtime_logs.sql │ │ │ │ ├── storage_logs.sql │ │ │ │ └── supavisor_logs.sql │ │ │ └── validator.py │ │ ├── logs │ │ │ ├── __init__.py │ │ │ └── log_manager.py │ │ ├── safety │ │ │ ├── __init__.py │ │ │ ├── models.py │ │ │ ├── safety_configs.py │ │ │ 
└── safety_manager.py │ │ └── sdk │ │ ├── __init__.py │ │ ├── auth_admin_models.py │ │ └── auth_admin_sdk_spec.py │ ├── settings.py │ └── tools │ ├── __init__.py │ ├── descriptions │ │ ├── api_tools.yaml │ │ ├── database_tools.yaml │ │ ├── logs_and_analytics_tools.yaml │ │ ├── safety_tools.yaml │ │ └── sdk_tools.yaml │ ├── manager.py │ └── registry.py ├── tests │ ├── __init__.py │ ├── conftest.py │ ├── services │ │ ├── __init__.py │ │ ├── api │ │ │ ├── __init__.py │ │ │ ├── test_api_client.py │ │ │ ├── test_api_manager.py │ │ │ └── test_spec_manager.py │ │ ├── database │ │ │ ├── sql │ │ │ │ ├── __init__.py │ │ │ │ ├── conftest.py │ │ │ │ ├── test_loader.py │ │ │ │ ├── test_sql_validator_integration.py │ │ │ │ └── test_sql_validator.py │ │ │ ├── test_migration_manager.py │ │ │ ├── test_postgres_client.py │ │ │ └── test_query_manager.py │ │ ├── logs │ │ │ └── test_log_manager.py │ │ ├── safety │ │ │ ├── test_api_safety_config.py │ │ │ ├── test_safety_manager.py │ │ │ └── test_sql_safety_config.py │ │ └── sdk │ │ ├── test_auth_admin_models.py │ │ └── test_sdk_client.py │ ├── test_container.py │ ├── test_main.py │ ├── test_settings.py │ ├── test_tool_manager.py │ ├── test_tools_integration.py.bak │ └── test_tools.py └── uv.lock ``` # Files -------------------------------------------------------------------------------- /.python-version: -------------------------------------------------------------------------------- ``` 3.12.9 ``` -------------------------------------------------------------------------------- /.env.test.example: -------------------------------------------------------------------------------- ``` # Supabase MCP Server Test Environment # Copy this file to .env.test and modify as needed for your tests # Connection settings for test database SUPABASE_PROJECT_REF=127.0.0.1:54322 SUPABASE_DB_PASSWORD=postgres # Optional: Management API access token (for API tests) # SUPABASE_ACCESS_TOKEN=your_access_token # Optional: Service role key (for auth tests) # 
SUPABASE_SERVICE_ROLE_KEY=your_service_role_key # TheQuery.dev API URL QUERY_API_URL=http://127.0.0.1:8080/v1 ``` -------------------------------------------------------------------------------- /.env.example: -------------------------------------------------------------------------------- ``` # Supabase MCP Server Environment Configuration # Copy this file to .env to configure your server # API Key QUERY_API_KEY=your-api-key # thequery.dev API key # Required for remote Supabase projects (optional for local development) SUPABASE_PROJECT_REF=your-project-ref # Your project reference from dashboard URL SUPABASE_DB_PASSWORD=your-db-password # Database password for your project SUPABASE_REGION=us-east-1 # Region where your Supabase project is hosted # Optional configuration SUPABASE_ACCESS_TOKEN=your-personal-access-token # Required for Management API tools SUPABASE_SERVICE_ROLE_KEY=your-service-role-key # Required for Auth Admin SDK tools # ONLY for local development QUERY_API_URL=http://127.0.0.1:8080/v1 # TheQuery.dev API URL when developing locally ``` -------------------------------------------------------------------------------- /.dockerignore: -------------------------------------------------------------------------------- ``` # Python __pycache__/ *.py[cod] *$py.class *.so .Python build/ develop-eggs/ dist/ downloads/ eggs/ .eggs/ lib/ lib64/ parts/ sdist/ var/ wheels/ *.egg-info/ .installed.cfg *.egg .pytest_cache/ .coverage htmlcov/ .tox/ .nox/ # Virtual Environment .env .venv env/ venv/ ENV/ env.bak/ venv.bak/ # macOS .DS_Store .AppleDouble .LSOverride Icon ._* .DocumentRevisions-V100 .fseventsd .Spotlight-V100 .TemporaryItems .Trashes .VolumeIcon.icns .com.apple.timemachine.donotpresent # IDEs and Editors .idea/ .vscode/ *.swp *.swo *~ .project .classpath .settings/ *.sublime-workspace *.sublime-project # Local development .env.mcp .env.mcp2 *.log logs/ # Ignore local assets assets/ *.gif *.mp4 # Generated version file supabase_mcp/_version.py # Docs 
.llms-full.txt # Docker specific ignores Dockerfile .dockerignore docker-compose.yml docker-compose.yaml # Git .git/ .github/ .gitignore ``` -------------------------------------------------------------------------------- /.gitignore: -------------------------------------------------------------------------------- ``` # Python __pycache__/ *.py[cod] *$py.class *.so .Python build/ develop-eggs/ dist/ downloads/ eggs/ .eggs/ lib/ lib64/ parts/ sdist/ var/ wheels/ *.egg-info/ .installed.cfg *.egg .pytest_cache/ .coverage htmlcov/ .tox/ .nox/ # Virtual Environment .env .env.test .venv env/ venv/ ENV/ env.bak/ venv.bak/ # macOS .DS_Store .AppleDouble .LSOverride Icon ._* .DocumentRevisions-V100 .fseventsd .Spotlight-V100 .TemporaryItems .Trashes .VolumeIcon.icns .com.apple.timemachine.donotpresent # IDEs and Editors .idea/ .vscode/ *.swp *.swo *~ .project .classpath .settings/ *.sublime-workspace *.sublime-project # Local development *.log # Only ignore logs directory in the root, not in the package /logs/ # Ignore local assets assets/ *.gif *.mp4 # Generated version file supabase_mcp/_version.py # Docs .llms-full.txt COMMIT_CONVENTION.md feature-spec/ # Claude code CLAUDE.md # future-evolution.md ``` -------------------------------------------------------------------------------- /.pre-commit-config.yaml: -------------------------------------------------------------------------------- ```yaml repos: # === Syntax & Basic Checks === - repo: https://github.com/pre-commit/pre-commit-hooks rev: v5.0.0 hooks: - id: check-ast name: Validate Python syntax - id: check-toml name: Validate TOML files - id: mixed-line-ending name: Normalize line endings args: ['--fix=lf'] - id: trailing-whitespace name: Remove trailing whitespace - id: end-of-file-fixer name: Ensure file ends with newline # === Security === - repo: https://github.com/pre-commit/pre-commit-hooks rev: v5.0.0 hooks: - id: detect-private-key name: Check for private keys stages: [pre-commit, pre-push, manual] - id: 
check-merge-conflict name: Check for merge conflicts stages: [pre-commit, manual] - id: debug-statements name: Check for debugger imports stages: [pre-commit, manual] # === SQL Linting === - repo: https://github.com/sqlfluff/sqlfluff rev: 3.3.1 hooks: - id: sqlfluff-lint name: Run SQLFluff linter description: Lint SQL files with SQLFluff types: [sql] args: [ "--dialect", "postgres", "--exclude-rules", "L016,L031,LT02", # Exclude some opinionated rules ] files: ^(supabase_mcp/sql|tests/sql)/ - id: sqlfluff-fix name: Run SQLFluff fixer description: Auto-fix SQL files with SQLFluff types: [sql] args: [ "--dialect", "postgres", "--exclude-rules", "L016,L031,LT02", # Exclude some opinionated rules ] files: ^(supabase_mcp/sql|tests/sql)/ # === Type Checking === - repo: https://github.com/pre-commit/mirrors-mypy rev: "v1.15.0" hooks: - id: mypy name: Run mypy type checker args: [ "--config-file=pyproject.toml", "--show-error-codes", "--pretty", ] additional_dependencies: [ "types-requests", "types-aiofiles", "types-pytz", "pydantic", "chainlit", "anthropic", "fastapi", "httpx", "tiktoken", "weave", "chromadb", "cohere", "langchain" ] entry: bash -c 'mypy "$@" || true' -- # === Code Quality & Style === - repo: https://github.com/astral-sh/ruff-pre-commit rev: v0.9.9 hooks: - id: ruff name: Run Ruff linter args: [ --fix, --exit-zero, --quiet, ] types_or: [python, pyi, jupyter] files: ^(src|tests)/ exclude: ^src/experimental/ verbose: false - id: ruff-format name: Run Ruff formatter types_or: [python, pyi, jupyter] # === Documentation Checks === - repo: https://github.com/tcort/markdown-link-check rev: v3.13.6 hooks: - id: markdown-link-check name: Check Markdown links description: Extracts links from markdown texts and checks they're all alive stages: [pre-commit, pre-push, manual] # === Testing === - repo: local hooks: - id: pytest name: Run tests entry: pytest language: system types: [python] pass_filenames: false args: [ "--no-header", "--quiet", "--no-summary", 
"--show-capture=no", "--tb=line" # Show only one line per failure ] stages: [pre-commit, pre-push] # === Build Check === - repo: local hooks: - id: build-check name: Check build entry: uv build language: system pass_filenames: false stages: [pre-commit, pre-push] - id: version-check name: Check package version # Print version from the built package entry: python -c "from supabase_mcp import __version__; print('📦 Package version:', __version__)" language: system verbose: true pass_filenames: false stages: [pre-commit, pre-push] ``` -------------------------------------------------------------------------------- /README.md: -------------------------------------------------------------------------------- ```markdown # Query | MCP server for Supabase > 🌅 More than 17k installs via pypi and close to 30k downloads on Smithery.ai — in short, this was fun! 🥳 > Thanks to everyone who has been using this server for the past few months, and I hope it was useful for you. > Since Supabase has released their own [official MCP server](https://github.com/supabase-community/supabase-mcp), > I've decided to no longer actively maintain this one. The official MCP server is as feature-rich, and many more > features will be added in the future. Check it out! 
<p class="center-text"> <strong>Query MCP is an open-source MCP server that lets your IDE safely run SQL, manage schema changes, call the Supabase Management API, and use Auth Admin SDK — all with built-in safety controls.</strong> </p> <p class="center-text"> <a href="https://pypi.org/project/supabase-mcp-server/"><img src="https://img.shields.io/pypi/v/supabase-mcp-server.svg" alt="PyPI version" /></a> <a href="https://github.com/alexander-zuev/supabase-mcp-server/actions"><img src="https://github.com/alexander-zuev/supabase-mcp-server/workflows/CI/badge.svg" alt="CI Status" /></a> <a href="https://codecov.io/gh/alexander-zuev/supabase-mcp-server"><img src="https://codecov.io/gh/alexander-zuev/supabase-mcp-server/branch/main/graph/badge.svg" alt="Code Coverage" /></a> <a href="https://www.python.org/downloads/"><img src="https://img.shields.io/badge/python-3.12%2B-blue.svg" alt="Python 3.12+" /></a> <a href="https://github.com/astral-sh/uv"><img src="https://img.shields.io/badge/uv-package%20manager-blueviolet" alt="uv package manager" /></a> <a href="https://pepy.tech/project/supabase-mcp-server"><img src="https://static.pepy.tech/badge/supabase-mcp-server" alt="PyPI Downloads" /></a> <a href="https://smithery.ai/server/@alexander-zuev/supabase-mcp-server"><img src="https://smithery.ai/badge/@alexander-zuev/supabase-mcp-server" alt="Smithery.ai Downloads" /></a> <a href="https://modelcontextprotocol.io/introduction"><img src="https://img.shields.io/badge/MCP-Server-orange" alt="MCP Server" /></a> <a href="LICENSE"><img src="https://img.shields.io/badge/license-Apache%202.0-blue.svg" alt="License" /></a> </p> ## Table of contents <p class="center-text"> <a href="#getting-started">Getting started</a> • <a href="#feature-overview">Feature overview</a> • <a href="#troubleshooting">Troubleshooting</a> • <a href="#changelog">Changelog</a> </p> ## ✨ Key features - 💻 Compatible with Cursor, Windsurf, Cline and other MCP clients supporting `stdio` protocol - 🔐 Control 
read-only and read-write modes of SQL query execution - 🔍 Runtime SQL query validation with risk level assessment - 🛡️ Three-tier safety system for SQL operations: safe, write, and destructive - 🔄 Robust transaction handling for both direct and pooled database connections - 📝 Automatic versioning of database schema changes - 💻 Manage your Supabase projects with Supabase Management API - 🧑💻 Manage users with Supabase Auth Admin methods via Python SDK - 🔨 Pre-built tools to help Cursor & Windsurf work with MCP more effectively - 📦 Dead-simple install & setup via package manager (uv, pipx, etc.) ## Getting Started ### Prerequisites Installing the server requires the following on your system: - Python 3.12+ If you plan to install via `uv`, ensure it's [installed](https://docs.astral.sh/uv/getting-started/installation/#__tabbed_1_1). ### PostgreSQL Installation PostgreSQL installation is no longer required for the MCP server itself, as it now uses asyncpg which doesn't depend on PostgreSQL development libraries. However, you'll still need PostgreSQL if you're running a local Supabase instance: **MacOS** ```bash brew install postgresql@16 ``` **Windows** - Download and install PostgreSQL 16+ from https://www.postgresql.org/download/windows/ - Ensure "PostgreSQL Server" and "Command Line Tools" are selected during installation ### Step 1. Installation Since v0.2.0 I introduced support for package installation. You can use your favorite Python package manager to install the server via: ```bash # if pipx is installed (recommended) pipx install supabase-mcp-server # if uv is installed uv pip install supabase-mcp-server ``` `pipx` is recommended because it creates isolated environments for each package. You can also install the server manually by cloning the repository and running `pipx install -e .` from the root directory. 
#### Installing from source If you would like to install from source, for example for local development: ```bash uv venv # On Mac source .venv/bin/activate # On Windows .venv\Scripts\activate # Install package in editable mode uv pip install -e . ``` #### Installing via Smithery.ai You can find the full instructions on how to use Smithery.ai to connect to this MCP server [here](https://smithery.ai/server/@alexander-zuev/supabase-mcp-server). ### Step 2. Configuration The Supabase MCP server requires configuration to connect to your Supabase database, access the Management API, and use the Auth Admin SDK. This section explains all available configuration options and how to set them up. > 🔑 **Important**: Since v0.4 MCP server requires an API key which you can get for free at [thequery.dev](https://thequery.dev) to use this MCP server. #### Environment Variables The server uses the following environment variables: | Variable | Required | Default | Description | |----------|----------|---------|-------------| | `SUPABASE_PROJECT_REF` | Yes | `127.0.0.1:54322` | Your Supabase project reference ID (or local host:port) | | `SUPABASE_DB_PASSWORD` | Yes | `postgres` | Your database password | | `SUPABASE_REGION` | Yes* | `us-east-1` | AWS region where your Supabase project is hosted | | `SUPABASE_ACCESS_TOKEN` | No | None | Personal access token for Supabase Management API | | `SUPABASE_SERVICE_ROLE_KEY` | No | None | Service role key for Auth Admin SDK | | `QUERY_API_KEY` | Yes | None | API key from thequery.dev (required for all operations) | > **Note**: The default values are configured for local Supabase development. For remote Supabase projects, you must provide your own values for `SUPABASE_PROJECT_REF` and `SUPABASE_DB_PASSWORD`. > 🚨 **CRITICAL CONFIGURATION NOTE**: For remote Supabase projects, you MUST specify the correct region where your project is hosted using `SUPABASE_REGION`. 
If you encounter a "Tenant or user not found" error, this is almost certainly because your region setting doesn't match your project's actual region. You can find your project's region in the Supabase dashboard under Project Settings. #### Connection Types ##### Database Connection - The server connects to your Supabase PostgreSQL database using the transaction pooler endpoint - Local development uses a direct connection to `127.0.0.1:54322` - Remote projects use the format: `postgresql://postgres.[project_ref]:[password]@aws-0-[region].pooler.supabase.com:6543/postgres` > ⚠️ **Important**: Session pooling connections are not supported. The server exclusively uses transaction pooling for better compatibility with the MCP server architecture. ##### Management API Connection - Requires `SUPABASE_ACCESS_TOKEN` to be set - Connects to the Supabase Management API at `https://api.supabase.com` - Only works with remote Supabase projects (not local development) ##### Auth Admin SDK Connection - Requires `SUPABASE_SERVICE_ROLE_KEY` to be set - For local development, connects to `http://127.0.0.1:54321` - For remote projects, connects to `https://[project_ref].supabase.co` #### Configuration Methods The server looks for configuration in this order (highest to lowest priority): 1. **Environment Variables**: Values set directly in your environment 2. **Local `.env` File**: A `.env` file in your current working directory (only works when running from source) 3. **Global Config File**: - Windows: `%APPDATA%\supabase-mcp\.env` - macOS/Linux: `~/.config/supabase-mcp/.env` 4. **Default Settings**: Local development defaults (if no other config is found) > ⚠️ **Important**: When using the package installed via pipx or uv, local `.env` files in your project directory are **not** detected. You must use either environment variables or the global config file. 
#### Setting Up Configuration ##### Option 1: Client-Specific Configuration (Recommended) Set environment variables directly in your MCP client configuration (see client-specific setup instructions in Step 3). Most MCP clients support this approach, which keeps your configuration with your client settings. ##### Option 2: Global Configuration Create a global `.env` configuration file that will be used for all MCP server instances: ```bash # Create config directory # On macOS/Linux mkdir -p ~/.config/supabase-mcp # On Windows (PowerShell) mkdir -Force "$env:APPDATA\supabase-mcp" # Create and edit .env file # On macOS/Linux nano ~/.config/supabase-mcp/.env # On Windows (PowerShell) notepad "$env:APPDATA\supabase-mcp\.env" ``` Add your configuration values to the file: ``` QUERY_API_KEY=your-api-key SUPABASE_PROJECT_REF=your-project-ref SUPABASE_DB_PASSWORD=your-db-password SUPABASE_REGION=us-east-1 SUPABASE_ACCESS_TOKEN=your-access-token SUPABASE_SERVICE_ROLE_KEY=your-service-role-key ``` ##### Option 3: Project-Specific Configuration (Source Installation Only) If you're running the server from source (not via package), you can create a `.env` file in your project directory with the same format as above. 
#### Finding Your Supabase Project Information - **Project Reference**: Found in your Supabase project URL: `https://supabase.com/dashboard/project/<project-ref>` - **Database Password**: Set during project creation or found in Project Settings → Database - **Access Token**: Generate at https://supabase.com/dashboard/account/tokens - **Service Role Key**: Found in Project Settings → API → Project API keys #### Supported Regions The server supports all Supabase regions: - `us-west-1` - West US (North California) - `us-east-1` - East US (North Virginia) - default - `us-east-2` - East US (Ohio) - `ca-central-1` - Canada (Central) - `eu-west-1` - West EU (Ireland) - `eu-west-2` - West Europe (London) - `eu-west-3` - West EU (Paris) - `eu-central-1` - Central EU (Frankfurt) - `eu-central-2` - Central Europe (Zurich) - `eu-north-1` - North EU (Stockholm) - `ap-south-1` - South Asia (Mumbai) - `ap-southeast-1` - Southeast Asia (Singapore) - `ap-northeast-1` - Northeast Asia (Tokyo) - `ap-northeast-2` - Northeast Asia (Seoul) - `ap-southeast-2` - Oceania (Sydney) - `sa-east-1` - South America (São Paulo) #### Limitations - **No Self-Hosted Support**: The server only supports official Supabase.com hosted projects and local development - **No Connection String Support**: Custom connection strings are not supported - **No Session Pooling**: Only transaction pooling is supported for database connections - **API and SDK Features**: Management API and Auth Admin SDK features only work with remote Supabase projects, not local development ### Step 3. Usage In general, any MCP client that supports the `stdio` protocol should work with this MCP server. This server was explicitly tested to work with: - Cursor - Windsurf - Cline - Claude Desktop Additionally, you can use smithery.ai to install this server in a number of clients, including the ones above. Follow the guides below to install this MCP server in your client.
#### Cursor Go to Settings -> Features -> MCP Servers and add a new server with this configuration: ```bash # can be set to any name name: supabase type: command # if you installed with pipx command: supabase-mcp-server # if you installed with uv command: uv run supabase-mcp-server # if the above doesn't work, use the full path (recommended) command: /full/path/to/supabase-mcp-server # Find with 'which supabase-mcp-server' (macOS/Linux) or 'where supabase-mcp-server' (Windows) ``` If configuration is correct, you should see a green dot indicator and the number of tools exposed by the server.  #### Windsurf Go to Cascade -> Click on the hammer icon -> Configure -> Fill in the configuration: ```json { "mcpServers": { "supabase": { "command": "/Users/username/.local/bin/supabase-mcp-server", // update path "env": { "QUERY_API_KEY": "your-api-key", // Required - get your API key at thequery.dev "SUPABASE_PROJECT_REF": "your-project-ref", "SUPABASE_DB_PASSWORD": "your-db-password", "SUPABASE_REGION": "us-east-1", // optional, defaults to us-east-1 "SUPABASE_ACCESS_TOKEN": "your-access-token", // optional, for management API "SUPABASE_SERVICE_ROLE_KEY": "your-service-role-key" // optional, for Auth Admin SDK } } } } ``` If configuration is correct, you should see green dot indicator and clickable supabase server in the list of available servers.  #### Claude Desktop Claude Desktop also supports MCP servers through a JSON configuration. Follow these steps to set up the Supabase MCP server: 1. **Find the full path to the executable** (this step is critical): ```bash # On macOS/Linux which supabase-mcp-server # On Windows where supabase-mcp-server ``` Copy the full path that is returned (e.g., `/Users/username/.local/bin/supabase-mcp-server`). 2. 
**Configure the MCP server** in Claude Desktop: - Open Claude Desktop - Go to Settings → Developer -> Edit Config MCP Servers - Add a new configuration with the following JSON: ```json { "mcpServers": { "supabase": { "command": "/full/path/to/supabase-mcp-server", // Replace with the actual path from step 1 "env": { "QUERY_API_KEY": "your-api-key", // Required - get your API key at thequery.dev "SUPABASE_PROJECT_REF": "your-project-ref", "SUPABASE_DB_PASSWORD": "your-db-password", "SUPABASE_REGION": "us-east-1", // optional, defaults to us-east-1 "SUPABASE_ACCESS_TOKEN": "your-access-token", // optional, for management API "SUPABASE_SERVICE_ROLE_KEY": "your-service-role-key" // optional, for Auth Admin SDK } } } } ``` > ⚠️ **Important**: Unlike Windsurf and Cursor, Claude Desktop requires the **full absolute path** to the executable. Using just the command name (`supabase-mcp-server`) will result in a "spawn ENOENT" error. If configuration is correct, you should see the Supabase MCP server listed as available in Claude Desktop.  #### Cline Cline also supports MCP servers through a similar JSON configuration. Follow these steps to set up the Supabase MCP server: 1. **Find the full path to the executable** (this step is critical): ```bash # On macOS/Linux which supabase-mcp-server # On Windows where supabase-mcp-server ``` Copy the full path that is returned (e.g., `/Users/username/.local/bin/supabase-mcp-server`). 2. 
**Configure the MCP server** in Cline: - Open Cline in VS Code - Click on the "MCP Servers" tab in the Cline sidebar - Click "Configure MCP Servers" - This will open the `cline_mcp_settings.json` file - Add the following configuration: ```json { "mcpServers": { "supabase": { "command": "/full/path/to/supabase-mcp-server", // Replace with the actual path from step 1 "env": { "QUERY_API_KEY": "your-api-key", // Required - get your API key at thequery.dev "SUPABASE_PROJECT_REF": "your-project-ref", "SUPABASE_DB_PASSWORD": "your-db-password", "SUPABASE_REGION": "us-east-1", // optional, defaults to us-east-1 "SUPABASE_ACCESS_TOKEN": "your-access-token", // optional, for management API "SUPABASE_SERVICE_ROLE_KEY": "your-service-role-key" // optional, for Auth Admin SDK } } } } ``` If configuration is correct, you should see a green indicator next to the Supabase MCP server in the Cline MCP Servers list, and a message confirming "supabase MCP server connected" at the bottom of the panel.  ### Troubleshooting Here are some tips & tricks that might help you: - **Debug installation** - run `supabase-mcp-server` directly from the terminal to see if it works. If it doesn't, there might be an issue with the installation. - **MCP Server configuration** - if the above step works, it means the server is installed and configured correctly. As long as you provided the right command, IDE should be able to connect. Make sure to provide the right path to the server executable. 
- **"No tools found" error** - If you see "Client closed - no tools available" in Cursor despite the package being installed: - Find the full path to the executable by running `which supabase-mcp-server` (macOS/Linux) or `where supabase-mcp-server` (Windows) - Use the full path in your MCP server configuration instead of just `supabase-mcp-server` - For example: `/Users/username/.local/bin/supabase-mcp-server` or `C:\Users\username\.local\bin\supabase-mcp-server.exe` - **Environment variables** - to connect to the right database, make sure you either set env variables in `mcp_config.json` or in a `.env` file placed in a global config directory (`~/.config/supabase-mcp/.env` on macOS/Linux or `%APPDATA%\supabase-mcp\.env` on Windows). - **Accessing logs** - The MCP server writes detailed logs to a file: - Log file location: - macOS/Linux: `~/.local/share/supabase-mcp/mcp_server.log` - Windows: `%USERPROFILE%\.local\share\supabase-mcp\mcp_server.log` - Logs include connection status, configuration details, and operation results - View logs using any text editor or terminal commands: ```bash # On macOS/Linux cat ~/.local/share/supabase-mcp/mcp_server.log # On Windows (PowerShell) Get-Content "$env:USERPROFILE\.local\share\supabase-mcp\mcp_server.log" ``` If you are stuck or any of the instructions above are incorrect, please raise an issue. ### MCP Inspector A super useful tool to help debug MCP server issues is MCP Inspector. If you installed from source, you can run `supabase-mcp-inspector` from the project repo and it will run the inspector instance. Coupled with logs, this will give you a complete overview of what's happening in the server. > 📝 Running `supabase-mcp-inspector`, if installed from package, doesn't work properly - I will validate and fix in the coming release.
## Feature Overview ### Database query tools Since v0.3+, the server provides comprehensive database management capabilities with built-in safety controls: - **SQL Query Execution**: Execute PostgreSQL queries with risk assessment - **Three-tier safety system**: - `safe`: Read-only operations (SELECT) - always allowed - `write`: Data modifications (INSERT, UPDATE, DELETE) - require unsafe mode - `destructive`: Schema changes (DROP, CREATE) - require unsafe mode + confirmation - **SQL Parsing and Validation**: - Uses PostgreSQL's parser (pglast) for accurate analysis and provides clear feedback on safety requirements - **Automatic Migration Versioning**: - Database-altering operations are automatically versioned - Generates descriptive names based on operation type and target - **Safety Controls**: - Default SAFE mode allows only read-only operations - All statements run in transaction mode via `asyncpg` - 2-step confirmation for high-risk operations - **Available Tools**: - `get_schemas`: Lists schemas with sizes and table counts - `get_tables`: Lists tables, foreign tables, and views with metadata - `get_table_schema`: Gets detailed table structure (columns, keys, relationships) - `execute_postgresql`: Executes SQL statements against your database - `confirm_destructive_operation`: Executes high-risk operations after confirmation - `retrieve_migrations`: Gets migrations with filtering and pagination options - `live_dangerously`: Toggles between safe and unsafe modes ### Management API tools Since v0.3.0, the server provides secure access to the Supabase Management API with built-in safety controls: - **Available Tools**: - `send_management_api_request`: Sends arbitrary requests to Supabase Management API with auto-injection of project ref - `get_management_api_spec`: Gets the enriched API specification with safety information - Supports multiple query modes: by domain, by specific path/method, or all paths - Includes risk assessment information for each endpoint
- Provides detailed parameter requirements and response formats - Helps LLMs understand the full capabilities of the Supabase Management API - `get_management_api_safety_rules`: Gets all safety rules with human-readable explanations - `live_dangerously`: Toggles between safe and unsafe operation modes - **Safety Controls**: - Uses the same safety manager as database operations for consistent risk management - Operations categorized by risk level: - `safe`: Read-only operations (GET) - always allowed - `unsafe`: State-changing operations (POST, PUT, PATCH, DELETE) - require unsafe mode - `blocked`: Destructive operations (delete project, etc.) - never allowed - Default safe mode prevents accidental state changes - Path-based pattern matching for precise safety rules **Note**: Management API tools only work with remote Supabase instances and are not compatible with local Supabase development setups. ### Auth Admin tools I was planning to add support for Python SDK methods to the MCP server. Upon consideration I decided to only add support for Auth admin methods as I often found myself manually creating test users which was prone to errors and time consuming. Now I can just ask Cursor to create a test user and it will be done seamlessly. Check out the full Auth Admin SDK method docs to know what it can do. 
Since v0.3.6, the server supports direct access to Supabase Auth Admin methods via Python SDK: - Includes the following tools: - `get_auth_admin_methods_spec` to retrieve documentation for all available Auth Admin methods - `call_auth_admin_method` to directly invoke Auth Admin methods with proper parameter handling - Supported methods: - `get_user_by_id`: Retrieve a user by their ID - `list_users`: List all users with pagination - `create_user`: Create a new user - `delete_user`: Delete a user by their ID - `invite_user_by_email`: Send an invite link to a user's email - `generate_link`: Generate an email link for various authentication purposes - `update_user_by_id`: Update user attributes by ID - `delete_factor`: Delete a factor on a user (currently not implemented in SDK) #### Why use Auth Admin SDK instead of raw SQL queries? The Auth Admin SDK provides several key advantages over direct SQL manipulation: - **Functionality**: Enables operations not possible with SQL alone (invites, magic links, MFA) - **Accuracy**: More reliable than creating and executing raw SQL queries on auth schemas - **Simplicity**: Offers clear methods with proper validation and error handling - Response format: - All methods return structured Python objects instead of raw dictionaries - Object attributes can be accessed using dot notation (e.g., `user.id` instead of `user["id"]`) - Edge cases and limitations: - UUID validation: Many methods require valid UUID format for user IDs and will return specific validation errors - Email configuration: Methods like `invite_user_by_email` and `generate_link` require email sending to be configured in your Supabase project - Link types: When generating links, different link types have different requirements: - `signup` links don't require the user to exist - `magiclink` and `recovery` links require the user to already exist in the system - Error handling: The server provides detailed error messages from the Supabase API, which may differ from the
dashboard interface - Method availability: Some methods like `delete_factor` are exposed in the API but not fully implemented in the SDK ### Logs & Analytics The server provides access to Supabase logs and analytics data, making it easier to monitor and troubleshoot your applications: - **Available Tool**: `retrieve_logs` - Access logs from any Supabase service - **Log Collections**: - `postgres`: Database server logs - `api_gateway`: API gateway requests - `auth`: Authentication events - `postgrest`: RESTful API service logs - `pooler`: Connection pooling logs - `storage`: Object storage operations - `realtime`: WebSocket subscription logs - `edge_functions`: Serverless function executions - `cron`: Scheduled job logs - `pgbouncer`: Connection pooler logs - **Features**: Filter by time, search text, apply field filters, or use custom SQL queries Simplifies debugging across your Supabase stack without switching between interfaces or writing complex queries. ### Automatic Versioning of Database Changes "With great power comes great responsibility." While `execute_postgresql` tool coupled with aptly named `live_dangerously` tool provide a powerful and simple way to manage your Supabase database, it also means that dropping a table or modifying one is one chat message away. In order to reduce the risk of irreversible changes, since v0.3.8 the server supports: - automatic creation of migration scripts for all write & destructive sql operations executed on the database - improved safety mode of query execution, in which all queries are categorized in: - `safe` type: always allowed. Includes all read-only ops. - `write`type: requires `write` mode to be enabled by the user. - `destructive` type: requires `write` mode to be enabled by the user AND a 2-step confirmation of query execution for clients that do not execute tools automatically. 
### Universal Safety Mode Since v0.3.8 Safety Mode has been standardized across all services (database, API, SDK) using a universal safety manager. This provides consistent risk management and a unified interface for controlling safety settings across the entire MCP server. All operations (SQL queries, API requests, SDK methods) are categorized into risk levels: - `Low` risk: Read-only operations that don't modify data or structure (SELECT queries, GET API requests) - `Medium` risk: Write operations that modify data but not structure (INSERT/UPDATE/DELETE, most POST/PUT API requests) - `High` risk: Destructive operations that modify database structure or could cause data loss (DROP/TRUNCATE, DELETE API endpoints) - `Extreme` risk: Operations with severe consequences that are blocked entirely (deleting projects) Safety controls are applied based on risk level: - Low risk operations are always allowed - Medium risk operations require unsafe mode to be enabled - High risk operations require unsafe mode AND explicit confirmation - Extreme risk operations are never allowed #### How confirmation flow works Any high-risk operations (be it a postgresql or api request) will be blocked even in `unsafe` mode.  You will have to confirm and approve every high-risk operation explicitly in order for it to be executed.  
## Changelog - 📦 Simplified installation via package manager - ✅ (v0.2.0) - 🌎 Support for different Supabase regions - ✅ (v0.2.2) - 🎮 Programmatic access to Supabase management API with safety controls - ✅ (v0.3.0) - 👷♂️ Read and read-write database SQL queries with safety controls - ✅ (v0.3.0) - 🔄 Robust transaction handling for both direct and pooled connections - ✅ (v0.3.2) - 🐍 Support methods and objects available in native Python SDK - ✅ (v0.3.6) - 🔍 Stronger SQL query validation ✅ (v0.3.8) - 📝 Automatic versioning of database changes ✅ (v0.3.8) - 📖 Radically improved knowledge and tools of api spec ✅ (v0.3.8) - ✍️ Improved consistency of migration-related tools for a more organized database vcs ✅ (v0.3.10) - 🥳 Query MCP is released (v0.4.0) For a more detailed roadmap, please see this [discussion](https://github.com/alexander-zuev/supabase-mcp-server/discussions/46) on GitHub. ## Star History [](https://star-history.com/#alexander-zuev/supabase-mcp-server&Date) --- Enjoy! ☺️ ``` -------------------------------------------------------------------------------- /CONTRIBUTING.MD: -------------------------------------------------------------------------------- ```markdown # Contributing to Supabase MCP Server Thank you for your interest in Supabase MCP Server. This project aims to maintain a high quality standard I've set for it. I welcome and carefully review all contributions. Please read the following guidelines carefully. ## 🤓 Important: Pre-Contribution Requirements 1. **Required: Open a Discussion First** - **All contributions** must start with a GitHub Discussion before any code is written - Explain your proposed changes, why they're needed, and how they align with the project's vision - Wait for explicit approval from the maintainer before proceeding - PRs without a prior approved discussion will be closed immediately without review 2. 
**Project Vision** - This project follows a specific development vision maintained by the owner - Not all feature ideas will be accepted, even if well-implemented - The maintainer reserves the right to decline contributions that don't align with the project's direction ## 🛠️ Contribution Process (Only After Discussion Approval) 1. **Fork the repository:** Click the "Fork" button in the top right corner of the GitHub page. 2. **Create a new branch:** Create a branch with a descriptive name related to your contribution. ```bash git checkout -b feature/your-approved-feature ``` 3. **Quality Requirements:** - **Test Coverage:** All code changes must include appropriate tests - **Documentation:** Update all relevant documentation - **Code Style:** Follow the existing code style and patterns - **Commit Messages:** Use clear, descriptive commit messages 4. **Make your changes:** Implement the changes that were approved in the discussion. 5. **Test thoroughly:** Ensure all tests pass and add new tests for your changes. ```bash # Run tests pytest ``` 6. **Commit your changes:** Use clear, descriptive commit messages that explain what you've done. ```bash git commit -m "feat: implement approved feature X" ``` 7. **Push your branch:** Push your changes to your forked repository. ```bash git push origin feature/your-approved-feature ``` 8. **Create a pull request:** - Go to the original repository on GitHub - Click "New Pull Request" - Select "compare across forks" - Select your fork and branch as the source - Add a detailed description that references the approved discussion - Include information about how you've tested the changes - Submit the pull request 9. 
**Review Process:** - PRs will be reviewed when time permits - Be prepared to make requested changes - The maintainer may request significant revisions - PRs may be rejected even after review if they don't meet quality standards ## ⚠️ Grounds for Immediate Rejection Your PR will be closed without review if: - No prior discussion was opened and approved - Tests are missing or failing - Documentation is not updated - Code quality doesn't meet project standards - PR description is inadequate - Changes don't align with the approved discussion ## 🤔 Why These Requirements? - This project is maintained by a single developer (me) with limited review time - Quality and consistency are prioritized over quantity of contributions - The project follows a specific vision that I want to maintain ## 🌟 Acceptable Contributions The following types of contributions are most welcome: - Bug fixes with clear reproduction steps - Performance improvements with benchmarks - Documentation improvements - New features that have been pre-approved via discussion ## 💡 Alternative Ways to Contribute If you have ideas but don't want to go through this process: - Fork the project and build your own version - Share your use case in Discussions - Report bugs with detailed reproduction steps Thank you for understanding and respecting these guidelines. They help maintain the quality and direction of the project. 
``` -------------------------------------------------------------------------------- /supabase_mcp/services/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /supabase_mcp/services/api/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /supabase_mcp/services/database/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /supabase_mcp/services/logs/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /supabase_mcp/services/safety/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /supabase_mcp/services/sdk/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /tests/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /tests/services/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /tests/services/api/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- 
/tests/services/database/sql/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /tests/services/database/sql/test_sql_validator_integration.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /.github/FUNDING.yml: -------------------------------------------------------------------------------- ```yaml github: alexander-zuev ``` -------------------------------------------------------------------------------- /.claude/settings.local.json: -------------------------------------------------------------------------------- ```json { "permissions": { "allow": [ "Bash(mv:*)" ], "deny": [] } } ``` -------------------------------------------------------------------------------- /supabase_mcp/services/database/sql/queries/create_migration.sql: -------------------------------------------------------------------------------- ```sql -- Create a migration INSERT INTO supabase_migrations.schema_migrations (version, name, statements) VALUES ('{version}', '{name}', ARRAY['{statements}']); ``` -------------------------------------------------------------------------------- /supabase_mcp/__init__.py: -------------------------------------------------------------------------------- ```python """Supabase MCP Server package.""" from supabase_mcp._version import __version__, version, version_tuple __all__ = ["__version__", "version", "version_tuple"] ``` -------------------------------------------------------------------------------- /supabase_mcp/core/__init__.py: -------------------------------------------------------------------------------- ```python """Supabase MCP Server package.""" from supabase_mcp._version import __version__, version, version_tuple __all__ = ["__version__", "version", "version_tuple"] ``` 
-------------------------------------------------------------------------------- /supabase_mcp/services/database/sql/queries/logs/postgrest_logs.sql: -------------------------------------------------------------------------------- ```sql SELECT id, postgrest_logs.timestamp, event_message, identifier, metadata.host FROM postgrest_logs CROSS JOIN unnest(metadata) AS metadata {where_clause} ORDER BY timestamp DESC LIMIT {limit}; ``` -------------------------------------------------------------------------------- /supabase_mcp/services/database/sql/queries/logs/pgbouncer_logs.sql: -------------------------------------------------------------------------------- ```sql SELECT id, pgbouncer_logs.timestamp, event_message, metadata.host, metadata.project FROM pgbouncer_logs CROSS JOIN unnest(metadata) AS metadata {where_clause} ORDER BY timestamp DESC LIMIT {limit}; ``` -------------------------------------------------------------------------------- /supabase_mcp/tools/__init__.py: -------------------------------------------------------------------------------- ```python """Tool manager module for Supabase MCP server. This module provides a centralized way to manage tool descriptions and registration. 
""" from .manager import ToolManager, ToolName __all__ = ["ToolManager", "ToolName"] ``` -------------------------------------------------------------------------------- /supabase_mcp/services/database/sql/queries/logs/supavisor_logs.sql: -------------------------------------------------------------------------------- ```sql SELECT id, supavisor_logs.timestamp, event_message, metadata.level, metadata.project, metadata.region FROM supavisor_logs CROSS JOIN unnest(metadata) AS metadata {where_clause} ORDER BY timestamp DESC LIMIT {limit}; ``` -------------------------------------------------------------------------------- /supabase_mcp/services/database/sql/queries/logs/auth_logs.sql: -------------------------------------------------------------------------------- ```sql SELECT id, auth_logs.timestamp, event_message, metadata.level, metadata.status, metadata.path, metadata.msg FROM auth_logs CROSS JOIN unnest(metadata) AS metadata {where_clause} ORDER BY timestamp DESC LIMIT {limit}; ``` -------------------------------------------------------------------------------- /supabase_mcp/services/database/sql/queries/logs/storage_logs.sql: -------------------------------------------------------------------------------- ```sql SELECT id, storage_logs.timestamp, event_message, metadata.level, metadata.project, metadata.responseTime, metadata.rawError FROM storage_logs CROSS JOIN unnest(metadata) AS metadata {where_clause} ORDER BY timestamp DESC LIMIT {limit}; ``` -------------------------------------------------------------------------------- /supabase_mcp/services/database/sql/queries/logs/postgres_logs.sql: -------------------------------------------------------------------------------- ```sql SELECT id, postgres_logs.timestamp, event_message, identifier, parsed.error_severity, parsed.query, parsed.application_name FROM postgres_logs CROSS JOIN unnest(metadata) AS m CROSS JOIN unnest(m.parsed) AS parsed {where_clause} ORDER BY timestamp DESC LIMIT {limit}; ``` 
-------------------------------------------------------------------------------- /supabase_mcp/services/database/sql/queries/logs/edge_logs.sql: -------------------------------------------------------------------------------- ```sql SELECT id, edge_logs.timestamp, event_message, identifier, request.method, request.path, response.status_code, request.url, response.origin_time FROM edge_logs CROSS JOIN unnest(metadata) AS m CROSS JOIN unnest(m.request) AS request CROSS JOIN unnest(m.response) AS response {where_clause} ORDER BY timestamp DESC LIMIT {limit}; ``` -------------------------------------------------------------------------------- /supabase_mcp/services/database/sql/queries/init_migrations.sql: -------------------------------------------------------------------------------- ```sql -- Initialize migrations infrastructure -- Create the migrations schema if it doesn't exist CREATE SCHEMA IF NOT EXISTS supabase_migrations; -- Create the migrations table if it doesn't exist CREATE TABLE IF NOT EXISTS supabase_migrations.schema_migrations ( version TEXT PRIMARY KEY, statements TEXT[] NOT NULL, name TEXT NOT NULL ); ``` -------------------------------------------------------------------------------- /supabase_mcp/services/database/sql/queries/logs/cron_logs.sql: -------------------------------------------------------------------------------- ```sql SELECT id, postgres_logs.timestamp, event_message, identifier, parsed.error_severity, parsed.query, parsed.application_name FROM postgres_logs CROSS JOIN unnest(metadata) AS m CROSS JOIN unnest(m.parsed) AS parsed WHERE (parsed.application_name = 'pg_cron' OR event_message LIKE '%cron job%') {and_where_clause} ORDER BY timestamp DESC LIMIT {limit}; ``` -------------------------------------------------------------------------------- /supabase_mcp/services/database/sql/queries/logs/realtime_logs.sql: -------------------------------------------------------------------------------- ```sql SELECT id, realtime_logs.timestamp, 
event_message, metadata.level, measurements.connected, measurements.connected_cluster, measurements.limit, measurements.sum, metadata.external_id FROM realtime_logs CROSS JOIN unnest(metadata) AS metadata CROSS JOIN unnest(metadata.measurements) AS measurements {where_clause} ORDER BY timestamp DESC LIMIT {limit}; ``` -------------------------------------------------------------------------------- /Dockerfile: -------------------------------------------------------------------------------- ```dockerfile FROM python:3.12-slim-bookworm WORKDIR /app # Prepare the basic dependencies RUN apt-get update && apt-get install -y \ && rm -rf /var/lib/apt/lists/* # Install pipx RUN pip install --no-cache-dir pipx && \ pipx ensurepath && \ pipx install supabase-mcp-server # Add pipx bin directory to PATH ENV PATH="/root/.local/bin:$PATH" CMD ["supabase-mcp-server"] ``` -------------------------------------------------------------------------------- /supabase_mcp/services/database/sql/queries/logs/function_edge_logs.sql: -------------------------------------------------------------------------------- ```sql SELECT id, function_edge_logs.timestamp, event_message, m.deployment_id, m.execution_time_ms, m.function_id, m.project_ref, request.method, request.pathname, request.url, request.host, response.status_code, m.version FROM function_edge_logs CROSS JOIN unnest(metadata) AS m CROSS JOIN unnest(m.request) AS request CROSS JOIN unnest(m.response) AS response {where_clause} ORDER BY timestamp DESC LIMIT {limit}; ``` -------------------------------------------------------------------------------- /supabase_mcp/services/database/sql/queries/get_migrations.sql: -------------------------------------------------------------------------------- ```sql SELECT version, name, CASE WHEN '{include_full_queries}' = 'true' THEN statements ELSE NULL END AS statements, array_length(statements, 1) AS statement_count, CASE WHEN version ~ '^[0-9]+$' THEN 'numbered' ELSE 'named' END AS version_type 
FROM supabase_migrations.schema_migrations WHERE -- Filter by name if provided ('{name_pattern}' = '' OR name ILIKE '%' || '{name_pattern}' || '%') ORDER BY -- Order by version (timestamp) descending version DESC LIMIT {limit} OFFSET {offset}; ``` -------------------------------------------------------------------------------- /.github/ISSUE_TEMPLATE/roadmap_item.md: -------------------------------------------------------------------------------- ```markdown --- name: Roadmap Item (only for maintainers) about: Reserved for maintainers, used to track roadmap items title: "[target version] Brief description of the feature" labels: roadmap assignees: alexander-zuev --- ## Context What is the context of this item? ## Motivation Why should this feature be added? What problems does it solve? For whom? ## Expected Impact - **Users**: How will this benefit users? - **Development**: How does this improve the codebase or development process? ## Implementation Ideas Any initial thoughts on how this could be implemented. ``` -------------------------------------------------------------------------------- /supabase_mcp/services/safety/models.py: -------------------------------------------------------------------------------- ```python from enum import Enum, IntEnum class OperationRiskLevel(IntEnum): """Universal operation risk level mapping. Higher number reflects higher risk levels with 4 being the highest.""" LOW = 1 MEDIUM = 2 HIGH = 3 EXTREME = 4 class SafetyMode(str, Enum): """Universal safety mode of a client (database, api, etc). 
Clients should always default to safe mode.""" SAFE = "safe" UNSAFE = "unsafe" class ClientType(str, Enum): """Types of clients that can be managed by the safety system.""" DATABASE = "database" API = "api" ``` -------------------------------------------------------------------------------- /.github/ISSUE_TEMPLATE/feature_request.md: -------------------------------------------------------------------------------- ```markdown --- name: Feature Request about: Suggest an idea to improve the Supabase MCP server title: "I want X so that I can do Y and gain Z" labels: '' assignees: '' --- ## Is your feature request related to a problem? A clear and concise description of what the problem is. Ex. I'm always frustrated when [...] ## Describe the solution you'd like A clear and concise description of what you want to happen. ## Describe alternatives you've considered A clear and concise description of any alternative solutions or features you've considered. ## Additional context Add any other context or screenshots about the feature request here. 
``` -------------------------------------------------------------------------------- /.github/PULL_REQUEST_TEMPLATE.md: -------------------------------------------------------------------------------- ```markdown # Description <!-- Provide a clear and concise description of what this PR accomplishes --> ## Type of Change - [ ] Bug fix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected) - [ ] Documentation update - [ ] Performance improvement - [ ] Code refactoring (no functional changes) - [ ] Test updates - [ ] CI/CD or build process changes - [ ] Other (please describe): ## Checklist - [ ] I have performed a self-review of my own code - [ ] I have made corresponding changes to the documentation - [ ] New and existing unit tests pass locally with my changes ``` -------------------------------------------------------------------------------- /codecov.yml: -------------------------------------------------------------------------------- ```yaml codecov: require_ci_to_pass: true notify: wait_for_ci: true coverage: precision: 1 round: down range: "60...90" status: # For overall project coverage project: default: target: 80% # Target coverage of 80% threshold: 5% # Allow a 5% decrease without failing informational: true # Start as informational until coverage improves # For coverage of new/changed code in PRs patch: default: target: 80% threshold: 5% # Be a bit more lenient on PR patches informational: true # Start as informational until coverage improves # Don't fail if there are no changes changes: no # Configure PR comment comment: layout: "reach, diff, flags, files" behavior: default require_changes: false # Comment even if coverage doesn't change require_base: false require_head: true hide_project_coverage: false # Show both project and diff coverage # Ignore certain paths ignore: - "tests/**/*" # Don't count 
test files in coverage - "supabase_mcp/_version.py" # Auto-generated version file ``` -------------------------------------------------------------------------------- /supabase_mcp/services/database/sql/queries/get_schemas.sql: -------------------------------------------------------------------------------- ```sql WITH t AS ( -- Regular tables SELECT schemaname AS schema_name, tablename AS table_name, 'regular' AS table_type FROM pg_tables UNION ALL -- Foreign tables SELECT foreign_table_schema AS schema_name, foreign_table_name AS table_name, 'foreign' AS table_type FROM information_schema.foreign_tables ) SELECT s.schema_name, COALESCE(PG_SIZE_PRETTY(SUM( COALESCE( CASE WHEN t.table_type = 'regular' THEN PG_TOTAL_RELATION_SIZE( QUOTE_IDENT(t.schema_name) || '.' || QUOTE_IDENT(t.table_name) ) ELSE 0 END, 0 ) )), '0 B') AS total_size, COUNT(t.table_name) AS table_count FROM information_schema.schemata AS s LEFT JOIN t ON s.schema_name = t.schema_name WHERE s.schema_name NOT IN ('pg_catalog', 'information_schema') AND s.schema_name NOT LIKE 'pg_%' AND s.schema_name NOT LIKE 'pg_toast%' GROUP BY s.schema_name ORDER BY COUNT(t.table_name) DESC, -- Schemas with most tables first total_size DESC, -- Then by size s.schema_name ASC; -- Then alphabetically ``` -------------------------------------------------------------------------------- /supabase_mcp/logger.py: -------------------------------------------------------------------------------- ```python import logging import logging.handlers from pathlib import Path def setup_logger() -> logging.Logger: """Configure logging for the MCP server with log rotation.""" logger = logging.getLogger("supabase-mcp") # Remove existing handlers to avoid duplicate logs if logger.hasHandlers(): logger.handlers.clear() # Define a consistent log directory in the user's home folder log_dir = Path.home() / ".local" / "share" / "supabase-mcp" log_dir.mkdir(parents=True, exist_ok=True) # Ensure the directory exists # Define the log file path 
log_file = log_dir / "mcp_server.log" # Create a rotating file handler # - Rotate when log reaches 5MB # - Keep 3 backup files file_handler = logging.handlers.RotatingFileHandler( log_file, maxBytes=5 * 1024 * 1024, # 5MB backupCount=3, encoding="utf-8", ) # Create formatter formatter = logging.Formatter("[%(asctime)s] %(levelname)-8s %(message)s", datefmt="%y/%m/%d %H:%M:%S") # Add formatter to file handler file_handler.setFormatter(formatter) # Add handler to logger logger.addHandler(file_handler) # Set level logger.setLevel(logging.DEBUG) return logger logger = setup_logger() ``` -------------------------------------------------------------------------------- /.github/workflows/docs/release-checklist.md: -------------------------------------------------------------------------------- ```markdown # Release Checklist Pre-release 1. Tests pass 2. CI passes 3. Build succeeds 4. Clean install succeeds 5. Documentation is up to date 6. Changelog is up to date 7. Tag and release on GitHub 8. Release is published to PyPI 9. Update dockerfile 10. Update .env.example (if necessary) Post-release - Clean install from PyPi works ## v0.3.12 - 2025-03-12 Pre-release 1. Tests pass - [X] 2. CI passes - [X] 3. Build succeeds - [X] 4. Documentation is up to date - [X] 5. Changelog is up to date - [X] 6. Tag and release on GitHub - [] 7. Release is published to PyPI - [] 8. Update dockerfile - [X] 9. Update .env.example (if necessary) - [X] Post-release 10. Clean install from PyPi works - [] ## v0.3.8 - 2025-03-07 Pre-release 1. Tests pass - [X] 2. CI passes - [X] 3. Build succeeds - [X] 4. Documentation is up to date - [X] 5. Changelog is up to date - [X] 6. Tag and release on GitHub 7. Release is published to PyPI 8. Update dockerfile - [X] 9. Update .env.example (if necessary) - [X] Post-release 10. Clean install from PyPi works - [X] ## v0.3.0 - 2025-02-22 1. Tests pass - [X] 2. CI passes - [X] 3. Build succeeds - [X] 4. Clean install succeeds - [X] 5. 
Documentation is up to date - [X] 6. Changelog is up to date - [X] 7. Tag and release on GitHub - [X] 8. Release is published to PyPI - [X] 9. Clean install from PyPI works - [X] ``` -------------------------------------------------------------------------------- /.github/ISSUE_TEMPLATE/bug_report.md: -------------------------------------------------------------------------------- ```markdown --- name: Bug Report about: Report an issue with the Supabase MCP server title: "An issue with doing X when Y under conditions Z" labels: bug assignees: alexander-zuev --- ## ⚠️ IMPORTANT NOTE The following types of reports will be closed immediately without investigation: - Vague reports like "Something is not working" without clear explanations - Issues missing reproduction steps or necessary context - Reports without essential information (logs, environment details) - Issues already covered in the README Please provide complete information as outlined below. ## Describe the bug A clear and concise description of what the bug is. ## Steps to Reproduce 1. 2. 3. ## Connection Details - Connection type: (Local or Remote) - Using password with special characters? (Yes/No) ## Screenshots If applicable, add screenshots to help explain your problem. ## Logs HIGHLY USEFUL: Attach server logs from: - macOS/Linux: ~/.local/share/supabase-mcp/mcp_server.log - Windows: %USERPROFILE%\.local\share\supabase-mcp\mcp_server.log You can get the last 50 lines with: ``` tail -n 50 ~/.local/share/supabase-mcp/mcp_server.log ``` ## Additional context Add any other context about the problem here. 
## Checklist Mark completed items with an [x]: - [ ] I've included the server logs - [ ] I've checked the README troubleshooting section - [ ] I've verified my connection settings are correct ``` -------------------------------------------------------------------------------- /smithery.yaml: -------------------------------------------------------------------------------- ```yaml # Smithery configuration file: https://smithery.ai/docs/config#smitheryyaml build: dockerBuildPath: . startCommand: type: stdio configSchema: type: object required: - supabaseProjectRef - supabaseDbPassword - supabaseRegion - queryApiKey properties: queryApiKey: type: string description: "(required) - Your Query API key" supabaseProjectRef: type: string description: "(required) - Supabase project reference ID. Defaults to: 127.0.0.1:54322" supabaseDbPassword: type: string description: "(required) - Database password" supabaseRegion: type: string description: "(required) - AWS region where your Supabase project is hosted - Default: us-east-1" supabaseAccessToken: type: string description: "(optional) - Personal access token for Supabase Management API - Default: none" supabaseServiceRoleKey: type: string description: "(optional) - Project Service Role Key for Auth Admin SDK - Default: none" commandFunction: |- (config) => ({ command: 'supabase-mcp-server', args: [], env: { QUERY_API_KEY: config.queryApiKey, SUPABASE_PROJECT_REF: config.supabaseProjectRef, SUPABASE_DB_PASSWORD: config.supabaseDbPassword, SUPABASE_REGION: config.supabaseRegion, SUPABASE_ACCESS_TOKEN: config.supabaseAccessToken, SUPABASE_SERVICE_ROLE_KEY: config.supabaseServiceRoleKey } }) ``` -------------------------------------------------------------------------------- /supabase_mcp/main.py: -------------------------------------------------------------------------------- ```python from collections.abc import AsyncGenerator from contextlib import asynccontextmanager from mcp.server.fastmcp import FastMCP from 
supabase_mcp.core.container import ServicesContainer from supabase_mcp.logger import logger from supabase_mcp.settings import settings from supabase_mcp.tools.registry import ToolRegistry # Create lifespan for the MCP server @asynccontextmanager async def lifespan(app: FastMCP) -> AsyncGenerator[FastMCP, None]: try: logger.info("Initializing services") # Initialize services services_container = ServicesContainer.get_instance() services_container.initialize_services(settings) # Register tools mcp = ToolRegistry(mcp=app, services_container=services_container).register_tools() yield mcp finally: logger.info("Shutting down services") services_container = ServicesContainer.get_instance() await services_container.shutdown_services() # Force kill the entire process - doesn't care about async contexts import os os._exit(0) # Use 0 for successful termination # Create mcp instance mcp = FastMCP("supabase", lifespan=lifespan) def run_server() -> None: logger.info("Starting Supabase MCP server") mcp.run() logger.info("This code runs only if I don't exit in lifespan") def run_inspector() -> None: """Inspector mode - same as mcp dev""" logger.info("Starting Supabase MCP server inspector") from mcp.cli.cli import dev return dev(__file__) if __name__ == "__main__": logger.info("Starting Supabase MCP server") run_server() ``` -------------------------------------------------------------------------------- /supabase_mcp/services/database/sql/queries/get_table_schema.sql: -------------------------------------------------------------------------------- ```sql WITH pk AS ( SELECT ccu.column_name FROM information_schema.table_constraints AS tc INNER JOIN information_schema.constraint_column_usage AS ccu ON tc.constraint_name = ccu.constraint_name WHERE tc.table_schema = '{schema_name}' AND tc.table_name = '{table}' AND tc.constraint_type = 'PRIMARY KEY' ), fk AS ( SELECT kcu.column_name, ccu.table_name AS foreign_table_name, ccu.column_name AS foreign_column_name FROM 
information_schema.table_constraints AS tc INNER JOIN information_schema.key_column_usage AS kcu ON tc.constraint_name = kcu.constraint_name INNER JOIN information_schema.constraint_column_usage AS ccu ON tc.constraint_name = ccu.constraint_name WHERE tc.table_schema = '{schema_name}' AND tc.table_name = '{table}' AND tc.constraint_type = 'FOREIGN KEY' ) SELECT DISTINCT c.column_name, c.data_type, c.is_nullable, c.column_default, c.ordinal_position, fk.foreign_table_name, fk.foreign_column_name, col_description(pc.oid, c.ordinal_position) AS column_description, coalesce(pk.column_name IS NOT NULL, FALSE) AS is_primary_key FROM information_schema.columns AS c INNER JOIN pg_class AS pc ON pc.relname = '{table}' AND pc.relnamespace = ( SELECT oid FROM pg_namespace WHERE nspname = '{schema_name}' ) LEFT JOIN pk ON c.column_name = pk.column_name LEFT JOIN fk ON c.column_name = fk.column_name WHERE c.table_schema = '{schema_name}' AND c.table_name = '{table}' ORDER BY c.ordinal_position; ``` -------------------------------------------------------------------------------- /.github/workflows/publish.yaml: -------------------------------------------------------------------------------- ```yaml name: Publish to PyPI on: release: types: [published] branches: [main] # Only trigger for releases from main env: UV_VERSION: "0.6.0" # Pin uv version to avoid breaking changes jobs: build-and-publish: runs-on: ubuntu-latest environment: name: pypi url: https://pypi.org/project/supabase-mcp-server/ permissions: id-token: write # Required for trusted publishing contents: read steps: - uses: actions/checkout@v4 with: fetch-depth: 0 # Required for proper version detection - name: Install uv uses: astral-sh/setup-uv@v5 with: version: ${{ env.UV_VERSION }} - name: Set up Python uses: actions/setup-python@v5 with: python-version: "3.12" - name: Build package run: uv build --no-sources - name: Verify package installation and entry points env: SUPABASE_PROJECT_REF: ${{ 
secrets.SUPABASE_PROJECT_REF }} SUPABASE_DB_PASSWORD: ${{ secrets.SUPABASE_DB_PASSWORD }} run: | # Create a new venv for testing uv venv source .venv/bin/activate # Install the built wheel uv pip install dist/*.whl echo "Testing supabase-mcp-server entry point..." # Run with --help to test basic functionality without needing actual connection if ! uv run supabase-mcp-server --help; then echo "❌ supabase-mcp-server --help failed" exit 1 fi echo "✅ supabase-mcp-server --help succeeded" - name: Publish to PyPI uses: pypa/gh-action-pypi-publish@release/v1 ``` -------------------------------------------------------------------------------- /.github/workflows/ci.yaml: -------------------------------------------------------------------------------- ```yaml name: CI on: push: branches: [ main ] pull_request: branches: [ main ] env: UV_VERSION: "0.6.1" # Pin uv version to avoid breaking changes jobs: test: runs-on: ubuntu-latest env: # Test environment variables - no real secrets needed anymore SUPABASE_PROJECT_REF: "abcdefghij1234567890" SUPABASE_DB_PASSWORD: "test-password-123" SUPABASE_ACCESS_TOKEN: "test-access-token" SUPABASE_SERVICE_ROLE_KEY: "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c" steps: - uses: actions/checkout@v4 - name: Set up Python 3.12 uses: actions/setup-python@v5 with: python-version: "3.12" - name: Install uv uses: astral-sh/setup-uv@v5 with: version: ${{ env.UV_VERSION }} - name: Create venv and install dependencies run: | # Create venv and install dependencies uv venv source .venv/bin/activate uv sync --group dev --frozen - name: Run tests run: | source .venv/bin/activate # necessary for pytest pytest --cov=supabase_mcp --cov-report=xml --cov-report=term - name: Upload coverage to Codecov uses: codecov/codecov-action@v3 with: token: ${{ secrets.CODECOV_TOKEN }} files: ./coverage.xml fail_ci_if_error: false - name: Build distribution 
class TestContainer:
    """Tests for the Container class functionality."""

    # Every attribute that must be populated once the container is initialized.
    SERVICE_ATTRIBUTES = (
        "postgres_client",
        "api_client",
        "sdk_client",
        "api_manager",
        "safety_manager",
        "query_manager",
        "tool_manager",
        "mcp_server",
    )

    def test_container_initialization(self, container_integration: ServicesContainer):
        """The fixture-provided container exposes every expected service."""
        for attribute in self.SERVICE_ATTRIBUTES:
            assert getattr(container_integration, attribute) is not None

    def test_container_initialize_method(self, settings_integration: Settings, mock_mcp_server: Any):
        """initialize_services() populates every service on a fresh container."""
        # Start from a container that only knows about the (mock) MCP server
        server = cast(FastMCP, mock_mcp_server)
        container = ServicesContainer(mcp_server=server)

        # Build the remaining services from the integration settings
        container.initialize_services(settings_integration)

        for attribute in self.SERVICE_ATTRIBUTES:
            assert getattr(container, attribute) is not None
-- Lists the tables, views and foreign tables of one schema with basic metadata
-- (description, size, row count, column count, index count), largest first.
-- '{schema_name}' appears literally in this file; presumably the caller substitutes
-- it before the query is executed (TODO confirm against the SQL loader).
(
    -- Regular tables & views: full metadata available
    SELECT
        t.table_name,
        t.table_type,
        obj_description(pc.oid) AS description,
        pg_total_relation_size(format('%I.%I', t.table_schema, t.table_name))::bigint AS size_bytes,
        -- Row count comes from the statistics function rather than a count(*) scan
        pg_stat_get_live_tuples(pc.oid)::bigint AS row_count,
        (
            SELECT count(*)
            FROM information_schema.columns AS c
            WHERE
                c.table_schema = t.table_schema
                AND c.table_name = t.table_name
        ) AS column_count,
        (
            SELECT count(*)
            FROM pg_indexes AS i
            WHERE
                i.schemaname = t.table_schema
                AND i.tablename = t.table_name
        ) AS index_count
    FROM information_schema.tables AS t
    -- pg_class is matched on name AND namespace so same-named relations in other
    -- schemas are not joined in
    INNER JOIN pg_class AS pc
        ON
            t.table_name = pc.relname
            AND pc.relnamespace = (
                SELECT oid FROM pg_namespace
                WHERE nspname = '{schema_name}'
            )
    WHERE
        t.table_schema = '{schema_name}'
        AND t.table_type IN ('BASE TABLE', 'VIEW')
)
UNION ALL
(
    -- Foreign tables: limited metadata (size & row count functions don't apply)
    SELECT
        ft.foreign_table_name AS table_name,
        'FOREIGN TABLE' AS table_type,
        (
            SELECT
                obj_description(
                    (quote_ident(ft.foreign_table_schema) || '.' || quote_ident(ft.foreign_table_name))::regclass
                )
        ) AS description,
        -- Placeholders keep the column list aligned with the first branch of the UNION
        0::bigint AS size_bytes,
        NULL::bigint AS row_count,
        (
            SELECT count(*)
            FROM information_schema.columns AS c
            WHERE
                c.table_schema = ft.foreign_table_schema
                AND c.table_name = ft.foreign_table_name
        ) AS column_count,
        0 AS index_count
    FROM information_schema.foreign_tables AS ft
    WHERE ft.foreign_table_schema = '{schema_name}'
)
ORDER BY size_bytes DESC;
Returns a comprehensive dictionary of all Auth Admin methods available in the Supabase Python SDK, including: - Method names and descriptions - Required and optional parameters for each method - Parameter types and constraints - Return value information This tool is useful for exploring the capabilities of the Auth Admin SDK and understanding how to properly format parameters for the call_auth_admin_method tool. No parameters required. call_auth_admin_method: | Call an Auth Admin method from Supabase Python SDK. This tool provides a safe, validated interface to the Supabase Auth Admin SDK, allowing you to: - Manage users (create, update, delete) - List and search users - Generate authentication links - Manage multi-factor authentication - And more IMPORTANT NOTES: - Request bodies must adhere to the Python SDK specification - Some methods may have nested parameter structures - The tool validates all parameters against Pydantic models - Extra fields not defined in the models will be rejected AVAILABLE METHODS: - get_user_by_id: Retrieve a user by their ID - list_users: List all users with pagination - create_user: Create a new user - delete_user: Delete a user by their ID - invite_user_by_email: Send an invite link to a user's email - generate_link: Generate an email link for various authentication purposes - update_user_by_id: Update user attributes by ID - delete_factor: Delete a factor on a user EXAMPLES: 1. Get user by ID: method: "get_user_by_id" params: {"uid": "user-uuid-here"} 2. Create user: method: "create_user" params: { "email": "user@example.com", "password": "secure-password" } 3. Update user by ID: method: "update_user_by_id" params: { "uid": "user-uuid-here", "attributes": { "email": "new-email@example.com" } } For complete documentation of all methods and their parameters, use the get_auth_admin_methods_spec tool.
class GenerateLinkParams(BaseModel):
    # Parameters accepted by the "generate_link" Auth Admin method.
    # `password` and `new_email` are optional at field level; the validator
    # below makes them mandatory for the link types that need them.
    type: Literal[
        "signup", "invite", "magiclink", "recovery", "email_change_current", "email_change_new", "phone_change"
    ]
    email: str
    password: str | None = None
    new_email: str | None = None
    options: dict[str, Any] | None = None

    @model_validator(mode="after")
    def validate_required_fields(self) -> "GenerateLinkParams":
        """Enforce the extra field each link type depends on.

        Raises:
            ValueError: If a signup link has no password, or an email-change
                link has no ``new_email``.
        """
        link_type = self.type

        if link_type == "signup":
            # Signup links need a password
            if not self.password:
                raise ValueError("Password is required for signup links")
        elif link_type in ("email_change_current", "email_change_new"):
            # Both email-change variants need the address being switched to
            if not self.new_email:
                raise ValueError("new_email is required for email change links")

        return self
async def _ensure_client(self) -> httpx.AsyncClient:
    """Return the shared HTTP client, creating it on first use.

    The client is built lazily from ``self.query_api_url`` and
    ``self.query_api_key`` and cached on ``self.client`` so later calls reuse it.

    Returns:
        The ``httpx.AsyncClient`` stored on ``self.client``.
    """
    if self.client is None:
        logger.info("Creating new Query API client")
        self.client = httpx.AsyncClient(
            base_url=self.query_api_url,
            # NOTE(review): when no key is configured this formats None as the
            # literal string "None" - confirm the server expects that.
            headers={"X-API-Key": f"{self.query_api_key}"},
            timeout=30.0,
        )
    else:
        # Only claim reuse when a client really existed before this call;
        # previously this was logged right after creating a new client too.
        logger.info("Returning existing Query API client")
    return self.client
async def check_feature_access(self, feature_name: str) -> FeatureAccessResponse:
    """Ask the Query API whether ``feature_name`` is available to this user.

    Args:
        feature_name: Name substituted into the ``FEATURES_ACCESS`` route.

    Returns:
        The response body validated as a ``FeatureAccessResponse``.

    Raises:
        Exception: Any error from the request or from response validation is
            logged and then re-raised unchanged.
    """
    try:
        result = await self.execute_request(
            method="GET",
            path=ApiRoutes.FEATURES_ACCESS.format(feature_name=feature_name),
        )
        logger.debug(f"Feature access response: {result}")
        return FeatureAccessResponse.model_validate(result)
    except Exception as e:
        logger.error(f"Error checking feature access: {e}")
        # Bare raise re-raises the active exception with its traceback untouched.
        raise
def _load_descriptions(self) -> None:
    """Load tool descriptions from the YAML files next to this module.

    Every ``*.yaml`` file in the ``descriptions`` directory is parsed with
    ``yaml.safe_load`` and merged into ``self.descriptions``. A file that
    cannot be read, parsed or merged is logged and skipped so the remaining
    files still load.

    Raises:
        FileNotFoundError: If the ``descriptions`` directory does not exist.
    """
    # Path to the descriptions directory
    descriptions_dir = Path(__file__).parent / "descriptions"

    # Check if the directory exists
    if not descriptions_dir.exists():
        raise FileNotFoundError(f"Tool descriptions directory not found: {descriptions_dir}")

    # Load all YAML files in the directory
    for yaml_file in descriptions_dir.glob("*.yaml"):
        try:
            # Explicit UTF-8 so parsing does not depend on the platform's default encoding
            with open(yaml_file, encoding="utf-8") as f:
                tool_descriptions = yaml.safe_load(f)
                if tool_descriptions:
                    self.descriptions.update(tool_descriptions)
        except Exception as e:
            # Report through the logger instead of print(): print() writes to stdout,
            # which a stdio-based server needs to keep free of non-protocol output.
            logger.error(f"Error loading tool descriptions from {yaml_file}: {e}")
""" return self.descriptions.get(tool_name, "") ``` -------------------------------------------------------------------------------- /supabase_mcp/exceptions.py: -------------------------------------------------------------------------------- ```python from typing import Any class DatabaseError(Exception): """Base class for database-related errors.""" pass class ConnectionError(DatabaseError): """Raised when a database connection fails.""" pass class PermissionError(DatabaseError): """Raised when a database operation is not permitted.""" pass class QueryError(DatabaseError): """Raised when a database query fails.""" pass class TimeoutError(DatabaseError): """Raised when a database operation times out.""" pass class ValidationError(Exception): """Raised when validation fails.""" pass class SafetyError(Exception): """Raised when a safety check fails.""" pass class OperationNotAllowedError(SafetyError): """Raised when an operation is not allowed in the current safety mode.""" pass class ConfirmationRequiredError(SafetyError): """Raised when a user needs to confirm destructive SQL operation""" pass class APIError(Exception): """Base class for API-related errors.""" def __init__( self, message: str, status_code: int | None = None, response_body: dict[str, Any] | None = None, ): self.status_code = status_code self.response_body = response_body super().__init__(message) class APIConnectionError(APIError): """Raised when an API connection fails.""" pass class PythonSDKError(Exception): """Raised when a Python SDK operation fails.""" pass class APIResponseError(APIError): """Raised when an API response is invalid.""" pass class APIClientError(APIError): """Raised when an API client error occurs.""" pass class APIServerError(APIError): """Raised when an API server error occurs.""" pass class UnexpectedError(APIError): """Raised when an unexpected error occurs.""" pass class FeatureAccessError(APIError): """Raised when a user does not have access to a premium feature.""" def 
__init__( self, feature_name: str, status_code: int | None = None, response_body: dict[str, Any] | None = None, ): message = ( f"This feature '{feature_name}' is available in our Pro plan. " f"Upgrade at https://thequery.dev to unlock advanced capabilities " f"and take your database experience to the next level!" ) super().__init__(message, status_code, response_body) class FeatureTemporaryError(APIError): """Raised when a feature check encounters a temporary error.""" def __init__( self, feature_name: str, status_code: int | None = None, response_body: dict[str, Any] | None = None, ): message = ( f"We couldn't verify access to '{feature_name}' at the moment. " f"Please try again in a few moments. If this persists, " f"check your connection or visit https://thequery.dev/status for updates." ) super().__init__(message, status_code, response_body) ``` -------------------------------------------------------------------------------- /tests/services/safety/test_sql_safety_config.py: -------------------------------------------------------------------------------- ```python """ Unit tests for the SQLSafetyConfig class. This file contains unit test cases for the SQLSafetyConfig class, which is responsible for determining the risk level of SQL operations and whether they are allowed or require confirmation. 
def test_get_risk_level(self):
    """get_risk_level should echo the operation's highest_risk_level."""
    config = SQLSafetyConfig()

    risk_levels = (
        OperationRiskLevel.LOW,
        OperationRiskLevel.MEDIUM,
        OperationRiskLevel.HIGH,
        OperationRiskLevel.EXTREME,
    )

    # One stand-in QueryValidationResults per risk level, all built up front
    operations = []
    for risk_level in risk_levels:
        operation = MagicMock()
        operation.highest_risk_level = risk_level
        operations.append((operation, risk_level))

    # Each stand-in must report exactly the level it was given
    for operation, risk_level in operations:
        assert config.get_risk_level(operation) == risk_level
def test_needs_confirmation(self):
    """Confirmation is required for HIGH and EXTREME risk only.

    This tests the behavior inherited from SafetyConfigBase.
    """
    config = SQLSafetyConfig()

    # (risk level, whether confirmation is expected)
    expectations = (
        (OperationRiskLevel.LOW, False),
        (OperationRiskLevel.MEDIUM, False),
        (OperationRiskLevel.HIGH, True),
        (OperationRiskLevel.EXTREME, True),
    )

    for risk_level, expected in expectations:
        assert config.needs_confirmation(risk_level) is expected
Returns log entries from various Supabase services with timestamps, messages, and metadata. This tool provides access to the same logs available in the Supabase dashboard's Logs & Analytics section. AVAILABLE LOG COLLECTIONS: - postgres: Database server logs including queries, errors, warnings, and system messages - api_gateway: API requests, responses, and errors processed by the Kong API gateway - auth: Authentication and authorization logs for sign-ups, logins, and token operations - postgrest: Logs from the RESTful API service that exposes your PostgreSQL database - pooler: Connection pooling logs from pgbouncer and supavisor services - storage: Object storage service logs for file uploads, downloads, and permissions - realtime: Logs from the real-time subscription service for WebSocket connections - edge_functions: Serverless function execution logs including invocations and errors - cron: Scheduled job logs (can be queried through postgres logs with specific filters) - pgbouncer: Connection pooler logs PARAMETERS: - collection: The log collection to query (required, one of the values listed above) - limit: Maximum number of log entries to return (default: 20) - hours_ago: Retrieve logs from the last N hours (default: 1) - filters: List of filter objects with field, operator, and value (default: []) Format: [{"field": "field_name", "operator": "=", "value": "value"}] - search: Text to search for in event messages (default: "") - custom_query: Complete custom SQL query to execute instead of the pre-built queries (default: "") HOW IT WORKS: This tool makes a request to the Supabase Management API endpoint for logs, sending either a pre-built optimized query for the selected collection or your custom query. Each log collection has a specific table structure and metadata format that requires appropriate CROSS JOIN UNNEST operations to access nested fields. EXAMPLES: 1. 
Using pre-built parameters: collection: "postgres" limit: 20 hours_ago: 24 filters: [{"field": "parsed.error_severity", "operator": "=", "value": "ERROR"}] search: "connection" 2. Using a custom query: collection: "edge_functions" custom_query: "SELECT id, timestamp, event_message, m.function_id, m.execution_time_ms FROM function_edge_logs CROSS JOIN unnest(metadata) AS m WHERE m.execution_time_ms > 1000 ORDER BY timestamp DESC LIMIT 10" METADATA STRUCTURE: The metadata structure is important because it determines how to access nested fields in filters: - postgres_logs: Use "parsed.field_name" for fields like error_severity, query, application_name - edge_logs: Use "request.field_name" or "response.field_name" for HTTP details - function_edge_logs: Use "function_id", "execution_time_ms" for function metrics NOTE FOR LLM CLIENTS: When encountering errors with field access, examine the error message to see what fields are actually available in the structure. Start with basic fields before accessing nested metadata. SAFETY CONSIDERATIONS: - This is a low-risk read operation that can be executed in SAFE mode - Requires a valid Supabase Personal Access Token to be configured - Not available for local Supabase instances (requires cloud deployment) retrieve_advisor_analytics: | Get advisor analytics from the database. 
class SQLQueryCategory(str, Enum):
    """Category of the SQL query tracked by the SQL validator

    Members mix in ``str``, so each one compares equal to its string value.
    """

    DQL = "DQL"  # Data Query Language (SELECT)
    DML = "DML"  # Data Manipulation Language (INSERT, UPDATE, DELETE)
    DDL = "DDL"  # Data Definition Language (CREATE, ALTER, DROP)
    TCL = "TCL"  # Transaction Control Language (BEGIN, COMMIT, ROLLBACK)
    DCL = "DCL"  # Data Control Language (GRANT, REVOKE)
    POSTGRES_SPECIFIC = "POSTGRES_SPECIFIC"  # PostgreSQL-specific commands
    OTHER = "OTHER"  # Other commands not fitting into the categories above


class SQLQueryCommand(str, Enum):
    """Command of the SQL query tracked by the SQL validator

    Members mix in ``str``, so each one compares equal to its string value.
    The section comments below group the commands by the category they
    belong to; UNKNOWN is the catch-all member.
    """

    # DQL Commands
    SELECT = "SELECT"

    # DML Commands
    INSERT = "INSERT"
    UPDATE = "UPDATE"
    DELETE = "DELETE"
    MERGE = "MERGE"

    # DDL Commands
    CREATE = "CREATE"
    ALTER = "ALTER"
    DROP = "DROP"
    TRUNCATE = "TRUNCATE"
    COMMENT = "COMMENT"
    RENAME = "RENAME"

    # DCL Commands
    GRANT = "GRANT"
    REVOKE = "REVOKE"

    # TCL Commands (for tracking, not query types)
    BEGIN = "BEGIN"
    COMMIT = "COMMIT"
    ROLLBACK = "ROLLBACK"
    SAVEPOINT = "SAVEPOINT"

    # PostgreSQL-specific Commands
    VACUUM = "VACUUM"
    ANALYZE = "ANALYZE"
    EXPLAIN = "EXPLAIN"
    COPY = "COPY"
    LISTEN = "LISTEN"
    NOTIFY = "NOTIFY"
    PREPARE = "PREPARE"
    EXECUTE = "EXECUTE"
    DEALLOCATE = "DEALLOCATE"

    # Other/Unknown
    UNKNOWN = "UNKNOWN"
class QueryValidationResults(BaseModel):
    """Result of the batch validation for one or more SQL statements."""

    # Per-statement results; starts empty and is filled as statements are validated.
    statements: list[ValidatedStatement] = Field(
        default_factory=list, description="List of validated statements from the query built during validation"
    )
    # Defaults to LOW until a riskier statement is recorded.
    highest_risk_level: OperationRiskLevel = Field(
        default=OperationRiskLevel.LOW, description="The highest risk level among all statements in the batch"
    )
    has_transaction_control: bool = Field(
        default=False, description="Whether the query contains transaction control statements (BEGIN, COMMIT, etc.)"
    )
    # The only required field.
    original_query: str = Field(..., description="The original SQL query text as provided by the user")

    def needs_migration(self) -> bool:
        """Report whether at least one statement in the batch needs a migration."""
        for statement in self.statements:
            if statement.needs_migration:
                return True
        return False
WHAT THIS TOOL DOES: This tool switches between safe (default) and unsafe operation modes for either the Management API or Database operations. SAFETY MODES EXPLAINED: 1. Database Safety Modes: - SAFE mode (default): Only low-risk operations like SELECT queries are allowed - UNSAFE mode: Higher-risk operations including INSERT, UPDATE, DELETE, and schema changes are permitted 2. API Safety Modes: - SAFE mode (default): Only low-risk operations that don't modify state are allowed - UNSAFE mode: Higher-risk state-changing operations are permitted (except those explicitly blocked for safety) OPERATION RISK LEVELS: The system categorizes operations by risk level: - LOW: Safe read operations with minimal impact - MEDIUM: Write operations that modify data but don't change structure - HIGH: Operations that modify database structure or important system settings - EXTREME: Destructive operations that could cause data loss or service disruption WHEN TO USE THIS TOOL: - Use this tool BEFORE attempting write operations or schema changes - Enable unsafe mode only when you need to perform data modifications - Always return to safe mode after completing write operations USAGE GUIDELINES: - Start in safe mode by default for exploration and analysis - Switch to unsafe mode only when you need to make changes - Be specific about which service you're enabling unsafe mode for - Consider the risks before enabling unsafe mode, especially for database operations - For database operations requiring schema changes, you'll need to enable unsafe mode first Parameters: - service: Which service to toggle ("api" or "database") - enable_unsafe_mode: True to enable unsafe mode, False for safe mode (default: False) Examples: 1. Enable database unsafe mode: live_dangerously(service="database", enable_unsafe_mode=True) 2. Return to safe mode after operations: live_dangerously(service="database", enable_unsafe_mode=False) 3. 
Enable API unsafe mode: live_dangerously(service="api", enable_unsafe_mode=True) Note: This tool affects ALL subsequent operations for the specified service until changed again. confirm_destructive_operation: | Execute a destructive database or API operation after confirmation. Use this only after reviewing the risks with the user. HOW IT WORKS: - This tool executes a previously rejected high-risk operation using its confirmation ID - The operation will be exactly the same as the one that generated the ID - No need to retype the query or api request params - the system remembers it STEPS: 1. Explain the risks to the user and get their approval 2. Use this tool with the confirmation ID from the error message 3. The original query will be executed as-is PARAMETERS: - operation_type: Type of operation ("api" or "database") - confirmation_id: The ID provided in the error message (required) - user_confirmation: Set to true to confirm execution (default: false) NOTE: Confirmation IDs expire after 5 minutes for security get_management_api_safety_rules: | Get all safety rules for the Supabase Management API. Returns a comprehensive overview of all safety rules applied to the Management API, including: - Blocked operations (never allowed) - Unsafe operations (allowed only in unsafe mode) - Safe operations (always allowed) Each rule includes: - The HTTP method and path pattern - A human-readable explanation of why the operation has its safety designation - The safety level assigned to the operation This information helps you understand which operations require unsafe mode and why certain operations might be completely blocked for safety reasons. 
class ServicesContainer:
    """Holds references to every client and manager the server works with."""

    _instance: ServicesContainer | None = None

    def __init__(
        self,
        mcp_server: FastMCP | None = None,
        postgres_client: PostgresClient | None = None,
        api_client: ManagementAPIClient | None = None,
        sdk_client: SupabaseSDKClient | None = None,
        api_manager: SupabaseApiManager | None = None,
        safety_manager: SafetyManager | None = None,
        query_manager: QueryManager | None = None,
        tool_manager: ToolManager | None = None,
        log_manager: LogManager | None = None,
        query_api_client: ApiClient | None = None,
        feature_manager: FeatureManager | None = None,
    ) -> None:
        """Store whichever service references were supplied; omitted ones stay None."""
        # MCP server handle
        self.mcp_server = mcp_server

        # Clients
        self.postgres_client = postgres_client
        self.api_client = api_client
        self.sdk_client = sdk_client
        self.query_api_client = query_api_client

        # Managers
        self.api_manager = api_manager
        self.safety_manager = safety_manager
        self.query_manager = query_manager
        self.tool_manager = tool_manager
        self.log_manager = log_manager
        self.feature_manager = feature_manager

    @classmethod
    def get_instance(cls) -> ServicesContainer:
        """Return the shared container, creating an empty one on first use."""
        if cls._instance is not None:
            return cls._instance
        cls._instance = cls()
        return cls._instance

    def initialize_services(self, settings: Settings) -> None:
        """Build the clients and managers synchronously to satisfy MCP runtime requirements.

        Args:
            settings: Configuration handed to the clients that need it.
        """
        # Clients come first: the managers below are constructed from them.
        self.postgres_client = PostgresClient.get_instance(settings=settings)
        self.api_client = ManagementAPIClient(settings=settings)  # plain instance, not a singleton
        self.sdk_client = SupabaseSDKClient.get_instance(settings=settings)

        # Managers share one safety manager.
        safety = SafetyManager.get_instance()
        self.safety_manager = safety
        self.api_manager = SupabaseApiManager.get_instance(api_client=self.api_client, safety_manager=safety)
        self.query_manager = QueryManager(postgres_client=self.postgres_client, safety_manager=safety)
        self.tool_manager = ToolManager.get_instance()

        # Safety configs are registered once the managers exist.
        safety.register_safety_configs()

        # The feature manager is built on a separate query API client.
        query_api = ApiClient()
        self.query_api_client = query_api
        self.feature_manager = FeatureManager(query_api)

        logger.info("✓ All services initialized successfully.")

    async def shutdown_services(self) -> None:
        """Close every client that was set, one after another."""
        # Order matters: Postgres, then the two API clients, then the SDK client.
        closing_order = ("postgres_client", "query_api_client", "api_client", "sdk_client")
        for attribute in closing_order:
            client = getattr(self, attribute)
            if client:
                await client.close()
tests for the SQLLoader class.""" def test_load_sql_with_extension(self): """Test loading SQL with file extension provided.""" mock_sql = "SELECT * FROM test;" with patch("builtins.open", mock_open(read_data=mock_sql)): with patch.object(Path, "exists", return_value=True): result = SQLLoader.load_sql("test.sql") assert result == mock_sql def test_load_sql_without_extension(self): """Test loading SQL without file extension provided.""" mock_sql = "SELECT * FROM test;" with patch("builtins.open", mock_open(read_data=mock_sql)): with patch.object(Path, "exists", return_value=True): result = SQLLoader.load_sql("test") assert result == mock_sql def test_load_sql_file_not_found(self): """Test loading SQL when file doesn't exist.""" with patch.object(Path, "exists", return_value=False): with pytest.raises(FileNotFoundError): SQLLoader.load_sql("nonexistent") def test_get_schemas_query(self): """Test getting schemas query.""" mock_sql = "SELECT * FROM schemas;" with patch.object(SQLLoader, "load_sql", return_value=mock_sql): result = SQLLoader.get_schemas_query() assert result == mock_sql def test_get_tables_query(self): """Test getting tables query with schema replacement.""" mock_sql = "SELECT * FROM {schema_name}.tables;" expected = "SELECT * FROM test_schema.tables;" with patch.object(SQLLoader, "load_sql", return_value=mock_sql): result = SQLLoader.get_tables_query("test_schema") assert result == expected def test_get_table_schema_query(self): """Test getting table schema query with replacements.""" mock_sql = "SELECT * FROM {schema_name}.{table};" expected = "SELECT * FROM test_schema.test_table;" with patch.object(SQLLoader, "load_sql", return_value=mock_sql): result = SQLLoader.get_table_schema_query("test_schema", "test_table") assert result == expected def test_get_migrations_query(self): """Test getting migrations query with all parameters.""" mock_sql = "SELECT * FROM migrations WHERE name LIKE '%{name_pattern}%' LIMIT {limit} OFFSET {offset} AND 
include_queries = {include_full_queries};" expected = "SELECT * FROM migrations WHERE name LIKE '%test%' LIMIT 10 OFFSET 5 AND include_queries = true;" with patch.object(SQLLoader, "load_sql", return_value=mock_sql): result = SQLLoader.get_migrations_query(limit=10, offset=5, name_pattern="test", include_full_queries=True) assert result == expected def test_get_init_migrations_query(self): """Test getting init migrations query.""" mock_sql = "CREATE SCHEMA IF NOT EXISTS migrations;" with patch.object(SQLLoader, "load_sql", return_value=mock_sql): result = SQLLoader.get_init_migrations_query() assert result == mock_sql def test_get_create_migration_query(self): """Test getting create migration query with replacements.""" mock_sql = "INSERT INTO migrations VALUES ('{version}', '{name}', ARRAY['{statements}']);" expected = "INSERT INTO migrations VALUES ('20230101', 'test_migration', ARRAY['SELECT 1;']);" with patch.object(SQLLoader, "load_sql", return_value=mock_sql): result = SQLLoader.get_create_migration_query( version="20230101", name="test_migration", statements="SELECT 1;" ) assert result == expected def test_sql_dir_path(self): """Test that SQL_DIR points to the correct location.""" expected_path = Path(SQLLoader.__module__.replace(".", os.sep)).parent / "queries" assert str(SQLLoader.SQL_DIR).endswith(str(expected_path)) ``` -------------------------------------------------------------------------------- /pyproject.toml: -------------------------------------------------------------------------------- ```toml [build-system] requires = ["hatchling", "hatch-vcs"] build-backend = "hatchling.build" [project] name = "supabase-mcp-server" dynamic = ["version"] description = "Community Supabase MCP server that enables Cursor and Windsurf to end-to-end manage your Supabase project, execute SQL queries, and more." 
readme = "README.md" requires-python = ">=3.12" dependencies = [ "asyncpg>=0.30.0", "logfire[system-metrics]>=3.12.0", "mcp[cli]>=1.4.1", "pglast>=7.3", "pyyaml>=6.0.2", "supabase>=2.13.0", "tenacity>=9.0.0", ] authors = [ {name = "Alexander Zuev", email = "[email protected]"} ] keywords = ["supabase", "mcp", "cursor", "windsurf", "model-context-protocol", "claude", "cline"] license = "Apache-2.0" classifiers = [ "Development Status :: 4 - Beta", "Intended Audience :: Developers", "License :: OSI Approved :: Apache Software License", "Programming Language :: Python :: 3.12", "Topic :: Software Development :: Libraries :: Python Modules", "Topic :: Database :: Database Engines/Servers", ] [project.urls] Homepage = "https://github.com/alexander-zuev/supabase-mcp-server" Repository = "https://github.com/alexander-zuev/supabase-mcp-server.git" Changelog = "https://github.com/alexander-zuev/supabase-mcp-server/blob/main/CHANGELOG.MD" Documentation = "https://github.com/alexander-zuev/supabase-mcp-server#readme" [tool.hatch.build.targets.wheel] packages = ["supabase_mcp"] [tool.uv] package = true [tool.hatch.version] source = "vcs" raw-options = { version_scheme = "no-guess-dev" } [tool.hatch.build.hooks.vcs] version-file = "supabase_mcp/_version.py" [project.scripts] supabase-mcp-server = "supabase_mcp.main:run_server" supabase-mcp-inspector = "supabase_mcp.main:run_inspector" # Configure PyPI publishing [[tool.uv.index]] name = "pypi" url = "https://pypi.org/simple/" publish-url = "https://upload.pypi.org/legacy/" [tool.ruff] target-version = "py312" line-length = 120 select = [ "E", # pycodestyle errors "W", # pycodestyle warnings "F", # pyflakes "I", # isort "B", # flake8-bugbear "C4", # flake8-comprehensions "UP", # pyupgrade ] ignore = [] [tool.ruff.format] quote-style = "double" indent-style = "space" skip-magic-trailing-comma = false line-ending = "auto" [tool.mypy] python_version = "3.12" strict = true ignore_missing_imports = true disallow_untyped_defs = true 
disallow_incomplete_defs = true check_untyped_defs = true disallow_untyped_decorators = true no_implicit_optional = true warn_redundant_casts = true warn_unused_ignores = true warn_return_any = true warn_unreachable = true # Relaxed rules for tests [[tool.mypy.overrides]] module = "tests.*" disallow_untyped_defs = false disallow_incomplete_defs = false disallow_untyped_decorators = false check_untyped_defs = false warn_return_any = false warn_unreachable = false [tool.pytest] testpaths = ["tests"] python_files = ["test_*.py"] python_classes = ["Test*"] python_functions = ["test_*"] addopts = "-v --no-header --tb=short" [tool.pytest.ini_options] asyncio_mode = "auto" asyncio_default_fixture_loop_scope = "module" markers = [ "unit: marks a test as a unit test", "integration: marks a test as an integration test that requires database access" ] [dependency-groups] dev = [ "asyncpg-stubs>=0.30.0", "mypy>=1.15.0", "pytest>=8.3.4", "pytest-asyncio>=0.25.3", "pytest-cov>=6.0.0", "pytest-mock>=3.14.0", "ruff>=0.9.9", "sqlfluff>=3.3.1", ] [tool.sqlfluff.core] dialect = "postgres" templater = "jinja" max_line_length = 120 [tool.sqlfluff.indentation] tab_space_size = 4 [tool.sqlfluff.rules] exclude_rules = [ "L016", # Line length rules "L031", # Table aliasing "L034", # Column order in GROUP BY "L036", # Select targets should be on a new line "L037", # Ambiguous ordering directions "L042", # Join condition required "L047", # DISTINCT used with parentheses "LT02", # Layout indent "LT12", # Files must end with a single trailing newline "LT14", # Keyword newline "AL01", # Aliasing of table "AM05", # Join clauses should be fully qualified "ST09", # Join order "CP03" # Function name capitalization ] ``` -------------------------------------------------------------------------------- /supabase_mcp/services/logs/log_manager.py: -------------------------------------------------------------------------------- ```python from typing import Any from supabase_mcp.logger import logger from 
supabase_mcp.services.database.sql.loader import SQLLoader class LogManager: """Manager for retrieving logs from Supabase services.""" # Map collection names to table names COLLECTION_TO_TABLE = { "postgres": "postgres_logs", "api_gateway": "edge_logs", "auth": "auth_logs", "postgrest": "postgrest_logs", "pooler": "supavisor_logs", "storage": "storage_logs", "realtime": "realtime_logs", "edge_functions": "function_edge_logs", "cron": "postgres_logs", "pgbouncer": "pgbouncer_logs", } def __init__(self) -> None: """Initialize the LogManager.""" self.sql_loader = SQLLoader() def _build_where_clause( self, collection: str, hours_ago: int | None = None, filters: list[dict[str, Any]] | None = None, search: str | None = None, ) -> str: """Build the WHERE clause for a log query. Args: collection: The log collection name hours_ago: Number of hours to look back filters: List of filter objects with field, operator, and value search: Text to search for in event messages Returns: The WHERE clause as a string """ logger.debug( f"Building WHERE clause for collection={collection}, hours_ago={hours_ago}, filters={filters}, search={search}" ) clauses = [] # Get the table name for this collection table_name = self.COLLECTION_TO_TABLE.get(collection, collection) # Add time filter using BigQuery's TIMESTAMP_SUB function if hours_ago: # Qualify the timestamp column with the table name to avoid ambiguity clauses.append(f"{table_name}.timestamp > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {hours_ago} HOUR)") # Add search filter if search: # Escape single quotes in search text search_escaped = search.replace("'", "''") clauses.append(f"event_message LIKE '%{search_escaped}%'") # Add custom filters if filters: for filter_obj in filters: field = filter_obj["field"] operator = filter_obj["operator"] value = filter_obj["value"] # Handle string values if isinstance(value, str) and not value.isdigit(): value = f"'{value.replace("'", "''")}'" clauses.append(f"{field} {operator} {value}") # For 
cron logs, we already have a WHERE clause in the template if collection == "cron": if clauses: where_clause = f"AND {' AND '.join(clauses)}" else: where_clause = "" else: if clauses: where_clause = f"WHERE {' AND '.join(clauses)}" else: where_clause = "" logger.debug(f"Built WHERE clause: {where_clause}") return where_clause def build_logs_query( self, collection: str, limit: int = 20, hours_ago: int | None = 1, filters: list[dict[str, Any]] | None = None, search: str | None = None, custom_query: str | None = None, ) -> str: """Build a query for retrieving logs from a Supabase service. Args: collection: The log collection to query limit: Maximum number of log entries to return hours_ago: Retrieve logs from the last N hours filters: List of filter objects with field, operator, and value search: Text to search for in event messages custom_query: Complete custom SQL query to execute Returns: The SQL query string Raises: ValueError: If the collection is unknown """ if custom_query: return custom_query # Build the WHERE clause where_clause = self._build_where_clause( collection=collection, hours_ago=hours_ago, filters=filters, search=search ) # Get the SQL query return self.sql_loader.get_logs_query(collection=collection, where_clause=where_clause, limit=limit) ``` -------------------------------------------------------------------------------- /tests/services/database/sql/conftest.py: -------------------------------------------------------------------------------- ```python import pytest @pytest.fixture def sample_dql_queries() -> dict[str, str]: """Sample DQL (SELECT) queries for testing.""" return { "simple_select": "SELECT * FROM users", "select_with_where": "SELECT id, name FROM users WHERE age > 18", "select_with_join": "SELECT u.id, p.title FROM users u JOIN posts p ON u.id = p.user_id", "select_with_subquery": "SELECT * FROM users WHERE id IN (SELECT user_id FROM posts)", "select_with_cte": "WITH active_users AS (SELECT * FROM users WHERE active = true) SELECT * 
FROM active_users", } @pytest.fixture def sample_dml_queries() -> dict[str, str]: """Sample DML (INSERT, UPDATE, DELETE) queries for testing.""" return { "simple_insert": "INSERT INTO users (name, email) VALUES ('John', '[email protected]')", "insert_with_select": "INSERT INTO user_backup SELECT * FROM users", "simple_update": "UPDATE users SET active = true WHERE id = 1", "simple_delete": "DELETE FROM users WHERE id = 1", "merge_statement": "MERGE INTO users u USING temp_users t ON (u.id = t.id) WHEN MATCHED THEN UPDATE SET name = t.name", } @pytest.fixture def sample_ddl_queries() -> dict[str, str]: """Sample DDL (CREATE, ALTER, DROP) queries for testing.""" return { "create_table": "CREATE TABLE users (id SERIAL PRIMARY KEY, name TEXT, email TEXT UNIQUE)", "alter_table": "ALTER TABLE users ADD COLUMN active BOOLEAN DEFAULT false", "drop_table": "DROP TABLE users", "truncate_table": "TRUNCATE TABLE users", "create_index": "CREATE INDEX idx_user_email ON users (email)", } @pytest.fixture def sample_dcl_queries() -> dict[str, str]: """Sample DCL (GRANT, REVOKE) queries for testing.""" return { "grant_select": "GRANT SELECT ON users TO read_role", "grant_all": "GRANT ALL PRIVILEGES ON users TO admin_role", "revoke_select": "REVOKE SELECT ON users FROM read_role", "create_role": "CREATE ROLE read_role", "drop_role": "DROP ROLE read_role", } @pytest.fixture def sample_tcl_queries() -> dict[str, str]: """Sample TCL (BEGIN, COMMIT, ROLLBACK) queries for testing.""" return { "begin_transaction": "BEGIN", "commit_transaction": "COMMIT", "rollback_transaction": "ROLLBACK", "savepoint": "SAVEPOINT my_savepoint", "mixed_case_transaction": "Begin Transaction", } @pytest.fixture def sample_postgres_specific_queries() -> dict[str, str]: """Sample PostgreSQL-specific queries for testing.""" return { "vacuum": "VACUUM users", "analyze": "ANALYZE users", "copy_to": "COPY users TO '/tmp/users.csv' WITH CSV", "copy_from": "COPY users FROM '/tmp/users.csv' WITH CSV", "explain": 
class SQLLoader:
    """Responsible for loading SQL queries from files and filling in their placeholders."""

    # Path to SQL files directory
    SQL_DIR = Path(__file__).parent / "queries"

    # Log collection name -> SQL template file (relative to SQL_DIR, without the .sql extension)
    _LOG_QUERY_FILES = {
        "postgres": "logs/postgres_logs",
        "api_gateway": "logs/edge_logs",
        "auth": "logs/auth_logs",
        "postgrest": "logs/postgrest_logs",
        "pooler": "logs/supavisor_logs",
        "storage": "logs/storage_logs",
        "realtime": "logs/realtime_logs",
        "edge_functions": "logs/function_edge_logs",
        "cron": "logs/cron_logs",
        "pgbouncer": "logs/pgbouncer_logs",
    }

    @staticmethod
    def _fill(template: str, values: dict[str, str]) -> str:
        """Substitute every known ``{key}`` token of a template in one left-to-right pass.

        Chained ``str.replace`` calls rescan text that an earlier call just inserted, so a value
        containing a later placeholder (for example a log search for ``{limit}``) would itself
        be rewritten. A single pass leaves inserted values untouched. Tokens whose key is not
        in ``values`` are kept verbatim.

        Args:
            template: The SQL text containing ``{key}`` tokens
            values: Replacement text per token name (without braces)

        Returns:
            str: The template with the known tokens replaced
        """
        import re  # function-scope import: this module does not import re at the top level

        # A callable replacement is used so backslashes in values are not treated as escapes
        return re.sub(r"\{(\w+)\}", lambda match: values.get(match.group(1), match.group(0)), template)

    @classmethod
    def load_sql(cls, filename: str) -> str:
        """
        Load SQL from a file in the sql directory.

        Args:
            filename: Name of the SQL file (with or without .sql extension)

        Returns:
            str: The SQL query from the file, stripped of surrounding whitespace

        Raises:
            FileNotFoundError: If the SQL file doesn't exist
        """
        # Ensure the filename has .sql extension
        if not filename.endswith(".sql"):
            filename = f"{filename}.sql"

        file_path = cls.SQL_DIR / filename

        if not file_path.exists():
            logger.error(f"SQL file not found: {file_path}")
            raise FileNotFoundError(f"SQL file not found: {file_path}")

        # Explicit encoding: without it the platform's locale encoding would be used
        with open(file_path, encoding="utf-8") as f:
            sql = f.read().strip()
            logger.debug(f"Loaded SQL file: {filename} ({len(sql)} chars)")
            return sql

    @classmethod
    def get_schemas_query(cls) -> str:
        """Get a query to list all schemas."""
        return cls.load_sql("get_schemas")

    @classmethod
    def get_tables_query(cls, schema_name: str) -> str:
        """Get a query to list all tables in a schema.

        The schema name is inserted into the SQL text as given (no quoting or escaping).
        """
        return cls._fill(cls.load_sql("get_tables"), {"schema_name": schema_name})

    @classmethod
    def get_table_schema_query(cls, schema_name: str, table: str) -> str:
        """Get a query to get the schema of a table.

        Both names are inserted into the SQL text as given (no quoting or escaping).
        """
        return cls._fill(cls.load_sql("get_table_schema"), {"schema_name": schema_name, "table": table})

    @classmethod
    def get_migrations_query(
        cls, limit: int = 50, offset: int = 0, name_pattern: str = "", include_full_queries: bool = False
    ) -> str:
        """Get a query to list migrations.

        Args:
            limit: Value for the {limit} token
            offset: Value for the {offset} token
            name_pattern: Text for the {name_pattern} token, inserted as given
            include_full_queries: Rendered as lowercase "true"/"false" for {include_full_queries}

        Returns:
            str: The SQL query to list migrations
        """
        values = {
            "limit": str(limit),
            "offset": str(offset),
            "name_pattern": name_pattern,
            "include_full_queries": str(include_full_queries).lower(),
        }
        return cls._fill(cls.load_sql("get_migrations"), values)

    @classmethod
    def get_init_migrations_query(cls) -> str:
        """Get a query to initialize the migrations schema and table."""
        return cls.load_sql("init_migrations")

    @classmethod
    def get_create_migration_query(cls, version: str, name: str, statements: str) -> str:
        """Get a query to create a migration.

        Args:
            version: The migration version (timestamp)
            name: The migration name
            statements: The SQL statements (escaped)

        Returns:
            str: The SQL query to create a migration
        """
        values = {"version": version, "name": name, "statements": statements}
        return cls._fill(cls.load_sql("create_migration"), values)

    @classmethod
    def get_logs_query(cls, collection: str, where_clause: str = "", limit: int = 20) -> str:
        """Get a query to retrieve logs from a specific collection.

        Args:
            collection: The log collection name (e.g., postgres, api_gateway, auth)
            where_clause: The WHERE clause to filter logs
            limit: Maximum number of log entries to return

        Returns:
            str: The SQL query to retrieve logs

        Raises:
            ValueError: If the collection name is unknown
            FileNotFoundError: If the log collection SQL file doesn't exist
        """
        # Get the SQL file path
        sql_file = cls._LOG_QUERY_FILES.get(collection)
        if not sql_file:
            raise ValueError(f"Unknown log collection: {collection}")

        # Load the SQL template
        query = cls.load_sql(sql_file)

        # Special case: the cron template uses {and_where_clause} instead of {where_clause}
        clause_token = "and_where_clause" if collection == "cron" else "where_clause"
        return cls._fill(query, {clause_token: where_clause, "limit": str(limit)})
This tool allows you to make direct calls to the Supabase Management API, which provides programmatic access to manage your Supabase project settings, resources, and configurations. REQUEST FORMATTING: - Use paths exactly as defined in the API specification - The {ref} parameter will be automatically injected from settings - Format request bodies according to the API specification PARAMETERS: - method: HTTP method (GET, POST, PUT, PATCH, DELETE) - path: API path (e.g. /v1/projects/{ref}/functions) - path_params: Path parameters as dict (e.g. {"function_slug": "my-function"}) - use empty dict {} if not needed - request_params: Query parameters as dict (e.g. {"key": "value"}) - use empty dict {} if not needed - request_body: Request body as dict (e.g. {"name": "test"}) - use empty dict {} if not needed PATH PARAMETERS HANDLING: - The {ref} placeholder (project reference) is automatically injected - you don't need to provide it - All other path placeholders must be provided in the path_params dictionary - Common placeholders include: * {function_slug}: For Edge Functions operations * {id}: For operations on specific resources (API keys, auth providers, etc.) * {slug}: For organization operations * {branch_id}: For database branch operations * {provider_id}: For SSO provider operations * {tpa_id}: For third-party auth operations EXAMPLES: 1. GET request with path and query parameters: method: "GET" path: "/v1/projects/{ref}/functions/{function_slug}" path_params: {"function_slug": "my-function"} request_params: {"version": "1"} request_body: {} 2. 
POST request with body: method: "POST" path: "/v1/projects/{ref}/functions" path_params: {} request_params: {} request_body: {"name": "test-function", "slug": "test-function"} SAFETY SYSTEM: API operations are categorized by risk level: - LOW RISK: Read operations (GET) - allowed in SAFE mode - MEDIUM/HIGH RISK: Write operations (POST, PUT, PATCH, DELETE) - require UNSAFE mode - EXTREME RISK: Destructive operations - require UNSAFE mode and confirmation - BLOCKED: Some operations are completely blocked for safety reasons SAFETY CONSIDERATIONS: - By default, the API client starts in SAFE mode, allowing only read operations - To perform write operations, first use live_dangerously(service="api", enable_unsafe_mode=True) - High-risk operations will be rejected with a confirmation ID - Use confirm_destructive_operation with the provided ID after reviewing risks - Some operations may be completely blocked for safety reasons For a complete list of available API endpoints and their parameters, use the get_management_api_spec tool. For details on safety rules, use the get_management_api_safety_rules tool. get_management_api_spec: | Get the complete Supabase Management API specification. Returns the full OpenAPI specification for the Supabase Management API, including: - All available endpoints and operations - Required and optional parameters for each operation - Request and response schemas - Authentication requirements - Safety information for each operation This tool can be used in four different ways: 1. Without parameters: Returns all domains (default) 2. With path and method: Returns the full specification for a specific API endpoint 3. With domain only: Returns all paths and methods within that domain 4.
With all_paths=True: Returns all paths and methods Parameters: - params: Dictionary containing optional parameters: - path: Optional API path (e.g., "/v1/projects/{ref}/functions") - method: Optional HTTP method (e.g., "GET", "POST") - domain: Optional domain/tag name (e.g., "Auth", "Storage") - all_paths: Optional boolean, if True returns all paths and methods Available domains: - Analytics: Analytics-related endpoints - Auth: Authentication and authorization endpoints - Database: Database management endpoints - Domains: Custom domain configuration endpoints - Edge Functions: Serverless function management endpoints - Environments: Environment configuration endpoints - OAuth: OAuth integration endpoints - Organizations: Organization management endpoints - Projects: Project management endpoints - Rest: RESTful API endpoints - Secrets: Secret management endpoints - Storage: Storage management endpoints This specification is useful for understanding: - What operations are available through the Management API - How to properly format requests for each endpoint - Which operations require unsafe mode - What data structures to expect in responses SAFETY: This is a low-risk read operation that can be executed in SAFE mode. 
# NOTE(review): statement nesting below was reconstructed from a whitespace-collapsed dump;
# confirm it against the real file before relying on it.
@pytest.fixture(autouse=True)
def reset_settings_singleton() -> None:
    """Reset the Settings singleton before and after each test.

    Runs automatically for every test in this module (autouse). The code before the
    ``yield`` runs ahead of the test, the code after it runs once the test finished.
    """
    # NOTE(review): the function yields, so it is a generator; the ``-> None`` annotation
    # does not describe that.
    # Clear singleton instance if it exists
    if hasattr(Settings, "_instance"):
        delattr(Settings, "_instance")
    yield
    # Clean up after test
    if hasattr(Settings, "_instance"):
        delattr(Settings, "_instance")


class TestSettings:
    """Integration tests for Settings."""

    @pytest.mark.integration
    def test_settings_default_values(self, clean_environment: None) -> None:
        """Test default values (no config file, no env vars)"""
        settings = Settings.with_config()  # No config file
        assert settings.supabase_project_ref == "127.0.0.1:54322"
        assert settings.supabase_db_password == "postgres"
        assert settings.supabase_region == "us-east-1"
        assert settings.supabase_access_token is None
        assert settings.supabase_service_role_key is None

    @pytest.mark.integration
    def test_settings_from_env_test(self, clean_environment: None) -> None:
        """Test loading from .env.test

        The expectations differ depending on whether the CI environment variable is "true".
        """
        import os

        settings = Settings.with_config(".env.test")

        # In CI, we expect default values since .env.test might not be properly set up
        if os.environ.get("CI") == "true":
            assert settings.supabase_project_ref == "127.0.0.1:54322"  # Default value in CI
            assert settings.supabase_db_password == "postgres"  # Default value in CI
        else:
            # In local dev, we expect .env.test to override defaults
            assert settings.supabase_project_ref != "127.0.0.1:54322"  # Should be overridden by .env.test
            assert settings.supabase_db_password != "postgres"  # Should be overridden by .env.test

        # Check that the values are not empty
        assert settings.supabase_project_ref, "Project ref should not be empty"
        assert settings.supabase_db_password, "DB password should not be empty"

    @pytest.mark.integration
    def test_settings_from_env_vars(self, clean_environment: None) -> None:
        """Test env vars take precedence over config file"""
        env_values = {
            "SUPABASE_PROJECT_REF": "abcdefghij1234567890",  # Valid 20-char project ref
            "SUPABASE_DB_PASSWORD": "env-password",
        }
        # clear=False: the two variables are added on top of the existing environment
        with patch.dict("os.environ", env_values, clear=False):
            settings = Settings.with_config(".env.test")  # Even with config file
            assert settings.supabase_project_ref == "abcdefghij1234567890"
            assert settings.supabase_db_password == "env-password"

    @pytest.mark.integration
    def test_settings_integration_fixture(self, settings_integration: Settings) -> None:
        """Test the settings_integration fixture provides valid settings."""
        # The settings_integration fixture should load from .env.test or environment variables
        assert settings_integration.supabase_project_ref, "Project ref should not be empty"
        assert settings_integration.supabase_db_password, "DB password should not be empty"
        assert settings_integration.supabase_region, "Region should not be empty"

    @pytest.mark.integration
    def test_settings_region_validation(self) -> None:
        """Test region validation."""
        # Test default region
        # NOTE(review): this first check is not wrapped in patch.dict, so it appears to read
        # the ambient environment; confirm SUPABASE_REGION cannot be set when it runs.
        settings = Settings()
        assert settings.supabase_region == "us-east-1"

        # Test valid region from environment
        env_values = {"SUPABASE_REGION": "ap-southeast-1"}
        # clear=True: the environment holds only the variables listed in env_values
        with patch.dict("os.environ", env_values, clear=True):
            settings = Settings()
            assert settings.supabase_region == "ap-southeast-1"

        # Test invalid region
        with pytest.raises(ValidationError) as exc_info:
            env_values = {"SUPABASE_REGION": "invalid-region"}
            with patch.dict("os.environ", env_values, clear=True):
                Settings()
        # Checked after the raises block so the captured exception is available
        assert "Region 'invalid-region' is not supported" in str(exc_info.value)

    @pytest.mark.integration
    def test_supported_regions(self) -> None:
        """Test that all supported regions are valid."""
        # __args__ holds the members of the SUPPORTED_REGIONS type; each one is tried in turn
        for region in SUPPORTED_REGIONS.__args__:
            env_values = {"SUPABASE_REGION": region}
            with patch.dict("os.environ", env_values, clear=True):
                settings = Settings()
                assert settings.supabase_region == region

    @pytest.mark.integration
    def test_settings_access_token_and_service_role(self) -> None:
        """Test access token and service role key settings."""
        # Test with environment variables
        env_values = {
            "SUPABASE_ACCESS_TOKEN": "test-access-token",
            "SUPABASE_SERVICE_ROLE_KEY": "test-service-role-key",
        }
        with patch.dict("os.environ", env_values, clear=True):
            settings = Settings()
            assert settings.supabase_access_token == "test-access-token"
            assert settings.supabase_service_role_key == "test-service-role-key"

        # Test defaults (should be None)
        with patch.dict("os.environ", {}, clear=True):
            settings = Settings()
            assert settings.supabase_access_token is None
            assert settings.supabase_service_role_key is None
get_table_schema: | Get detailed table structure including columns, keys, and relationships. Returns comprehensive information about a specific table's structure: - Column definitions (names, types, constraints) - Primary key information - Foreign key relationships - Indexes - Constraints - Triggers Parameters: - schema_name: Name of the schema (e.g., 'public', 'auth') - table: Name of the table to inspect SAFETY: This is a low-risk read operation that can be executed in SAFE mode. execute_postgresql: | Execute PostgreSQL statements against your Supabase database. IMPORTANT: All SQL statements must end with a semicolon (;). OPERATION TYPES AND REQUIREMENTS: 1. READ Operations (SELECT, EXPLAIN, etc.): - Can be executed directly without special requirements - Example: SELECT * FROM public.users LIMIT 10; 2. WRITE Operations (INSERT, UPDATE, DELETE): - Require UNSAFE mode (use live_dangerously('database', True) first) - Example: INSERT INTO public.users (email) VALUES ('user@example.com'); 3. SCHEMA Operations (CREATE, ALTER, DROP): - Require UNSAFE mode (use live_dangerously('database', True) first) - Destructive operations (DROP, TRUNCATE) require additional confirmation - Example: CREATE TABLE public.test_table (id SERIAL PRIMARY KEY, name TEXT); MIGRATION HANDLING: All queries that modify the database will be automatically version controlled by the server. You can provide an optional migration name if you want to name the migration. - Respect the following format: verb_noun_detail. Be descriptive and concise.
- Examples: - create_users_table - add_email_to_profiles - enable_rls_on_users - If you don't provide a migration name, the server will generate one based on the SQL statement - The system will sanitize your provided name to ensure compatibility with database systems - Migration names are prefixed with a timestamp in the format YYYYMMDDHHMMSS SAFETY SYSTEM: Operations are categorized by risk level: - LOW RISK: Read operations (SELECT, EXPLAIN) - allowed in SAFE mode - MEDIUM RISK: Write operations (INSERT, UPDATE, DELETE) - require UNSAFE mode - HIGH RISK: Schema operations (CREATE, ALTER) - require UNSAFE mode - EXTREME RISK: Destructive operations (DROP, TRUNCATE) - require UNSAFE mode and confirmation TRANSACTION HANDLING: - DO NOT use transaction control statements (BEGIN, COMMIT, ROLLBACK) - The database client automatically wraps queries in transactions - The SQL validator will reject queries containing transaction control statements - This ensures atomicity and provides rollback capability for data modifications MULTIPLE STATEMENTS: - You can send multiple SQL statements in a single query - Each statement will be executed in order within the same transaction - Example: CREATE TABLE public.test_table (id SERIAL PRIMARY KEY, name TEXT); INSERT INTO public.test_table (name) VALUES ('test'); CONFIRMATION FLOW FOR HIGH-RISK OPERATIONS: - High-risk operations (DROP TABLE, TRUNCATE, etc.) 
will be rejected with a confirmation ID - The error message will explain what happened and provide a confirmation ID - Review the risks with the user before proceeding - Use the confirm_destructive_operation tool with the provided ID to execute the operation IMPORTANT GUIDELINES: - The database client starts in SAFE mode by default for safety - Only enable UNSAFE mode when you need to modify data or schema - Never mix READ and WRITE operations in the same transaction - For destructive operations, be prepared to confirm with the confirm_destructive_operation tool WHEN TO USE OTHER TOOLS INSTEAD: - For Auth operations (users, authentication, etc.): Use call_auth_admin_method instead of direct SQL The Auth Admin SDK provides safer, validated methods for user management - For project configuration, functions, storage, etc.: Use send_management_api_request The Management API handles Supabase platform features that aren't directly in the database Note: This tool operates on the PostgreSQL database only. API operations use separate safety controls. retrieve_migrations: | Retrieve a list of all migrations a user has from Supabase. Returns a list of migrations with the following information: - Version (timestamp) - Name - SQL statements (if requested) - Statement count - Version type (named or numbered) Parameters: - limit: Maximum number of migrations to return (default: 50, max: 100) - offset: Number of migrations to skip for pagination (default: 0) - name_pattern: Optional pattern to filter migrations by name. Uses SQL ILIKE pattern matching (case-insensitive). The pattern is automatically wrapped with '%' wildcards, so "users" will match "create_users_table", "add_email_to_users", etc. To search for an exact match, use the complete name. - include_full_queries: Whether to include the full SQL statements in the result (default: false) SAFETY: This is a low-risk read operation that can be executed in SAFE mode. 
``` -------------------------------------------------------------------------------- /tests/test_main.py: -------------------------------------------------------------------------------- ```python import asyncio from unittest.mock import patch import pytest from supabase_mcp.core.container import ServicesContainer from supabase_mcp.logger import logger from supabase_mcp.main import run_inspector, run_server from supabase_mcp.services.safety.models import ClientType from supabase_mcp.tools.manager import ToolName # === UNIT TESTS === class TestMain: """Tests for the main application functionality.""" @pytest.mark.unit def test_mcp_server_initializes(self, container_integration: ServicesContainer): """Test that the MCP server initializes correctly.""" # Verify server name mcp = container_integration.mcp_server assert mcp.name == "supabase" # Verify MCP server is created but not yet initialized with tools tools = asyncio.run(mcp.list_tools()) logger.info(f"Found {len(tools)} MCP tools registered in basic container") # At this point, no tools should be registered yet assert len(tools) == 0, f"Expected 0 tools in basic container, but got {len(tools)}" @pytest.mark.unit def test_services_container_initialization( self, initialized_container_integration: ServicesContainer, ): """Test that the services container is correctly initialized.""" # Verify container has all required services container = initialized_container_integration assert container.postgres_client is not None assert container.api_client is not None assert container.sdk_client is not None assert container.api_manager is not None assert container.safety_manager is not None assert container.query_manager is not None assert container.tool_manager is not None # Verify the container is fully initialized # Check that safety manager has been initialized by verifying it has configs registered safety_manager = container.safety_manager assert safety_manager.get_safety_mode(ClientType.DATABASE) is not None assert 
safety_manager.get_safety_mode(ClientType.API) is not None @pytest.mark.unit def test_tool_registration(self, tools_registry_integration: ServicesContainer): """Test that tools are registered correctly using ToolManager's tool names.""" # Get the tool manager from the container tool_manager = tools_registry_integration.tool_manager assert tool_manager is not None, "Tool manager should be initialized" # Get the MCP server from the container mcp = tools_registry_integration.mcp_server # Get expected tools from ToolName enum expected_tools = [ ToolName.GET_SCHEMAS, ToolName.GET_TABLES, ToolName.GET_TABLE_SCHEMA, ToolName.EXECUTE_POSTGRESQL, ToolName.CONFIRM_DESTRUCTIVE_OPERATION, ToolName.RETRIEVE_MIGRATIONS, ToolName.LIVE_DANGEROUSLY, ToolName.SEND_MANAGEMENT_API_REQUEST, ToolName.GET_MANAGEMENT_API_SPEC, ToolName.GET_AUTH_ADMIN_METHODS_SPEC, ToolName.CALL_AUTH_ADMIN_METHOD, ToolName.RETRIEVE_LOGS, ] # Verify tools are registered in MCP registered_tools = asyncio.run(mcp.list_tools()) registered_tool_names = {tool.name for tool in registered_tools} # We should have exactly 12 tools (all the tools defined in ToolName enum) assert len(registered_tools) == 12, f"Expected 12 tools, but got {len(registered_tools)}" # Log the actual number of tools for reference logger.info(f"Found {len(registered_tools)} MCP tools registered") # Verify each tool has proper MCP protocol structure for tool in registered_tools: assert tool.name, "Tool must have a name" assert tool.description, "Tool must have a description" assert tool.inputSchema, "Tool must have an input schema" # Check that each expected tool is registered by its string value for tool_name in expected_tools: # Convert enum to string value (e.g., 'get_schemas' instead of ToolName.GET_SCHEMAS) tool_str_value = str(tool_name.value) assert tool_str_value in registered_tool_names, f"Tool {tool_name} not registered" # Verify we have tools for core functionality categories # Instead of checking specific names, check for 
categories of functionality tool_names = {tool.name for tool in registered_tools} # Log all available tools for debugging tool_list = ", ".join(sorted(tool_names)) logger.info(f"Available MCP tools: {tool_list}") @pytest.mark.unit def test_run_server_starts(self): """Test that run_server starts the server.""" # Patch the global mcp instance and its run method with patch("supabase_mcp.main.mcp") as mock_mcp: # Call run_server which should use the global mcp instance run_server() mock_mcp.run.assert_called_once() @pytest.mark.unit def test_inspector_mode(self): """Test that inspector mode initializes correctly""" # This test is fine as is since it's testing the global function # and mocking an external dependency with patch("mcp.cli.cli.dev") as mock_dev: # Patch __file__ in the main module to match what we expect with patch("supabase_mcp.main.__file__", __file__): run_inspector() mock_dev.assert_called_once_with(__file__) @pytest.mark.unit def test_server_command_exists(self): """Test that the server command exists and is executable""" import os import shutil # Skip this test in CI environments if os.environ.get("CI") == "true": pytest.skip("Skipping server command test in CI environment") # Check if the command exists in PATH server_path = shutil.which("supabase-mcp-server") assert server_path is not None, "supabase-mcp-server command not found in PATH" # Check if the file is executable assert os.access(server_path, os.X_OK), "supabase-mcp-server is not executable" ``` -------------------------------------------------------------------------------- /CHANGELOG.MD: -------------------------------------------------------------------------------- ```markdown # Changelog All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). 
## [0.3.12] - 2025-03-12 ### Added - Implemented a new `retrieve_logs` tool that allows retrieval of logs from any Supabase log collection - Postgres, PostgREST, Auth, Edge Functions and others. Provides a way to query and filter log entries - Implemented log rotation to prevent unbounded log file growth (5MB limit with 3 backup files) ### Changed - Improved region configuration with clearer error messages for region mismatches - Updated smithery.yaml to reduce configuration error rate (Tenant not found) - Improved PostgreSQL client connection error handling with specific guidance for "Tenant or user not found" errors ## [0.3.11] - 2025-03-10 ### Fixed - Fixed an error with creating a migration file when a user doesn't have the `supabase_migrations` schema ## [0.3.10] - 2025-03-09 ### Added - Enhanced migration naming system with improved object type detection for procedures, functions, and views. - Expanded `retrieve_migrations` tool with pagination, name pattern filtering, and an option to include full SQL queries.
- Added a check to validate personal access token and service role key are set before calling API or SDK methods ### Changed - Updated setup instructions for Claude Desktop - Updated setup instructions for Cline - Updated and fixed Smithery.ai setup ### Removed - Removed redundant `get_api_safety_rules` tool since exceptions already provide enough information to the client ## [0.3.9] - 2025-03-08 ### Fixed - Fixed an issue with api spec tool that prevented spec retrieval ## [0.3.8] - 2025-03-07 ### Added - SQL query validation using PostgreSQL's parser (pglast v7.3+) - Automatic migration script generation for schema changes - Universal safety system with standardized risk levels (Low/Medium/High/Extreme) - Switched to asyncpg v0.30.0+ from psycopg2 - Enhanced API spec tool with multiple query modes and risk assessment - Connection retry logic for database and API operations - Code coverage with pytest-cov - SQL linting with SQLFluff - Added pyyaml v6.0.2+ for configuration ### Changed - Refactored to use dependency injection pattern - Standardized service initialization to synchronous pattern - Improved SQL safety categorization: - `safe`: Read-only operations (always allowed) - `write`: Data modification (requires unsafe mode) - `destructive`: Schema changes (requires unsafe mode + confirmation) - Updated Ruff to v0.9.9 - Added asyncpg-stubs and pytest-mock for testing ## [0.3.7] - 2025-03-02 ### Fixed - Documentation inaccuracies ### Added - Auth admin SDK support for local Supabase instances ## [0.3.6] - 2025-02-26 ### Added - Added `call_auth_admin_method` which enables MCP server to manage users in your database (create, update, delete, confirm). All Auth SDK methods are supported - Added `get_auth_admin_methods_spec` to retrieve documentation for all available Auth Admin methods. Response objects now use attribute access (dot notation) instead of dictionary access. ### Fixed - Fixed an issue with improper encoding of database passwords. 
Previously, passwords containing the "%" symbol led to connection failures ## [0.3.5] - 2025-02-26 ### Fixed - Fixed an issue with `get_tables` so that it reliably returns foreign tables and views - Updated docs to describe how to set up mcp.json with project-specific MCPs - Expanded and improved test suite to cover each MCP tool ## [0.3.4] - 2025-02-25 ### Fixed - Improved `get_tables` to return foreign data tables ## [0.3.3] - 2025-02-25 ### Fixed - Fixed a bug with `readonly` scope being incorrectly managed in db client ## [0.3.2] - 2025-02-25 ### Fixed - Fixed a bug preventing execution of DDL commands (create, alter tables, etc.) ## [0.3.1] - 2025-02-23 ### Changed - Significantly improved docs to make installation, configuration, and usage instructions super clear ## [0.3.0] - 2025-02-23 ### Added - Full support for read-write SQL operations: - Implemented safety mode system with read-only (default) and read-write modes - Added mode switching with automatic reset to read-only - Enhanced transaction support for testing write operations - Improved error handling for read-only violations - Support for Supabase Management API - Introduces Supabase Management API integration with safe (enabled by default) and yolo modes - Includes the following tools: - `send_management_api_request` to send arbitrary requests to Supabase Management API, with auto-injection of project ref and safety mode control.
- `get_management_api_spec` to get the enriched API specification with safety information - `get_management_api_safety_rules` to get all safety rules including blocked and unsafe operations with human-readable explanations - `live_dangerously` to switch to yolo mode - Safety features: - Divides API methods into `safe`, `unsafe` and `blocked` categories based on the risk of the operation - Allows switching between safe and yolo modes dynamically - Blocked operations (delete project, delete database) are not allowed regardless of the mode ## [0.2.2] - 2025-02-20 ### Added - Support for different Supabase regions: - Configuration via `SUPABASE_REGION` environment variable - Validation for all 16 supported AWS regions - Default to `us-east-1` for backward compatibility - Enhanced logging for region information - Comprehensive documentation and examples ## [0.2.1] - 2025-02-19 ### Added - Package distribution support: - PyPI package publishing setup - Installation via `pipx` and `uv` - Entry point scripts for direct execution - Smithery.ai deployment configuration ### Changed - BREAKING: Installation and execution methods: - Switched from direct script execution to proper module structure - Updated Cursor/Windsurf configuration for package-based execution - Improved setup instructions in README ## [0.2.0] - 2025-02-18 Intermediary release for package distribution support ## [0.1.0] - 2025-02-16 ### Added - Initial release - Basic MCP server functionality - Supabase database connection support - Integration with Cursor and Windsurf IDEs [0.3.0]: https://github.com/alexander-zuev/supabase-mcp-server/releases/tag/v0.3.0 [0.2.2]: https://github.com/alexander-zuev/supabase-mcp-server/releases/tag/v0.2.2 [0.2.1]: https://github.com/alexander-zuev/supabase-mcp-server/releases/tag/v0.2.1 [0.2.0]: https://github.com/alexander-zuev/supabase-mcp-server/releases/tag/v0.2.0-dev0 [0.1.0]: https://github.com/alexander-zuev/supabase-mcp-server/releases/tag/v0.1.0 ```
-------------------------------------------------------------------------------- /tests/services/api/test_api_manager.py: -------------------------------------------------------------------------------- ```python from typing import Any from unittest.mock import MagicMock, patch import pytest from supabase_mcp.exceptions import SafetyError from supabase_mcp.services.api.api_manager import SupabaseApiManager from supabase_mcp.services.safety.models import ClientType class TestApiManager: """Tests for the API Manager.""" @pytest.mark.unit def test_path_parameter_replacement(self, mock_api_manager: SupabaseApiManager): """ Test that path parameters are correctly replaced in API paths. This test verifies that the API Manager correctly replaces path placeholders with actual values, handling both required and optional parameters. """ # Use the mock_api_manager fixture instead of creating one manually api_manager = mock_api_manager # Test with a simple path and required parameters (avoiding 'ref' which is auto-injected) path = "/v1/organizations/{slug}/members" path_params = {"slug": "example-org"} result = api_manager.replace_path_params(path, path_params) expected = "/v1/organizations/example-org/members" assert result == expected, f"Expected {expected}, got {result}" # Test with missing required parameters path = "/v1/organizations/{slug}/members/{id}" path_params = {"slug": "example-org"} with pytest.raises(ValueError) as excinfo: api_manager.replace_path_params(path, path_params) assert "Missing path parameters" in str(excinfo.value) # Test with extra parameters (should be rejected with a ValueError, not silently ignored) path = "/v1/organizations/{slug}" path_params = {"slug": "example-org", "extra": "should-be-ignored"} with pytest.raises(ValueError) as excinfo: api_manager.replace_path_params(path, path_params) assert "Unknown path parameter" in str(excinfo.value) # Test with no parameters path = "/v1/organizations" result = api_manager.replace_path_params(path) expected = "/v1/organizations" assert
result == expected, f"Expected {expected}, got {result}" @pytest.mark.asyncio @pytest.mark.unit @patch("supabase_mcp.services.api.api_manager.logger") async def test_safety_validation(self, mock_logger: MagicMock, mock_api_manager: SupabaseApiManager): """ Test that API operations are properly validated through the safety manager. This test verifies that the API Manager correctly validates operations before executing them, and handles safety errors appropriately. """ # Use the mock_api_manager fixture instead of creating one manually api_manager = mock_api_manager # Mock the replace_path_params method to return the path unchanged api_manager.replace_path_params = MagicMock(return_value="/v1/organizations/example-org") # Mock the client's execute_request method to return a simple response mock_response = {"success": True} api_manager.client.execute_request = MagicMock() api_manager.client.execute_request.return_value = mock_response # Make the mock awaitable async def mock_execute_request(*args: Any, **kwargs: Any) -> dict[str, Any]: return mock_response api_manager.client.execute_request = mock_execute_request # Test a successful operation method = "GET" path = "/v1/organizations/{slug}" path_params = {"slug": "example-org"} result = await api_manager.execute_request(method, path, path_params) # Verify that the safety manager was called with the correct parameters api_manager.safety_manager.validate_operation.assert_called_once_with( ClientType.API, (method, path, path_params, None, None), has_confirmation=False ) # Verify that the result is what we expected assert result == {"success": True} # Test an operation that fails safety validation api_manager.safety_manager.validate_operation.reset_mock() # Make the safety manager raise a SafetyError def raise_safety_error(*args: Any, **kwargs: Any) -> None: raise SafetyError("Operation not allowed") api_manager.safety_manager.validate_operation.side_effect = raise_safety_error # The execute_request method should raise 
the SafetyError with pytest.raises(SafetyError) as excinfo: await api_manager.execute_request("DELETE", "/v1/organizations/{slug}", {"slug": "example-org"}) assert "Operation not allowed" in str(excinfo.value) @pytest.mark.asyncio @pytest.mark.unit async def test_retrieve_logs_basic(self, mock_api_manager: SupabaseApiManager): """ Test that the retrieve_logs method correctly builds and executes a logs query. This test verifies that the API Manager correctly builds a logs query using the LogManager and executes it through the Management API. """ # Mock the log_manager's build_logs_query method mock_api_manager.log_manager.build_logs_query = MagicMock(return_value="SELECT * FROM postgres_logs LIMIT 10") # Mock the execute_request method to return a simple response mock_response = {"result": [{"id": "123", "event_message": "test"}]} async def mock_execute_request(*args: Any, **kwargs: Any) -> dict[str, Any]: return mock_response mock_api_manager.execute_request = mock_execute_request # Execute the method result = await mock_api_manager.retrieve_logs( collection="postgres", limit=10, hours_ago=24, ) # Verify that the log_manager was called with the correct parameters mock_api_manager.log_manager.build_logs_query.assert_called_once_with( collection="postgres", limit=10, hours_ago=24, filters=None, search=None, custom_query=None, ) # Verify that the result is what we expected assert result == {"result": [{"id": "123", "event_message": "test"}]} @pytest.mark.asyncio @pytest.mark.unit async def test_retrieve_logs_error_handling(self, mock_api_manager: SupabaseApiManager): """ Test that the retrieve_logs method correctly handles errors. This test verifies that the API Manager correctly handles errors that occur during log retrieval and propagates them to the caller. 
""" # Mock the log_manager's build_logs_query method mock_api_manager.log_manager.build_logs_query = MagicMock(return_value="SELECT * FROM postgres_logs LIMIT 10") # Mock the execute_request method to raise an exception async def mock_execute_request_error(*args: Any, **kwargs: Any) -> dict[str, Any]: raise Exception("API error") mock_api_manager.execute_request = mock_execute_request_error # The retrieve_logs method should propagate the exception with pytest.raises(Exception) as excinfo: await mock_api_manager.retrieve_logs(collection="postgres") assert "API error" in str(excinfo.value) ``` -------------------------------------------------------------------------------- /tests/test_tool_manager.py: -------------------------------------------------------------------------------- ```python from unittest.mock import MagicMock, mock_open, patch from supabase_mcp.tools.manager import ToolManager, ToolName class TestToolManager: """Tests for the ToolManager class.""" def test_singleton_pattern(self): """Test that ToolManager follows the singleton pattern.""" # Get two instances manager1 = ToolManager.get_instance() manager2 = ToolManager.get_instance() # They should be the same object assert manager1 is manager2 # Reset the singleton for other tests # pylint: disable=protected-access # We need to reset the singleton for test isolation ToolManager._instance = None # type: ignore @patch("supabase_mcp.tools.manager.Path") @patch("supabase_mcp.tools.manager.yaml.safe_load") def test_load_descriptions(self, mock_yaml_load: MagicMock, mock_path: MagicMock): """Test that descriptions are loaded correctly from YAML files.""" # Setup mock directory structure mock_file_path = MagicMock() mock_dir = MagicMock() # Mock the Path(__file__) call mock_path.return_value = mock_file_path mock_file_path.parent = mock_dir mock_dir.__truediv__.return_value = mock_dir # For the / operator # Mock directory existence check mock_dir.exists.return_value = True # Mock the glob to return some YAML 
files mock_file1 = MagicMock() mock_file1.name = "database_tools.yaml" mock_file2 = MagicMock() mock_file2.name = "api_tools.yaml" mock_dir.glob.return_value = [mock_file1, mock_file2] # Mock the file open and YAML load mock_yaml_data = {"get_schemas": "Description for get_schemas", "get_tables": "Description for get_tables"} mock_yaml_load.return_value = mock_yaml_data # Create a new instance to trigger _load_descriptions with patch("builtins.open", mock_open(read_data="dummy yaml content")): # We need to create the manager to trigger _load_descriptions ToolManager() # Verify the descriptions were loaded assert mock_dir.glob.call_count > 0 assert mock_dir.glob.call_args[0][0] == "*.yaml" assert mock_yaml_load.call_count >= 1 # Reset the singleton for other tests # pylint: disable=protected-access ToolManager._instance = None # type: ignore def test_get_description_valid_tool(self): """Test getting a description for a valid tool.""" # Setup manager = ToolManager.get_instance() # Force the descriptions to have a known value for testing # pylint: disable=protected-access # We need to set the descriptions directly for testing manager.descriptions = { ToolName.GET_SCHEMAS.value: "Description for get_schemas", ToolName.GET_TABLES.value: "Description for get_tables", } # Test description = manager.get_description(ToolName.GET_SCHEMAS.value) # Verify assert description == "Description for get_schemas" # Reset the singleton for other tests # pylint: disable=protected-access ToolManager._instance = None # type: ignore def test_get_description_invalid_tool(self): """Test getting a description for an invalid tool.""" # Setup manager = ToolManager.get_instance() # Force the descriptions to have a known value for testing # pylint: disable=protected-access # We need to set the descriptions directly for testing manager.descriptions = { ToolName.GET_SCHEMAS.value: "Description for get_schemas", ToolName.GET_TABLES.value: "Description for get_tables", } # Test and verify 
description = manager.get_description("nonexistent_tool") assert description == "" # The method returns an empty string for unknown tools # Reset the singleton for other tests # pylint: disable=protected-access ToolManager._instance = None # type: ignore def test_all_tool_names_have_descriptions(self): """Test that all tools defined in ToolName enum have descriptions.""" # Setup - get a fresh instance # Reset the singleton first to ensure we get a clean instance # pylint: disable=protected-access ToolManager._instance = None # type: ignore # Get a fresh instance that will load the real YAML files manager = ToolManager.get_instance() # Print the loaded descriptions for debugging print(f"\nLoaded descriptions: {manager.descriptions}") # Verify that we have at least some descriptions loaded assert len(manager.descriptions) > 0, "No descriptions were loaded" # Check that descriptions are not empty empty_descriptions: list[str] = [] for tool_name, description in manager.descriptions.items(): if not description or len(description.strip()) == 0: empty_descriptions.append(tool_name) # Fail if we found any empty descriptions assert len(empty_descriptions) == 0, f"Found empty descriptions for tools: {empty_descriptions}" # Check that at least some of the tool names have descriptions found_descriptions = 0 missing_descriptions: list[str] = [] for tool_name in ToolName: description = manager.get_description(tool_name.value) if description: found_descriptions += 1 else: missing_descriptions.append(tool_name.value) # Print missing descriptions for debugging if missing_descriptions: print(f"\nMissing descriptions for: {missing_descriptions}") # We should have at least some descriptions assert found_descriptions > 0, "No tool has a description" # Reset the singleton for other tests # pylint: disable=protected-access ToolManager._instance = None # type: ignore @patch.object(ToolManager, "_load_descriptions") def test_initialization_loads_descriptions(self, mock_load_descriptions: 
MagicMock): """Test that descriptions are loaded during initialization.""" # Create a new instance # We need to create the manager to trigger __init__ ToolManager() # Verify _load_descriptions was called assert mock_load_descriptions.call_count > 0 # Reset the singleton for other tests # pylint: disable=protected-access ToolManager._instance = None # type: ignore def test_tool_enum_completeness(self): """Test that the ToolName enum contains all expected tools.""" # Get all tool values from the enum tool_values = [tool.value for tool in ToolName] # Verify the total number of tools # Update this number when new tools are added expected_tool_count = 12 assert len(tool_values) == expected_tool_count, f"Expected {expected_tool_count} tools, got {len(tool_values)}" # Verify specific tools are included assert "retrieve_logs" in tool_values, "retrieve_logs tool is missing from ToolName enum" # Reset the singleton for other tests # pylint: disable=protected-access ToolManager._instance = None # type: ignore ``` -------------------------------------------------------------------------------- /tests/services/safety/test_api_safety_config.py: -------------------------------------------------------------------------------- ```python """ Unit tests for the APISafetyConfig class. This file contains unit test cases for the APISafetyConfig class, which is responsible for determining the risk level of API operations and whether they are allowed or require confirmation. 
""" import pytest from supabase_mcp.services.safety.models import OperationRiskLevel, SafetyMode from supabase_mcp.services.safety.safety_configs import APISafetyConfig, HTTPMethod @pytest.mark.unit class TestAPISafetyConfig: """Unit tests for the APISafetyConfig class.""" def test_get_risk_level_low_risk(self): """Test getting risk level for low-risk operations (GET requests).""" config = APISafetyConfig() # API operations are tuples of (method, path, path_params, query_params, request_body) operation = ("GET", "/v1/projects/{ref}/functions", {}, {}, {}) risk_level = config.get_risk_level(operation) assert risk_level == OperationRiskLevel.LOW def test_get_risk_level_medium_risk(self): """Test getting risk level for medium-risk operations (POST/PUT/PATCH).""" config = APISafetyConfig() # Test POST request operation = ("POST", "/v1/projects/{ref}/functions", {}, {}, {}) risk_level = config.get_risk_level(operation) assert risk_level == OperationRiskLevel.MEDIUM # Test PUT request operation = ("PUT", "/v1/projects/{ref}/functions", {}, {}, {}) risk_level = config.get_risk_level(operation) assert risk_level == OperationRiskLevel.MEDIUM # Test PATCH request operation = ("PATCH", "/v1/projects/{ref}/functions/{function_slug}", {}, {}, {}) risk_level = config.get_risk_level(operation) assert risk_level == OperationRiskLevel.MEDIUM def test_get_risk_level_high_risk(self): """Test getting risk level for high-risk operations.""" config = APISafetyConfig() # Test DELETE request for a function operation = ("DELETE", "/v1/projects/{ref}/functions/{function_slug}", {}, {}, {}) risk_level = config.get_risk_level(operation) assert risk_level == OperationRiskLevel.HIGH # Test other high-risk operations high_risk_paths = [ "/v1/projects/{ref}/branches/{branch_id}", "/v1/projects/{ref}/custom-hostname", "/v1/projects/{ref}/network-bans", ] for path in high_risk_paths: operation = ("DELETE", path, {}, {}, {}) risk_level = config.get_risk_level(operation) assert risk_level == 
def test_is_operation_allowed(self):
    """Check the allow/deny decision for every risk level in both safety modes."""
    config = APISafetyConfig()

    # (risk level, safety mode, expected decision)
    expectations = [
        # Low risk: allowed in both modes.
        (OperationRiskLevel.LOW, SafetyMode.SAFE, True),
        (OperationRiskLevel.LOW, SafetyMode.UNSAFE, True),
        # Medium and high risk: allowed only in unsafe mode.
        (OperationRiskLevel.MEDIUM, SafetyMode.SAFE, False),
        (OperationRiskLevel.MEDIUM, SafetyMode.UNSAFE, True),
        (OperationRiskLevel.HIGH, SafetyMode.SAFE, False),
        (OperationRiskLevel.HIGH, SafetyMode.UNSAFE, True),
        # Extreme risk: this test pins it as rejected in both modes.
        (OperationRiskLevel.EXTREME, SafetyMode.SAFE, False),
        (OperationRiskLevel.EXTREME, SafetyMode.UNSAFE, False),
    ]

    for risk_level, mode, allowed in expectations:
        assert config.is_operation_allowed(risk_level, mode) is allowed
def test_path_matching(self):
    """Test that path patterns are correctly matched.

    Covers a templated path, the same path with a concrete project ref, a
    path with two concrete segments, and two paths that are expected to match
    no configured pattern.
    """
    config = APISafetyConfig()

    # Templated path, exactly as it would be written in a pattern
    operation = ("GET", "/v1/projects/{ref}/functions", {}, {}, {})
    assert config.get_risk_level(operation) == OperationRiskLevel.LOW

    # Same path with a concrete value in place of {ref}
    operation = ("GET", "/v1/projects/abc123/functions", {}, {}, {})
    assert config.get_risk_level(operation) == OperationRiskLevel.LOW

    # Concrete values for both the project ref and the function slug
    operation = ("DELETE", "/v1/projects/abc123/functions/my-function", {}, {}, {})
    assert config.get_risk_level(operation) == OperationRiskLevel.HIGH

    # Unmatched path with a non-GET method: this assertion pins LOW.
    # NOTE(review): the previous comment here said non-GET requests "should
    # default to MEDIUM", which contradicts the assertion below - confirm the
    # intended default against APISafetyConfig.get_risk_level.
    operation = ("DELETE", "/v1/some/unknown/path", {}, {}, {})
    assert config.get_risk_level(operation) == OperationRiskLevel.LOW

    # Unmatched path with GET: expected to be LOW
    operation = ("GET", "/v1/some/unknown/path", {}, {}, {})
    assert config.get_risk_level(operation) == OperationRiskLevel.LOW
OperationRiskLevel.HIGH in config.PATH_SAFETY_CONFIG assert OperationRiskLevel.EXTREME in config.PATH_SAFETY_CONFIG # Check that each risk level has a dictionary of methods to paths for risk_level, methods_dict in config.PATH_SAFETY_CONFIG.items(): assert isinstance(methods_dict, dict) for method, paths in methods_dict.items(): assert isinstance(method, HTTPMethod) assert isinstance(paths, list) for path in paths: assert isinstance(path, str) ``` -------------------------------------------------------------------------------- /supabase_mcp/settings.py: -------------------------------------------------------------------------------- ```python import os from pathlib import Path from typing import Literal from pydantic import Field, ValidationInfo, field_validator from pydantic_settings import BaseSettings, SettingsConfigDict from supabase_mcp.logger import logger SUPPORTED_REGIONS = Literal[ "us-west-1", # West US (North California) "us-east-1", # East US (North Virginia) "us-east-2", # East US (Ohio) "ca-central-1", # Canada (Central) "eu-west-1", # West EU (Ireland) "eu-west-2", # West Europe (London) "eu-west-3", # West EU (Paris) "eu-central-1", # Central EU (Frankfurt) "eu-central-2", # Central Europe (Zurich) "eu-north-1", # North EU (Stockholm) "ap-south-1", # South Asia (Mumbai) "ap-southeast-1", # Southeast Asia (Singapore) "ap-northeast-1", # Northeast Asia (Tokyo) "ap-northeast-2", # Northeast Asia (Seoul) "ap-southeast-2", # Oceania (Sydney) "sa-east-1", # South America (São Paulo) ] def find_config_file(env_file: str = ".env") -> str | None: """Find the specified env file in order of precedence: 1. Current working directory (where command is run) 2. Global config: - Windows: %APPDATA%/supabase-mcp/{env_file} - macOS/Linux: ~/.config/supabase-mcp/{env_file} Args: env_file: The name of the environment file to look for (default: ".env") Returns: The path to the found config file, or None if not found """ # 1. 
Check current directory cwd_config = Path.cwd() / env_file if cwd_config.exists(): return str(cwd_config) # 2. Check global config home = Path.home() if os.name == "nt": # Windows global_config = Path(os.environ.get("APPDATA", "")) / "supabase-mcp" / ".env" else: # macOS/Linux global_config = home / ".config" / "supabase-mcp" / ".env" if global_config.exists(): logger.error( f"DEPRECATED: {global_config} is deprecated and will be removed in a future release. " "Use your IDE's native .json config file to configure access to MCP." ) return str(global_config) return None class Settings(BaseSettings): """Initializes settings for Supabase MCP server.""" supabase_project_ref: str = Field( default="127.0.0.1:54322", # Local Supabase default description="Supabase project ref - Must be 20 chars for remote projects, can be local address for development", alias="SUPABASE_PROJECT_REF", ) supabase_db_password: str | None = Field( default=None, # Will be validated based on project_ref description="Supabase database password - Required for remote projects, defaults to 'postgres' for local", alias="SUPABASE_DB_PASSWORD", ) supabase_region: str = Field( default="us-east-1", # East US (North Virginia) - Supabase's default region description="Supabase region for connection", alias="SUPABASE_REGION", ) supabase_access_token: str | None = Field( default=None, description="Optional personal access token for accessing Supabase Management API", alias="SUPABASE_ACCESS_TOKEN", ) supabase_service_role_key: str | None = Field( default=None, description="Optional service role key for accessing Python SDK", alias="SUPABASE_SERVICE_ROLE_KEY", ) supabase_api_url: str = Field( default="https://api.supabase.com", description="Supabase API URL", ) query_api_key: str = Field( default="test-key", description="TheQuery.dev API key", alias="QUERY_API_KEY", ) query_api_url: str = Field( default="https://api.thequery.dev/v1", description="TheQuery.dev API URL", alias="QUERY_API_URL", ) 
@field_validator("supabase_region")
@classmethod
def validate_region(cls, v: str, info: ValidationInfo) -> str:
    """Validate that the region is one of SUPPORTED_REGIONS.

    Also logs a warning when the project ref is not a local address, the
    region equals the default "us-east-1", and SUPABASE_REGION is absent from
    the process environment.

    Raises:
        ValueError: If the region is not listed in SUPPORTED_REGIONS.
    """
    allowed_regions = SUPPORTED_REGIONS.__args__

    # The project ref comes from the fields validated so far; an absent
    # value is treated like a remote project.
    project_ref = info.data.get("supabase_project_ref", "")
    is_remote = not project_ref.startswith("127.0.0.1")
    region_left_at_default = v == "us-east-1" and "SUPABASE_REGION" not in os.environ

    if is_remote and region_left_at_default:
        logger.warning(
            "You're connecting to a remote Supabase project but haven't specified a region. "
            "Using default 'us-east-1', which may cause 'Tenant or user not found' errors if incorrect. "
            "Please set the correct SUPABASE_REGION in your configuration."
        )

    if v in allowed_regions:
        return v

    # The leading empty entry makes the joined list start with a bullet.
    supported = "\n - ".join(["", *allowed_regions])
    raise ValueError(f"Region '{v}' is not supported. Supported regions are:{supported}")
@field_validator("supabase_db_password")
@classmethod
def validate_db_password(cls, v: str | None, info: ValidationInfo) -> str:
    """Validate the database password according to the project type.

    For a local project ref (starting with "127.0.0.1") a missing or empty
    password falls back to "postgres". For any other project ref a non-empty
    password is mandatory.

    Raises:
        ValueError: If the project is remote and no password was supplied.
    """
    is_local = info.data.get("supabase_project_ref", "").startswith("127.0.0.1")

    if is_local:
        # Local development: fall back to the default password.
        return v if v else "postgres"

    if v:
        return v

    # Remote project without a password.
    logger.error("SUPABASE_DB_PASSWORD is required when connecting to a remote instance")
    raise ValueError(
        "Database password is required for remote Supabase projects. "
        "Please set SUPABASE_DB_PASSWORD in your environment variables."
    )
async def test_request_preparation(self, mock_settings):
    """Check that prepare_request builds a request with the method, path, query string and JSON content type."""
    api_client = ManagementAPIClient(settings=mock_settings)

    http_method = "GET"
    endpoint = "/v1/health"
    query = {"param1": "value1", "param2": "value2"}

    prepared = api_client.prepare_request(
        method=http_method,
        path=endpoint,
        request_params=query,
    )

    url_text = str(prepared.url)

    assert prepared.method == http_method
    assert endpoint in url_text
    # Each query parameter must appear as key=value in the URL.
    for key, value in query.items():
        assert f"{key}={value}" in url_text
    assert "Content-Type" in prepared.headers
    assert prepared.headers["Content-Type"] == "application/json"
request.headers assert request.headers["Content-Type"] == "application/json" async def test_response_parsing(self, mock_settings): """Test parsing API responses.""" client = ManagementAPIClient(settings=mock_settings) # Setup mock response mock_response = MagicMock(spec=httpx.Response) mock_response.status_code = 200 mock_response.is_success = True mock_response.headers = {"content-type": "application/json"} mock_response.json.return_value = [{"id": "project1", "name": "Test Project"}] mock_response.content = b'[{"id": "project1", "name": "Test Project"}]' with patch.object(client, 'send_request', return_value=mock_response): path = "/v1/projects" # Execute the request response = await client.execute_request( method="GET", path=path, ) # Verify the response is parsed correctly assert isinstance(response, list) assert len(response) > 0 assert "id" in response[0] async def test_request_retry_mechanism(self, mock_settings): """Test that the tenacity retry mechanism works correctly for API requests.""" client = ManagementAPIClient(settings=mock_settings) # Create a mock request object for the NetworkError mock_request = MagicMock(spec=httpx.Request) mock_request.method = "GET" mock_request.url = "https://api.supabase.com/v1/projects" # Mock the client's send method to always raise a network error with patch.object(client.client, 'send', side_effect=httpx.NetworkError("Simulated network failure", request=mock_request)): # Execute a request - this should trigger retries and eventually fail with pytest.raises(APIConnectionError) as exc_info: await client.execute_request( method="GET", path="/v1/projects", ) # Verify the error message indicates retries were attempted assert "Network error after 3 retry attempts" in str(exc_info.value) async def test_request_without_access_token(self, mock_settings): """Test that an exception is raised when attempting to send a request without an access token.""" # Create client with no access token mock_settings.supabase_access_token = 
class AsyncHTTPClient(ABC):
    """Abstract base class for async HTTP clients.

    Subclasses provide the ``httpx.AsyncClient`` lifecycle (``_ensure_client``
    and ``close``). This class implements the request pipeline on top of it:
    build the request, send it with retries on network errors, parse the JSON
    body and map non-success status codes to exceptions.
    """

    @abstractmethod
    async def _ensure_client(self) -> httpx.AsyncClient:
        """Ensure client exists and is ready for use.

        Creates the client if it doesn't exist yet.
        Returns the client instance.
        """
        pass

    @abstractmethod
    async def close(self) -> None:
        """Close the client and release resources.

        Should be called when the client is no longer needed.
        """
        pass

    def prepare_request(
        self,
        client: httpx.AsyncClient,
        method: str,
        path: str,
        request_params: dict[str, Any] | None = None,
        request_body: dict[str, Any] | None = None,
    ) -> httpx.Request:
        """
        Prepare an HTTP request.

        Args:
            client: The httpx client to use
            method: HTTP method (GET, POST, etc.)
            path: API path
            request_params: Query parameters
            request_body: Request body, sent as JSON

        Returns:
            Prepared httpx.Request object

        Raises:
            APIClientError: If request preparation fails
        """
        try:
            return client.build_request(method=method, url=path, params=request_params, json=request_body)
        except Exception as e:
            raise APIClientError(
                message=f"Failed to build request: {str(e)}",
                status_code=None,
            ) from e

    @retry(
        # Only httpx.NetworkError is retried. Per the httpx exception
        # hierarchy, timeouts (httpx.TimeoutException) are a separate branch
        # and are therefore not retried here.
        retry=retry_if_exception_type(httpx.NetworkError),
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10),
        reraise=True,  # Surface the last httpx.NetworkError, not tenacity's RetryError
        before_sleep=log_retry_attempt,
    )
    async def _send_with_retry(self, client: httpx.AsyncClient, request: httpx.Request) -> httpx.Response:
        """Send the request, letting httpx.NetworkError escape so the retry decorator can act on it."""
        return await client.send(request)

    async def send_request(self, client: httpx.AsyncClient, request: httpx.Request) -> httpx.Response:
        """
        Send an HTTP request with retry logic for transient network errors.

        The retry policy lives on ``_send_with_retry``. Exceptions are
        translated into the project's exception types only here, after the
        retries are exhausted. Translating inside the retried function would
        hide ``httpx.NetworkError`` from the retry predicate, so no retry
        would ever take place.

        Args:
            client: The httpx client to use
            request: Prepared httpx.Request object

        Returns:
            httpx.Response object

        Raises:
            APIConnectionError: When the request still fails with a network error after 3 attempts
            APIClientError: For any other error raised while sending
        """
        try:
            return await self._send_with_retry(client, request)
        except httpx.NetworkError as e:
            # Reached only once every attempt has failed.
            logger.error(f"Network error after all retry attempts: {str(e)}")
            raise APIConnectionError(
                message=f"Network error after 3 retry attempts: {str(e)}",
                status_code=None,
            ) from e
        except Exception as e:
            # Other exceptions are not retried
            raise APIClientError(
                message=f"Request failed: {str(e)}",
                status_code=None,
            ) from e

    def parse_response(self, response: httpx.Response) -> dict[str, Any]:
        """
        Parse an HTTP response as JSON.

        Args:
            response: httpx.Response object

        Returns:
            Parsed response body, or an empty dict when the response has no content

        Raises:
            APIResponseError: If response cannot be parsed as JSON
        """
        if not response.content:
            return {}

        try:
            return response.json()
        except JSONDecodeError as e:
            raise APIResponseError(
                message=f"Failed to parse response as JSON: {str(e)}",
                status_code=response.status_code,
                response_body={"raw_content": response.text},
            ) from e

    def handle_error_response(self, response: httpx.Response, parsed_body: dict[str, Any] | None = None) -> None:
        """
        Handle error responses based on status code. Always raises.

        Args:
            response: httpx.Response object
            parsed_body: Parsed response body if available

        Raises:
            APIClientError: For client errors (4xx)
            APIServerError: For server errors (5xx)
            UnexpectedError: For any other status code
        """
        # Prefer the server-provided message when the body carries one
        error_message = f"API request failed: {response.status_code}"
        if parsed_body and "message" in parsed_body:
            error_message = parsed_body["message"]

        # Determine error type based on status code
        if 400 <= response.status_code < 500:
            raise APIClientError(
                message=error_message,
                status_code=response.status_code,
                response_body=parsed_body,
            )
        elif response.status_code >= 500:
            raise APIServerError(
                message=error_message,
                status_code=response.status_code,
                response_body=parsed_body,
            )
        else:
            # This should not happen, but just in case
            raise UnexpectedError(
                message=f"Unexpected status code: {response.status_code}",
                status_code=response.status_code,
                response_body=parsed_body,
            )

    async def execute_request(
        self,
        method: str,
        path: str,
        request_params: dict[str, Any] | None = None,
        request_body: dict[str, Any] | None = None,
    ) -> dict[str, Any] | BaseModel:
        """
        Execute an HTTP request.

        Args:
            method: HTTP method (GET, POST, etc.)
            path: API path
            request_params: Query parameters
            request_body: Request body

        Returns:
            API response as a dictionary

        Raises:
            APIClientError: For client errors (4xx)
            APIServerError: For server errors (5xx)
            APIConnectionError: For connection issues
            APIResponseError: For response parsing errors
            UnexpectedError: For unexpected errors
        """
        # Log detailed request information
        logger.info(f"API Client: Executing {method} request to {path}")
        if request_params:
            logger.debug(f"Request params: {request_params}")
        if request_body:
            logger.debug(f"Request body: {request_body}")

        # Get client
        client = await self._ensure_client()

        # Prepare request
        request = self.prepare_request(client, method, path, request_params, request_body)

        # Send request
        response = await self.send_request(client, request)

        # Parse response (for both success and error cases)
        parsed_body = self.parse_response(response)

        # Check if successful; handle_error_response always raises
        if not response.is_success:
            logger.warning(f"Request failed: {method} {path} - Status {response.status_code}")
            self.handle_error_response(response, parsed_body)

        # Log success and return
        logger.info(f"Request successful: {method} {path} - Status {response.status_code}")
        return parsed_body
"collection,hours_ago,filters,search,expected_clause", [ # Test with hours_ago only ( "postgres", 24, None, None, "WHERE postgres_logs.timestamp > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR)", ), # Test with search only ( "auth", None, None, "error", "WHERE event_message LIKE '%error%'", ), # Test with filters only ( "api_gateway", None, [{"field": "status_code", "operator": "=", "value": 500}], None, "WHERE status_code = 500", ), # Test with string value in filters ( "api_gateway", None, [{"field": "method", "operator": "=", "value": "GET"}], None, "WHERE method = 'GET'", ), # Test with multiple filters ( "postgres", None, [ {"field": "parsed.error_severity", "operator": "=", "value": "ERROR"}, {"field": "parsed.application_name", "operator": "LIKE", "value": "app%"}, ], None, "WHERE parsed.error_severity = 'ERROR' AND parsed.application_name LIKE 'app%'", ), # Test with hours_ago and search ( "storage", 12, None, "upload", "WHERE storage_logs.timestamp > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 12 HOUR) AND event_message LIKE '%upload%'", ), # Test with all parameters ( "edge_functions", 6, [{"field": "response.status_code", "operator": ">", "value": 400}], "timeout", "WHERE function_edge_logs.timestamp > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 6 HOUR) AND event_message LIKE '%timeout%' AND response.status_code > 400", ), # Test with cron logs (special case) ( "cron", 24, None, None, "AND postgres_logs.timestamp > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR)", ), # Test with cron logs and other parameters ( "cron", 12, [{"field": "parsed.error_severity", "operator": "=", "value": "ERROR"}], "failed", "AND postgres_logs.timestamp > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 12 HOUR) AND event_message LIKE '%failed%' AND parsed.error_severity = 'ERROR'", ), ], ) def test_build_where_clause(self, collection, hours_ago, filters, search, expected_clause): """Test building WHERE clauses for different scenarios.""" log_manager = LogManager() 
def test_build_where_clause_escapes_single_quotes(self):
    """Check that single quotes in search text and filter values are doubled in the WHERE clause."""
    manager = LogManager()

    # (keyword arguments for _build_where_clause, expected clause)
    cases = [
        # Quote inside the free-text search term
        (
            {"collection": "postgres", "search": "O'Reilly"},
            "WHERE event_message LIKE '%O''Reilly%'",
        ),
        # Quote inside a filter value
        (
            {
                "collection": "postgres",
                "filters": [{"field": "parsed.query", "operator": "LIKE", "value": "SELECT * FROM O'Reilly"}],
            },
            "WHERE parsed.query LIKE 'SELECT * FROM O''Reilly'",
        ),
    ]

    for kwargs, expected in cases:
        assert manager._build_where_clause(**kwargs) == expected
def test_unknown_collection(self):
    """Check that a collection missing from the mapping is used verbatim as the table name."""
    manager = LogManager()
    collection = "unknown_collection"

    clause = manager._build_where_clause(collection=collection, hours_ago=24)

    # The time filter is expected to be qualified with the raw collection name.
    expected = f"WHERE {collection}.timestamp > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR)"
    assert clause == expected