
# Directory Structure

```
├── .claude
│   └── settings.local.json
├── .dockerignore
├── .env.example
├── .env.test.example
├── .github
│   ├── FUNDING.yml
│   ├── ISSUE_TEMPLATE
│   │   ├── bug_report.md
│   │   ├── feature_request.md
│   │   └── roadmap_item.md
│   ├── PULL_REQUEST_TEMPLATE.md
│   └── workflows
│       ├── ci.yaml
│       ├── docs
│       │   └── release-checklist.md
│       └── publish.yaml
├── .gitignore
├── .pre-commit-config.yaml
├── .python-version
├── CHANGELOG.MD
├── codecov.yml
├── CONTRIBUTING.MD
├── Dockerfile
├── LICENSE
├── llms-full.txt
├── pyproject.toml
├── README.md
├── smithery.yaml
├── supabase_mcp
│   ├── __init__.py
│   ├── clients
│   │   ├── api_client.py
│   │   ├── base_http_client.py
│   │   ├── management_client.py
│   │   └── sdk_client.py
│   ├── core
│   │   ├── __init__.py
│   │   ├── container.py
│   │   └── feature_manager.py
│   ├── exceptions.py
│   ├── logger.py
│   ├── main.py
│   ├── services
│   │   ├── __init__.py
│   │   ├── api
│   │   │   ├── __init__.py
│   │   │   ├── api_manager.py
│   │   │   ├── spec_manager.py
│   │   │   └── specs
│   │   │       └── api_spec.json
│   │   ├── database
│   │   │   ├── __init__.py
│   │   │   ├── migration_manager.py
│   │   │   ├── postgres_client.py
│   │   │   ├── query_manager.py
│   │   │   └── sql
│   │   │       ├── loader.py
│   │   │       ├── models.py
│   │   │       ├── queries
│   │   │       │   ├── create_migration.sql
│   │   │       │   ├── get_migrations.sql
│   │   │       │   ├── get_schemas.sql
│   │   │       │   ├── get_table_schema.sql
│   │   │       │   ├── get_tables.sql
│   │   │       │   ├── init_migrations.sql
│   │   │       │   └── logs
│   │   │       │       ├── auth_logs.sql
│   │   │       │       ├── cron_logs.sql
│   │   │       │       ├── edge_logs.sql
│   │   │       │       ├── function_edge_logs.sql
│   │   │       │       ├── pgbouncer_logs.sql
│   │   │       │       ├── postgres_logs.sql
│   │   │       │       ├── postgrest_logs.sql
│   │   │       │       ├── realtime_logs.sql
│   │   │       │       ├── storage_logs.sql
│   │   │       │       └── supavisor_logs.sql
│   │   │       └── validator.py
│   │   ├── logs
│   │   │   ├── __init__.py
│   │   │   └── log_manager.py
│   │   ├── safety
│   │   │   ├── __init__.py
│   │   │   ├── models.py
│   │   │   ├── safety_configs.py
│   │   │   └── safety_manager.py
│   │   └── sdk
│   │       ├── __init__.py
│   │       ├── auth_admin_models.py
│   │       └── auth_admin_sdk_spec.py
│   ├── settings.py
│   └── tools
│       ├── __init__.py
│       ├── descriptions
│       │   ├── api_tools.yaml
│       │   ├── database_tools.yaml
│       │   ├── logs_and_analytics_tools.yaml
│       │   ├── safety_tools.yaml
│       │   └── sdk_tools.yaml
│       ├── manager.py
│       └── registry.py
├── tests
│   ├── __init__.py
│   ├── conftest.py
│   ├── services
│   │   ├── __init__.py
│   │   ├── api
│   │   │   ├── __init__.py
│   │   │   ├── test_api_client.py
│   │   │   ├── test_api_manager.py
│   │   │   └── test_spec_manager.py
│   │   ├── database
│   │   │   ├── sql
│   │   │   │   ├── __init__.py
│   │   │   │   ├── conftest.py
│   │   │   │   ├── test_loader.py
│   │   │   │   ├── test_sql_validator_integration.py
│   │   │   │   └── test_sql_validator.py
│   │   │   ├── test_migration_manager.py
│   │   │   ├── test_postgres_client.py
│   │   │   └── test_query_manager.py
│   │   ├── logs
│   │   │   └── test_log_manager.py
│   │   ├── safety
│   │   │   ├── test_api_safety_config.py
│   │   │   ├── test_safety_manager.py
│   │   │   └── test_sql_safety_config.py
│   │   └── sdk
│   │       ├── test_auth_admin_models.py
│   │       └── test_sdk_client.py
│   ├── test_container.py
│   ├── test_main.py
│   ├── test_settings.py
│   ├── test_tool_manager.py
│   ├── test_tools_integration.py.bak
│   └── test_tools.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------

```
3.12.9

```

--------------------------------------------------------------------------------
/.env.test.example:
--------------------------------------------------------------------------------

```
# Supabase MCP Server Test Environment
# Copy this file to .env.test and modify as needed for your tests

# Connection settings for test database
SUPABASE_PROJECT_REF=127.0.0.1:54322
SUPABASE_DB_PASSWORD=postgres

# Optional: Management API access token (for API tests)
# SUPABASE_ACCESS_TOKEN=your_access_token

# Optional: Service role key (for auth tests)
# SUPABASE_SERVICE_ROLE_KEY=your_service_role_key

# TheQuery.dev API URL
QUERY_API_URL=http://127.0.0.1:8080/v1

```

--------------------------------------------------------------------------------
/.env.example:
--------------------------------------------------------------------------------

```
# Supabase MCP Server Environment Configuration
# Copy this file to .env to configure your server

# API Key
QUERY_API_KEY=your-api-key # thequery.dev API key

# Required for remote Supabase projects (optional for local development)
SUPABASE_PROJECT_REF=your-project-ref # Your project reference from dashboard URL
SUPABASE_DB_PASSWORD=your-db-password # Database password for your project
SUPABASE_REGION=us-east-1 # Region where your Supabase project is hosted


# Optional configuration
SUPABASE_ACCESS_TOKEN=your-personal-access-token # Required for Management API tools
SUPABASE_SERVICE_ROLE_KEY=your-service-role-key # Required for Auth Admin SDK tools


# ONLY for local development
QUERY_API_URL=http://127.0.0.1:8080/v1 # TheQuery.dev API URL when developing locally

```

--------------------------------------------------------------------------------
/.dockerignore:
--------------------------------------------------------------------------------

```
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
.pytest_cache/
.coverage
htmlcov/
.tox/
.nox/

# Virtual Environment
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# macOS
.DS_Store
.AppleDouble
.LSOverride
Icon
._*
.DocumentRevisions-V100
.fseventsd
.Spotlight-V100
.TemporaryItems
.Trashes
.VolumeIcon.icns
.com.apple.timemachine.donotpresent

# IDEs and Editors
.idea/
.vscode/
*.swp
*.swo
*~
.project
.classpath
.settings/
*.sublime-workspace
*.sublime-project

# Local development
.env.mcp
.env.mcp2
*.log
logs/

# Ignore local assets
assets/
*.gif
*.mp4

# Generated version file
supabase_mcp/_version.py

# Docs
.llms-full.txt

# Docker specific ignores
Dockerfile
.dockerignore
docker-compose.yml
docker-compose.yaml

# Git
.git/
.github/
.gitignore

```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
.pytest_cache/
.coverage
htmlcov/
.tox/
.nox/

# Virtual Environment
.env
.env.test
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# macOS
.DS_Store
.AppleDouble
.LSOverride
Icon
._*
.DocumentRevisions-V100
.fseventsd
.Spotlight-V100
.TemporaryItems
.Trashes
.VolumeIcon.icns
.com.apple.timemachine.donotpresent

# IDEs and Editors
.idea/
.vscode/
*.swp
*.swo
*~
.project
.classpath
.settings/
*.sublime-workspace
*.sublime-project

# Local development

*.log
# Only ignore logs directory in the root, not in the package
/logs/


# Ignore local assets
assets/
*.gif
*.mp4

# Generated version file
supabase_mcp/_version.py

# Docs
.llms-full.txt
COMMIT_CONVENTION.md
feature-spec/

# Claude code
CLAUDE.md

#
future-evolution.md

```

--------------------------------------------------------------------------------
/.pre-commit-config.yaml:
--------------------------------------------------------------------------------

```yaml
repos:
  # === Syntax & Basic Checks ===
  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: v5.0.0
    hooks:
      - id: check-ast
        name: Validate Python syntax
      - id: check-toml
        name: Validate TOML files
      - id: mixed-line-ending
        name: Normalize line endings
        args: ['--fix=lf']
      - id: trailing-whitespace
        name: Remove trailing whitespace
      - id: end-of-file-fixer
        name: Ensure file ends with newline

  # === Security ===
  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: v5.0.0
    hooks:
      - id: detect-private-key
        name: Check for private keys
        stages: [pre-commit, pre-push, manual]
      - id: check-merge-conflict
        name: Check for merge conflicts
        stages: [pre-commit, manual]
      - id: debug-statements
        name: Check for debugger imports
        stages: [pre-commit, manual]

  # === SQL Linting ===
  - repo: https://github.com/sqlfluff/sqlfluff
    rev: 3.3.1
    hooks:
      - id: sqlfluff-lint
        name: Run SQLFluff linter
        description: Lint SQL files with SQLFluff
        types: [sql]
        args: [
          "--dialect", "postgres",
          "--exclude-rules", "L016,L031,LT02",  # Exclude some opinionated rules
        ]
        files: ^(supabase_mcp/sql|tests/sql)/
      - id: sqlfluff-fix
        name: Run SQLFluff fixer
        description: Auto-fix SQL files with SQLFluff
        types: [sql]
        args: [
          "--dialect", "postgres",
          "--exclude-rules", "L016,L031,LT02",  # Exclude some opinionated rules
        ]
        files: ^(supabase_mcp/sql|tests/sql)/

  # === Type Checking ===

  - repo: https://github.com/pre-commit/mirrors-mypy
    rev: "v1.15.0"
    hooks:
      - id: mypy
        name: Run mypy type checker
        args: [
          "--config-file=pyproject.toml",
          "--show-error-codes",
          "--pretty",
        ]
        additional_dependencies: [
            "types-requests",
            "types-aiofiles",
            "types-pytz",
            "pydantic",
            "chainlit",
            "anthropic",
            "fastapi",
            "httpx",
            "tiktoken",
            "weave",
            "chromadb",
            "cohere",
            "langchain"
        ]
        entry: bash -c 'mypy "$@" || true' --

  # === Code Quality & Style ===
  - repo: https://github.com/astral-sh/ruff-pre-commit
    rev: v0.9.9
    hooks:
      - id: ruff
        name: Run Ruff linter
        args: [
          --fix,
          --exit-zero,
          --quiet,
        ]
        types_or: [python, pyi, jupyter]
        files: ^(src|tests)/
        exclude: ^src/experimental/
        verbose: false
      - id: ruff-format
        name: Run Ruff formatter
        types_or: [python, pyi, jupyter]

  # === Documentation Checks ===
  - repo: https://github.com/tcort/markdown-link-check
    rev: v3.13.6
    hooks:
      - id: markdown-link-check
        name: Check Markdown links
        description: Extracts links from markdown texts and checks they're all alive
        stages: [pre-commit, pre-push, manual]

  # === Testing ===
  - repo: local
    hooks:
      - id: pytest
        name: Run tests
        entry: pytest
        language: system
        types: [python]
        pass_filenames: false
        args: [
          "--no-header",
          "--quiet",
          "--no-summary",
          "--show-capture=no",
          "--tb=line"  # Show only one line per failure
        ]
        stages: [pre-commit, pre-push]

  # === Build Check ===
  - repo: local
    hooks:
      - id: build-check
        name: Check build
        entry: uv build
        language: system
        pass_filenames: false
        stages: [pre-commit, pre-push]
      - id: version-check
        name: Check package version
        # Print version from the built package
        entry: python -c "from supabase_mcp import __version__; print('📦 Package version:', __version__)"
        language: system
        verbose: true
        pass_filenames: false
        stages: [pre-commit, pre-push]

```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# Query | MCP server for Supabase

> 🌅 More than 17k installs via PyPI and close to 30k downloads on Smithery.ai — in short, this was fun! 🥳
> Thanks to everyone who has been using this server for the past few months, and I hope it was useful for you.
> Since Supabase has released their own [official MCP server](https://github.com/supabase-community/supabase-mcp),
> I've decided to no longer actively maintain this one. The official MCP server is just as feature-rich, and many more
> features will be added to it in the future. Check it out!


<p class="center-text">
  <strong>Query MCP is an open-source MCP server that lets your IDE safely run SQL, manage schema changes, call the Supabase Management API, and use Auth Admin SDK — all with built-in safety controls.</strong>
</p>


<p class="center-text">
  <a href="https://pypi.org/project/supabase-mcp-server/"><img src="https://img.shields.io/pypi/v/supabase-mcp-server.svg" alt="PyPI version" /></a>
  <a href="https://github.com/alexander-zuev/supabase-mcp-server/actions"><img src="https://github.com/alexander-zuev/supabase-mcp-server/workflows/CI/badge.svg" alt="CI Status" /></a>
  <a href="https://codecov.io/gh/alexander-zuev/supabase-mcp-server"><img src="https://codecov.io/gh/alexander-zuev/supabase-mcp-server/branch/main/graph/badge.svg" alt="Code Coverage" /></a>
  <a href="https://www.python.org/downloads/"><img src="https://img.shields.io/badge/python-3.12%2B-blue.svg" alt="Python 3.12+" /></a>
  <a href="https://github.com/astral-sh/uv"><img src="https://img.shields.io/badge/uv-package%20manager-blueviolet" alt="uv package manager" /></a>
  <a href="https://pepy.tech/project/supabase-mcp-server"><img src="https://static.pepy.tech/badge/supabase-mcp-server" alt="PyPI Downloads" /></a>
  <a href="https://smithery.ai/server/@alexander-zuev/supabase-mcp-server"><img src="https://smithery.ai/badge/@alexander-zuev/supabase-mcp-server" alt="Smithery.ai Downloads" /></a>
  <a href="https://modelcontextprotocol.io/introduction"><img src="https://img.shields.io/badge/MCP-Server-orange" alt="MCP Server" /></a>
  <a href="LICENSE"><img src="https://img.shields.io/badge/license-Apache%202.0-blue.svg" alt="License" /></a>
</p>    

## Table of contents

<p class="center-text">
  <a href="#getting-started">Getting started</a> •
  <a href="#feature-overview">Feature overview</a> •
  <a href="#troubleshooting">Troubleshooting</a> •
  <a href="#changelog">Changelog</a>
</p>

## ✨ Key features
- 💻 Compatible with Cursor, Windsurf, Cline and other MCP clients supporting the `stdio` protocol
- 🔐 Control read-only and read-write modes of SQL query execution
- 🔍 Runtime SQL query validation with risk level assessment
- 🛡️ Three-tier safety system for SQL operations: safe, write, and destructive
- 🔄 Robust transaction handling for both direct and pooled database connections
- 📝 Automatic versioning of database schema changes
- 💻 Manage your Supabase projects with Supabase Management API
- 🧑‍💻 Manage users with Supabase Auth Admin methods via Python SDK
- 🔨 Pre-built tools to help Cursor & Windsurf work with MCP more effectively
- 📦 Dead-simple install & setup via package manager (uv, pipx, etc.)


## Getting Started

### Prerequisites
Installing the server requires the following on your system:
- Python 3.12+

If you plan to install via `uv`, ensure it's [installed](https://docs.astral.sh/uv/getting-started/installation/#__tabbed_1_1).

### PostgreSQL Installation
PostgreSQL installation is no longer required for the MCP server itself, as it now uses asyncpg, which doesn't depend on PostgreSQL development libraries.

However, you'll still need PostgreSQL if you're running a local Supabase instance:

**MacOS**
```bash
brew install postgresql@16
```

**Windows**
  - Download and install PostgreSQL 16+ from https://www.postgresql.org/download/windows/
  - Ensure "PostgreSQL Server" and "Command Line Tools" are selected during installation

### Step 1. Installation

Since v0.2.0, the server supports installation as a package. You can use your favorite Python package manager to install it via:

```bash
# if pipx is installed (recommended)
pipx install supabase-mcp-server

# if uv is installed
uv pip install supabase-mcp-server
```

`pipx` is recommended because it creates isolated environments for each package.

You can also install the server manually by cloning the repository and running `pipx install -e .` from the root directory.

#### Installing from source
If you would like to install from source, for example for local development:
```bash
uv venv
# On Mac
source .venv/bin/activate
# On Windows
.venv\Scripts\activate
# Install package in editable mode
uv pip install -e .
```

#### Installing via Smithery.ai

You can find the full instructions on how to use Smithery.ai to connect to this MCP server [here](https://smithery.ai/server/@alexander-zuev/supabase-mcp-server).


### Step 2. Configuration

The Supabase MCP server requires configuration to connect to your Supabase database, access the Management API, and use the Auth Admin SDK. This section explains all available configuration options and how to set them up.

> 🔑 **Important**: Since v0.4, the MCP server requires an API key, which you can get for free at [thequery.dev](https://thequery.dev).

#### Environment Variables

The server uses the following environment variables:

| Variable | Required | Default | Description |
|----------|----------|---------|-------------|
| `SUPABASE_PROJECT_REF` | Yes | `127.0.0.1:54322` | Your Supabase project reference ID (or local host:port) |
| `SUPABASE_DB_PASSWORD` | Yes | `postgres` | Your database password |
| `SUPABASE_REGION` | Yes* | `us-east-1` | AWS region where your Supabase project is hosted |
| `SUPABASE_ACCESS_TOKEN` | No | None | Personal access token for Supabase Management API |
| `SUPABASE_SERVICE_ROLE_KEY` | No | None | Service role key for Auth Admin SDK |
| `QUERY_API_KEY` | Yes | None | API key from thequery.dev (required for all operations) |

> **Note**: The default values are configured for local Supabase development. For remote Supabase projects, you must provide your own values for `SUPABASE_PROJECT_REF` and `SUPABASE_DB_PASSWORD`.

> 🚨 **CRITICAL CONFIGURATION NOTE**: For remote Supabase projects, you MUST specify the correct region where your project is hosted using `SUPABASE_REGION`. If you encounter a "Tenant or user not found" error, this is almost certainly because your region setting doesn't match your project's actual region. You can find your project's region in the Supabase dashboard under Project Settings.

#### Connection Types

##### Database Connection
- The server connects to your Supabase PostgreSQL database using the transaction pooler endpoint
- Local development uses a direct connection to `127.0.0.1:54322`
- Remote projects use the format: `postgresql://postgres.[project_ref]:[password]@aws-0-[region].pooler.supabase.com:6543/postgres`

> ⚠️ **Important**: Session pooling connections are not supported. The server exclusively uses transaction pooling for better compatibility with the MCP server architecture.
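
To make the format concrete, here is a minimal sketch of assembling and testing that DSN with `asyncpg` (the driver the server uses). The project ref, password, and region below are placeholders:

```python
import asyncio

import asyncpg

# Placeholders - substitute your own project values
project_ref = "your-project-ref"
password = "your-db-password"
region = "us-east-1"

# Transaction pooler DSN in the format described above
dsn = (
    f"postgresql://postgres.{project_ref}:{password}"
    f"@aws-0-{region}.pooler.supabase.com:6543/postgres"
)

async def check_connection() -> None:
    conn = await asyncpg.connect(dsn)
    try:
        print(await conn.fetchval("SELECT version()"))
    finally:
        await conn.close()

asyncio.run(check_connection())
```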

##### Management API Connection
- Requires `SUPABASE_ACCESS_TOKEN` to be set
- Connects to the Supabase Management API at `https://api.supabase.com`
- Only works with remote Supabase projects (not local development)

##### Auth Admin SDK Connection
- Requires `SUPABASE_SERVICE_ROLE_KEY` to be set
- For local development, connects to `http://127.0.0.1:54321`
- For remote projects, connects to `https://[project_ref].supabase.co`

#### Configuration Methods

The server looks for configuration in this order (highest to lowest priority):

1. **Environment Variables**: Values set directly in your environment
2. **Local `.env` File**: A `.env` file in your current working directory (only works when running from source)
3. **Global Config File**:
   - Windows: `%APPDATA%\supabase-mcp\.env`
   - macOS/Linux: `~/.config/supabase-mcp/.env`
4. **Default Settings**: Local development defaults (if no other config is found)

> ⚠️ **Important**: When using the package installed via pipx or uv, local `.env` files in your project directory are **not** detected. You must use either environment variables or the global config file.
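
As an illustration of that lookup order, here is a rough sketch assuming `python-dotenv` for `.env` parsing (the server's actual resolution logic lives in `supabase_mcp/settings.py` and may differ in detail):

```python
import os
from pathlib import Path

from dotenv import dotenv_values  # python-dotenv

def resolve(key: str) -> str | None:
    """Resolve a setting following the precedence described above."""
    # 1. Environment variables
    if key in os.environ:
        return os.environ[key]
    # 2. Local .env in the current working directory (source installs only)
    local = dotenv_values(".env")
    if local.get(key):
        return local[key]
    # 3. Global config file (macOS/Linux path shown; %APPDATA% on Windows)
    global_env = Path.home() / ".config" / "supabase-mcp" / ".env"
    global_values = dotenv_values(global_env) if global_env.exists() else {}
    if global_values.get(key):
        return global_values[key]
    # 4. Local development defaults
    defaults = {"SUPABASE_PROJECT_REF": "127.0.0.1:54322", "SUPABASE_DB_PASSWORD": "postgres"}
    return defaults.get(key)

print(resolve("SUPABASE_PROJECT_REF"))
```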

#### Setting Up Configuration

##### Option 1: Client-Specific Configuration (Recommended)

Set environment variables directly in your MCP client configuration (see client-specific setup instructions in Step 3). Most MCP clients support this approach, which keeps your configuration with your client settings.

##### Option 2: Global Configuration

Create a global `.env` configuration file that will be used for all MCP server instances:

```bash
# Create config directory
# On macOS/Linux
mkdir -p ~/.config/supabase-mcp
# On Windows (PowerShell)
mkdir -Force "$env:APPDATA\supabase-mcp"

# Create and edit .env file
# On macOS/Linux
nano ~/.config/supabase-mcp/.env
# On Windows (PowerShell)
notepad "$env:APPDATA\supabase-mcp\.env"
```

Add your configuration values to the file:

```
QUERY_API_KEY=your-api-key
SUPABASE_PROJECT_REF=your-project-ref
SUPABASE_DB_PASSWORD=your-db-password
SUPABASE_REGION=us-east-1
SUPABASE_ACCESS_TOKEN=your-access-token
SUPABASE_SERVICE_ROLE_KEY=your-service-role-key
```

##### Option 3: Project-Specific Configuration (Source Installation Only)

If you're running the server from source (not via package), you can create a `.env` file in your project directory with the same format as above.

#### Finding Your Supabase Project Information

- **Project Reference**: Found in your Supabase project URL: `https://supabase.com/dashboard/project/<project-ref>`
- **Database Password**: Set during project creation or found in Project Settings → Database
- **Access Token**: Generate at https://supabase.com/dashboard/account/tokens
- **Service Role Key**: Found in Project Settings → API → Project API keys

#### Supported Regions

The server supports all Supabase regions:

- `us-west-1` - West US (North California)
- `us-east-1` - East US (North Virginia) - default
- `us-east-2` - East US (Ohio)
- `ca-central-1` - Canada (Central)
- `eu-west-1` - West EU (Ireland)
- `eu-west-2` - West Europe (London)
- `eu-west-3` - West EU (Paris)
- `eu-central-1` - Central EU (Frankfurt)
- `eu-central-2` - Central Europe (Zurich)
- `eu-north-1` - North EU (Stockholm)
- `ap-south-1` - South Asia (Mumbai)
- `ap-southeast-1` - Southeast Asia (Singapore)
- `ap-northeast-1` - Northeast Asia (Tokyo)
- `ap-northeast-2` - Northeast Asia (Seoul)
- `ap-southeast-2` - Oceania (Sydney)
- `sa-east-1` - South America (São Paulo)

#### Limitations

- **No Self-Hosted Support**: The server only supports official Supabase.com hosted projects and local development
- **No Connection String Support**: Custom connection strings are not supported
- **No Session Pooling**: Only transaction pooling is supported for database connections
- **API and SDK Features**: Management API and Auth Admin SDK features only work with remote Supabase projects, not local development

### Step 3. Usage

In general, any MCP client that supports the `stdio` protocol should work with this MCP server. It was explicitly tested to work with:
- Cursor
- Windsurf
- Cline
- Claude Desktop

Additionally, you can also use smithery.ai to install this server for a number of clients, including the ones above.

Follow the guides below to install this MCP server in your client.

#### Cursor
Go to Settings -> Features -> MCP Servers and add a new server with this configuration:
```bash
# can be set to any name
name: supabase
type: command
# if you installed with pipx
command: supabase-mcp-server
# if you installed with uv
command: uv run supabase-mcp-server
# if the above doesn't work, use the full path (recommended)
command: /full/path/to/supabase-mcp-server  # Find with 'which supabase-mcp-server' (macOS/Linux) or 'where supabase-mcp-server' (Windows)
```

If the configuration is correct, you should see a green dot indicator and the number of tools exposed by the server.
![What a successful Cursor configuration looks like](https://github.com/user-attachments/assets/45df080a-8199-4aca-b59c-a84dc7fe2c09)

#### Windsurf
Go to Cascade -> Click on the hammer icon -> Configure -> Fill in the configuration:
```json
{
    "mcpServers": {
      "supabase": {
        "command": "/Users/username/.local/bin/supabase-mcp-server",  // update path
        "env": {
          "QUERY_API_KEY": "your-api-key",  // Required - get your API key at thequery.dev
          "SUPABASE_PROJECT_REF": "your-project-ref",
          "SUPABASE_DB_PASSWORD": "your-db-password",
          "SUPABASE_REGION": "us-east-1",  // optional, defaults to us-east-1
          "SUPABASE_ACCESS_TOKEN": "your-access-token",  // optional, for management API
          "SUPABASE_SERVICE_ROLE_KEY": "your-service-role-key"  // optional, for Auth Admin SDK
        }
      }
    }
}
```
If the configuration is correct, you should see a green dot indicator and a clickable supabase server in the list of available servers.

![What a successful Windsurf configuration looks like](https://github.com/user-attachments/assets/322b7423-8c71-410b-bcab-aff1b143faa4)

#### Claude Desktop
Claude Desktop also supports MCP servers through a JSON configuration. Follow these steps to set up the Supabase MCP server:

1. **Find the full path to the executable** (this step is critical):
   ```bash
   # On macOS/Linux
   which supabase-mcp-server

   # On Windows
   where supabase-mcp-server
   ```
   Copy the full path that is returned (e.g., `/Users/username/.local/bin/supabase-mcp-server`).

2. **Configure the MCP server** in Claude Desktop:
   - Open Claude Desktop
   - Go to Settings → Developer -> Edit Config MCP Servers
   - Add a new configuration with the following JSON:

   ```json
   {
     "mcpServers": {
       "supabase": {
         "command": "/full/path/to/supabase-mcp-server",  // Replace with the actual path from step 1
         "env": {
           "QUERY_API_KEY": "your-api-key",  // Required - get your API key at thequery.dev
           "SUPABASE_PROJECT_REF": "your-project-ref",
           "SUPABASE_DB_PASSWORD": "your-db-password",
           "SUPABASE_REGION": "us-east-1",  // optional, defaults to us-east-1
           "SUPABASE_ACCESS_TOKEN": "your-access-token",  // optional, for management API
           "SUPABASE_SERVICE_ROLE_KEY": "your-service-role-key"  // optional, for Auth Admin SDK
         }
       }
     }
   }
   ```

> ⚠️ **Important**: Unlike Windsurf and Cursor, Claude Desktop requires the **full absolute path** to the executable. Using just the command name (`supabase-mcp-server`) will result in a "spawn ENOENT" error.

If the configuration is correct, you should see the Supabase MCP server listed as available in Claude Desktop.

![What a successful Claude Desktop configuration looks like](https://github.com/user-attachments/assets/500bcd40-6245-40a7-b23b-189827ed2923)

#### Cline
Cline also supports MCP servers through a similar JSON configuration. Follow these steps to set up the Supabase MCP server:

1. **Find the full path to the executable** (this step is critical):
   ```bash
   # On macOS/Linux
   which supabase-mcp-server

   # On Windows
   where supabase-mcp-server
   ```
   Copy the full path that is returned (e.g., `/Users/username/.local/bin/supabase-mcp-server`).

2. **Configure the MCP server** in Cline:
   - Open Cline in VS Code
   - Click on the "MCP Servers" tab in the Cline sidebar
   - Click "Configure MCP Servers"
   - This will open the `cline_mcp_settings.json` file
   - Add the following configuration:

   ```json
   {
     "mcpServers": {
       "supabase": {
         "command": "/full/path/to/supabase-mcp-server",  // Replace with the actual path from step 1
         "env": {
           "QUERY_API_KEY": "your-api-key",  // Required - get your API key at thequery.dev
           "SUPABASE_PROJECT_REF": "your-project-ref",
           "SUPABASE_DB_PASSWORD": "your-db-password",
           "SUPABASE_REGION": "us-east-1",  // optional, defaults to us-east-1
           "SUPABASE_ACCESS_TOKEN": "your-access-token",  // optional, for management API
           "SUPABASE_SERVICE_ROLE_KEY": "your-service-role-key"  // optional, for Auth Admin SDK
         }
       }
     }
   }
   ```

If the configuration is correct, you should see a green indicator next to the Supabase MCP server in the Cline MCP Servers list, and a message confirming "supabase MCP server connected" at the bottom of the panel.

![What a successful Cline configuration looks like](https://github.com/user-attachments/assets/6c4446ad-7a58-44c6-bf12-6c82222bbe59)

### Troubleshooting

Here are some tips & tricks that might help you:
- **Debug installation** - run `supabase-mcp-server` directly from the terminal to see if it works. If it doesn't, there might be an issue with the installation.
- **MCP Server configuration** - if the above step works, it means the server is installed and configured correctly. As long as you provided the right command, your IDE should be able to connect. Make sure to provide the right path to the server executable.
- **"No tools found" error** - If you see "Client closed - no tools available" in Cursor despite the package being installed:
  - Find the full path to the executable by running `which supabase-mcp-server` (macOS/Linux) or `where supabase-mcp-server` (Windows)
  - Use the full path in your MCP server configuration instead of just `supabase-mcp-server`
  - For example: `/Users/username/.local/bin/supabase-mcp-server` or `C:\Users\username\.local\bin\supabase-mcp-server.exe`
- **Environment variables** - to connect to the right database, make sure you either set env variables in `mcp_config.json` or in `.env` file placed in a global config directory (`~/.config/supabase-mcp/.env` on macOS/Linux or `%APPDATA%\supabase-mcp\.env` on Windows).
- **Accessing logs** - The MCP server writes detailed logs to a file:
  - Log file location:
    - macOS/Linux: `~/.local/share/supabase-mcp/mcp_server.log`
    - Windows: `%USERPROFILE%\.local\share\supabase-mcp\mcp_server.log`
  - Logs include connection status, configuration details, and operation results
  - View logs using any text editor or terminal commands:
    ```bash
    # On macOS/Linux
    cat ~/.local/share/supabase-mcp/mcp_server.log

    # On Windows (PowerShell)
    Get-Content "$env:USERPROFILE\.local\share\supabase-mcp\mcp_server.log"
    ```

If you are stuck or any of the instructions above are incorrect, please raise an issue.

### MCP Inspector
A super useful tool to help debug MCP server issues is the MCP Inspector. If you installed from source, you can run `supabase-mcp-inspector` from the project repo to launch an inspector instance. Coupled with the logs, this gives you a complete overview of what's happening in the server.
> 📝 Running `supabase-mcp-inspector` when installed from the package doesn't work properly yet - I will validate and fix this in a coming release.

## Feature Overview

### Database query tools

Since v0.3, the server provides comprehensive database management capabilities with built-in safety controls:

- **SQL Query Execution**: Execute PostgreSQL queries with risk assessment
  - **Three-tier safety system**:
    - `safe`: Read-only operations (SELECT) - always allowed
    - `write`: Data modifications (INSERT, UPDATE, DELETE) - require unsafe mode
    - `destructive`: Schema changes (DROP, CREATE) - require unsafe mode + confirmation

- **SQL Parsing and Validation**:
  - Uses PostgreSQL's parser (pglast) for accurate analysis and provides clear feedback on safety requirements (see the sketch at the end of this section)

- **Automatic Migration Versioning**:
  - Database-altering operations are automatically versioned
  - Generates descriptive names based on operation type and target


- **Safety Controls**:
  - Default SAFE mode allows only read-only operations
  - All statements run in transaction mode via `asyncpg`
  - 2-step confirmation for high-risk operations

- **Available Tools**:
  - `get_schemas`: Lists schemas with sizes and table counts
  - `get_tables`: Lists tables, foreign tables, and views with metadata
  - `get_table_schema`: Gets detailed table structure (columns, keys, relationships)
  - `execute_postgresql`: Executes SQL statements against your database
  - `confirm_destructive_operation`: Executes high-risk operations after confirmation
  - `retrieve_migrations`: Gets migrations with filtering and pagination options
  - `live_dangerously`: Toggles between safe and unsafe modes
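
The sketch referenced above: a toy version of pglast-based risk classification. The server's real rules live in `supabase_mcp/services/database/sql/validator.py` and are more nuanced; this only illustrates the idea:

```python
from pglast import parse_sql

# Hypothetical mapping for illustration only
RISK_BY_STATEMENT = {
    "SelectStmt": "safe",
    "InsertStmt": "write",
    "UpdateStmt": "write",
    "DeleteStmt": "write",
    "CreateStmt": "destructive",
    "DropStmt": "destructive",
}

def classify(query: str) -> str:
    """Return the highest risk category among the parsed statements."""
    order = ["safe", "write", "destructive"]
    risks = [
        # Unknown statement types are treated as destructive, to err on the safe side
        RISK_BY_STATEMENT.get(type(raw.stmt).__name__, "destructive")
        for raw in parse_sql(query)
    ]
    return max(risks, key=order.index)

print(classify("SELECT * FROM users; DROP TABLE users;"))  # -> destructive
```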

### Management API tools

Since v0.3.0, the server provides secure access to the Supabase Management API with built-in safety controls:

- **Available Tools**:
  - `send_management_api_request`: Sends arbitrary requests to Supabase Management API with auto-injection of project ref
  - `get_management_api_spec`: Gets the enriched API specification with safety information
    - Supports multiple query modes: by domain, by specific path/method, or all paths
    - Includes risk assessment information for each endpoint
    - Provides detailed parameter requirements and response formats
    - Helps LLMs understand the full capabilities of the Supabase Management API
  - `get_management_api_safety_rules`: Gets all safety rules with human-readable explanations
  - `live_dangerously`: Toggles between safe and unsafe operation modes

- **Safety Controls**:
  - Uses the same safety manager as database operations for consistent risk management
  - Operations categorized by risk level:
    - `safe`: Read-only operations (GET) - always allowed
    - `unsafe`: State-changing operations (POST, PUT, PATCH, DELETE) - require unsafe mode
    - `blocked`: Destructive operations (delete project, etc.) - never allowed
  - Default safe mode prevents accidental state changes
  - Path-based pattern matching for precise safety rules

**Note**: Management API tools only work with remote Supabase instances and are not compatible with local Supabase development setups.
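
Under the hood, `send_management_api_request` amounts to an authenticated HTTP call against `https://api.supabase.com`. A minimal sketch with `httpx`, using the read-only (and therefore "safe") project list endpoint:

```python
import os

import httpx

token = os.environ["SUPABASE_ACCESS_TOKEN"]  # personal access token

# GET /v1/projects is a read-only Management API endpoint
response = httpx.get(
    "https://api.supabase.com/v1/projects",
    headers={"Authorization": f"Bearer {token}"},
)
response.raise_for_status()
for project in response.json():
    print(project["id"], project["name"])  # field names per the project list response
```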

### Auth Admin tools

I was planning to add support for Python SDK methods to the MCP server. Upon consideration, I decided to only add support for the Auth Admin methods, as I often found myself manually creating test users, which was error-prone and time-consuming. Now I can just ask Cursor to create a test user and it will be done seamlessly. Check out the full Auth Admin SDK method docs to see what it can do; a sketch of the equivalent direct SDK call appears at the end of this section.

Since v0.3.6, the server supports direct access to Supabase Auth Admin methods via the Python SDK:
  - Includes the following tools:
    - `get_auth_admin_methods_spec` to retrieve documentation for all available Auth Admin methods
    - `call_auth_admin_method` to directly invoke Auth Admin methods with proper parameter handling
  - Supported methods:
    - `get_user_by_id`: Retrieve a user by their ID
    - `list_users`: List all users with pagination
    - `create_user`: Create a new user
    - `delete_user`: Delete a user by their ID
    - `invite_user_by_email`: Send an invite link to a user's email
    - `generate_link`: Generate an email link for various authentication purposes
    - `update_user_by_id`: Update user attributes by ID
    - `delete_factor`: Delete a factor on a user (currently not implemented in SDK)

#### Why use Auth Admin SDK instead of raw SQL queries?

The Auth Admin SDK provides several key advantages over direct SQL manipulation:
- **Functionality**: Enables operations not possible with SQL alone (invites, magic links, MFA)
- **Accuracy**: More reliable than creating and executing raw SQL queries on auth schemas
- **Simplicity**: Offers clear methods with proper validation and error handling

- Response format:
  - All methods return structured Python objects instead of raw dictionaries
  - Object attributes can be accessed using dot notation (e.g., `user.id` instead of `user["id"]`)
- Edge cases and limitations:
  - UUID validation: Many methods require valid UUID format for user IDs and will return specific validation errors
  - Email configuration: Methods like `invite_user_by_email` and `generate_link` require email sending to be configured in your Supabase project
  - Link types: When generating links, different link types have different requirements:
    - `signup` links don't require the user to exist
    - `magiclink` and `recovery` links require the user to already exist in the system
  - Error handling: The server provides detailed error messages from the Supabase API, which may differ from the dashboard interface
  - Method availability: Some methods like `delete_factor` are exposed in the API but not fully implemented in the SDK
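
For reference, here is roughly what `call_auth_admin_method` does for `create_user`, sketched with the supabase-py SDK directly. The URL and key are placeholders, and the exact response shape may vary by SDK version:

```python
from supabase import create_client

# Placeholders - use your project URL and service role key
supabase = create_client(
    "https://your-project-ref.supabase.co",
    "your-service-role-key",
)

response = supabase.auth.admin.create_user(
    {"email": "test@example.com", "password": "str0ng-test-pass", "email_confirm": True}
)
print(response.user.id)  # structured object, accessed via dot notation as described above
```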

### Logs & Analytics

The server provides access to Supabase logs and analytics data, making it easier to monitor and troubleshoot your applications:

- **Available Tool**: `retrieve_logs` - Access logs from any Supabase service

- **Log Collections**:
  - `postgres`: Database server logs
  - `api_gateway`: API gateway requests
  - `auth`: Authentication events
  - `postgrest`: RESTful API service logs
  - `pooler`: Connection pooling logs
  - `storage`: Object storage operations
  - `realtime`: WebSocket subscription logs
  - `edge_functions`: Serverless function executions
  - `cron`: Scheduled job logs
  - `pgbouncer`: Connection pooler logs

- **Features**: Filter by time, search text, apply field filters, or use custom SQL queries

This simplifies debugging across your Supabase stack without switching between interfaces or writing complex queries.
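
The custom-query path is template-driven: the SQL files under `supabase_mcp/services/database/sql/queries/logs/` contain `{where_clause}` and `{limit}` placeholders. A simplified illustration of filling one (the server does this internally via its log manager; the filter value below is an assumption):

```python
from pathlib import Path

template = Path(
    "supabase_mcp/services/database/sql/queries/logs/postgres_logs.sql"
).read_text()

sql = template.format(
    where_clause="WHERE postgres_logs.timestamp > '2025-01-01'",  # assumed filter
    limit=20,
)
print(sql)
```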

### Automatic Versioning of Database Changes

"With great power comes great responsibility." While `execute_postgresql` tool coupled with aptly named `live_dangerously` tool provide a powerful and simple way to manage your Supabase database, it also means that dropping a table or modifying one is one chat message away. In order to reduce the risk of irreversible changes, since v0.3.8 the server supports:
- automatic creation of migration scripts for all write & destructive sql operations executed on the database
- improved safety mode of query execution, in which all queries are categorized in:
  - `safe` type: always allowed. Includes all read-only ops.
  - `write`type: requires `write` mode to be enabled by the user.
  - `destructive` type: requires `write` mode to be enabled by the user AND a 2-step confirmation of query execution for clients that do not execute tools automatically.
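
As a rough illustration of the first point, the repository ships a `create_migration.sql` template with `{version}`, `{name}`, and `{statements}` placeholders; filling it might look like the sketch below (the real logic lives in `supabase_mcp/services/database/migration_manager.py`):

```python
from datetime import datetime, timezone
from pathlib import Path

template = Path(
    "supabase_mcp/services/database/sql/queries/create_migration.sql"
).read_text()

# Version is a timestamp; the name is derived from the operation (assumed here)
sql = template.format(
    version=datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S"),
    name="create_users_table",
    statements="CREATE TABLE users (id uuid PRIMARY KEY)",
)
print(sql)
```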

### Universal Safety Mode
Since v0.3.8, Safety Mode has been standardized across all services (database, API, SDK) using a universal safety manager. This provides consistent risk management and a unified interface for controlling safety settings across the entire MCP server.

All operations (SQL queries, API requests, SDK methods) are categorized into risk levels:
- `Low` risk: Read-only operations that don't modify data or structure (SELECT queries, GET API requests)
- `Medium` risk: Write operations that modify data but not structure (INSERT/UPDATE/DELETE, most POST/PUT API requests)
- `High` risk: Destructive operations that modify database structure or could cause data loss (DROP/TRUNCATE, DELETE API endpoints)
- `Extreme` risk: Operations with severe consequences that are blocked entirely (deleting projects)

Safety controls are applied based on risk level (a minimal sketch follows the list):
- Low risk operations are always allowed
- Medium risk operations require unsafe mode to be enabled
- High risk operations require unsafe mode AND explicit confirmation
- Extreme risk operations are never allowed
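
A minimal sketch of this decision table, using the `OperationRiskLevel` and `SafetyMode` enums from `supabase_mcp/services/safety/models.py` (the real checks live in the safety manager and are more involved):

```python
from supabase_mcp.services.safety.models import OperationRiskLevel, SafetyMode

def is_allowed(risk: OperationRiskLevel, mode: SafetyMode, confirmed: bool = False) -> bool:
    """Illustrative only: apply the risk rules described above."""
    if risk == OperationRiskLevel.LOW:
        return True          # always allowed
    if risk == OperationRiskLevel.MEDIUM:
        return mode == SafetyMode.UNSAFE
    if risk == OperationRiskLevel.HIGH:
        return mode == SafetyMode.UNSAFE and confirmed
    return False             # EXTREME operations are never allowed

assert is_allowed(OperationRiskLevel.HIGH, SafetyMode.UNSAFE) is False  # needs confirmation
```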

#### How confirmation flow works

Any high-risk operation (be it a PostgreSQL query or an API request) will initially be blocked, even in `unsafe` mode.
![Every high-risk operation is blocked](https://github.com/user-attachments/assets/c0df79c2-a879-4b1f-a39d-250f9965c36a)
You will have to confirm and approve every high-risk operation explicitly in order for it to be executed.
![Explicit approval is always required](https://github.com/user-attachments/assets/5cd7a308-ec2a-414e-abe2-ff2f3836dd8b)


## Changelog

- 📦 Simplified installation via package manager - ✅ (v0.2.0)
- 🌎 Support for different Supabase regions - ✅ (v0.2.2)
- 🎮 Programmatic access to Supabase management API with safety controls - ✅ (v0.3.0)
- 👷‍♂️ Read and read-write database SQL queries with safety controls - ✅ (v0.3.0)
- 🔄 Robust transaction handling for both direct and pooled connections - ✅ (v0.3.2)
- 🐍 Support methods and objects available in native Python SDK - ✅ (v0.3.6)
- 🔍 Stronger SQL query validation ✅ (v0.3.8)
- 📝 Automatic versioning of database changes ✅ (v0.3.8)
- 📖 Radically improved knowledge and tools of api spec ✅ (v0.3.8)
- ✍️ Improved consistency of migration-related tools for a more organized database vcs ✅ (v0.3.10)
- 🥳 Query MCP is released (v0.4.0)


For a more detailed roadmap, please see this [discussion](https://github.com/alexander-zuev/supabase-mcp-server/discussions/46) on GitHub.


## Star History

[![Star History Chart](https://api.star-history.com/svg?repos=alexander-zuev/supabase-mcp-server&type=Date)](https://star-history.com/#alexander-zuev/supabase-mcp-server&Date)

---

Enjoy! ☺️

```

--------------------------------------------------------------------------------
/CONTRIBUTING.MD:
--------------------------------------------------------------------------------

```markdown
# Contributing to Supabase MCP Server

Thank you for your interest in Supabase MCP Server. This project aims to maintain the high quality standard I've set for it. I welcome and carefully review all contributions, so please read the following guidelines carefully.

## 🤓 Important: Pre-Contribution Requirements

1. **Required: Open a Discussion First**
   - **All contributions** must start with a GitHub Discussion before any code is written
   - Explain your proposed changes, why they're needed, and how they align with the project's vision
   - Wait for explicit approval from the maintainer before proceeding
   - PRs without a prior approved discussion will be closed immediately without review

2. **Project Vision**
   - This project follows a specific development vision maintained by the owner
   - Not all feature ideas will be accepted, even if well-implemented
   - The maintainer reserves the right to decline contributions that don't align with the project's direction

## 🛠️ Contribution Process (Only After Discussion Approval)

1. **Fork the repository:** Click the "Fork" button in the top right corner of the GitHub page.

2. **Create a new branch:** Create a branch with a descriptive name related to your contribution.
   ```bash
   git checkout -b feature/your-approved-feature
   ```

3. **Quality Requirements:**
   - **Test Coverage:** All code changes must include appropriate tests
   - **Documentation:** Update all relevant documentation
   - **Code Style:** Follow the existing code style and patterns
   - **Commit Messages:** Use clear, descriptive commit messages

4. **Make your changes:** Implement the changes that were approved in the discussion.

5. **Test thoroughly:** Ensure all tests pass and add new tests for your changes.
   ```bash
   # Run tests
   pytest
   ```

6. **Commit your changes:** Use clear, descriptive commit messages that explain what you've done.
   ```bash
   git commit -m "feat: implement approved feature X"
   ```

7. **Push your branch:** Push your changes to your forked repository.
   ```bash
   git push origin feature/your-approved-feature
   ```

8. **Create a pull request:**
   - Go to the original repository on GitHub
   - Click "New Pull Request"
   - Select "compare across forks"
   - Select your fork and branch as the source
   - Add a detailed description that references the approved discussion
   - Include information about how you've tested the changes
   - Submit the pull request

9. **Review Process:**
   - PRs will be reviewed when time permits
   - Be prepared to make requested changes
   - The maintainer may request significant revisions
   - PRs may be rejected even after review if they don't meet quality standards

## ⚠️ Grounds for Immediate Rejection

Your PR will be closed without review if:
- No prior discussion was opened and approved
- Tests are missing or failing
- Documentation is not updated
- Code quality doesn't meet project standards
- PR description is inadequate
- Changes don't align with the approved discussion

## 🤔 Why These Requirements?

- This project is maintained by a single developer (me) with limited review time
- Quality and consistency are prioritized over quantity of contributions
- The project follows a specific vision that I want to maintain

## 🌟 Acceptable Contributions

The following types of contributions are most welcome:
- Bug fixes with clear reproduction steps
- Performance improvements with benchmarks
- Documentation improvements
- New features that have been pre-approved via discussion

## 💡 Alternative Ways to Contribute

If you have ideas but don't want to go through this process:
- Fork the project and build your own version
- Share your use case in Discussions
- Report bugs with detailed reproduction steps

Thank you for understanding and respecting these guidelines. They help maintain the quality and direction of the project.

```

--------------------------------------------------------------------------------
/supabase_mcp/services/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/supabase_mcp/services/api/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/supabase_mcp/services/database/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/supabase_mcp/services/logs/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/supabase_mcp/services/safety/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/supabase_mcp/services/sdk/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/tests/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/tests/services/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/tests/services/api/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/tests/services/database/sql/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/tests/services/database/sql/test_sql_validator_integration.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/.github/FUNDING.yml:
--------------------------------------------------------------------------------

```yaml
github: alexander-zuev

```

--------------------------------------------------------------------------------
/.claude/settings.local.json:
--------------------------------------------------------------------------------

```json
{
  "permissions": {
    "allow": [
      "Bash(mv:*)"
    ],
    "deny": []
  }
}
```

--------------------------------------------------------------------------------
/supabase_mcp/services/database/sql/queries/create_migration.sql:
--------------------------------------------------------------------------------

```sql
-- Create a migration
INSERT INTO supabase_migrations.schema_migrations
(version, name, statements)
VALUES ('{version}', '{name}', ARRAY['{statements}']);

```

--------------------------------------------------------------------------------
/supabase_mcp/__init__.py:
--------------------------------------------------------------------------------

```python
"""Supabase MCP Server package."""

from supabase_mcp._version import __version__, version, version_tuple

__all__ = ["__version__", "version", "version_tuple"]

```

--------------------------------------------------------------------------------
/supabase_mcp/core/__init__.py:
--------------------------------------------------------------------------------

```python
"""Supabase MCP Server package."""

from supabase_mcp._version import __version__, version, version_tuple

__all__ = ["__version__", "version", "version_tuple"]

```

--------------------------------------------------------------------------------
/supabase_mcp/services/database/sql/queries/logs/postgrest_logs.sql:
--------------------------------------------------------------------------------

```sql
SELECT
  id,
  postgrest_logs.timestamp,
  event_message,
  identifier,
  metadata.host
FROM postgrest_logs
CROSS JOIN unnest(metadata) AS metadata
{where_clause}
ORDER BY timestamp DESC
LIMIT {limit};

```

--------------------------------------------------------------------------------
/supabase_mcp/services/database/sql/queries/logs/pgbouncer_logs.sql:
--------------------------------------------------------------------------------

```sql
SELECT
  id,
  pgbouncer_logs.timestamp,
  event_message,
  metadata.host,
  metadata.project
FROM pgbouncer_logs
CROSS JOIN unnest(metadata) AS metadata
{where_clause}
ORDER BY timestamp DESC
LIMIT {limit};

```

--------------------------------------------------------------------------------
/supabase_mcp/tools/__init__.py:
--------------------------------------------------------------------------------

```python
"""Tool manager module for Supabase MCP server.

This module provides a centralized way to manage tool descriptions and registration.
"""

from .manager import ToolManager, ToolName

__all__ = ["ToolManager", "ToolName"]

```

--------------------------------------------------------------------------------
/supabase_mcp/services/database/sql/queries/logs/supavisor_logs.sql:
--------------------------------------------------------------------------------

```sql
SELECT
  id,
  supavisor_logs.timestamp,
  event_message,
  metadata.level,
  metadata.project,
  metadata.region
FROM supavisor_logs
CROSS JOIN unnest(metadata) AS metadata
{where_clause}
ORDER BY timestamp DESC
LIMIT {limit};

```

--------------------------------------------------------------------------------
/supabase_mcp/services/database/sql/queries/logs/auth_logs.sql:
--------------------------------------------------------------------------------

```sql
SELECT
  id,
  auth_logs.timestamp,
  event_message,
  metadata.level,
  metadata.status,
  metadata.path,
  metadata.msg
FROM auth_logs
CROSS JOIN unnest(metadata) AS metadata
{where_clause}
ORDER BY timestamp DESC
LIMIT {limit};

```

--------------------------------------------------------------------------------
/supabase_mcp/services/database/sql/queries/logs/storage_logs.sql:
--------------------------------------------------------------------------------

```sql
SELECT
  id,
  storage_logs.timestamp,
  event_message,
  metadata.level,
  metadata.project,
  metadata.responseTime,
  metadata.rawError
FROM storage_logs
CROSS JOIN unnest(metadata) AS metadata
{where_clause}
ORDER BY timestamp DESC
LIMIT {limit};

```

--------------------------------------------------------------------------------
/supabase_mcp/services/database/sql/queries/logs/postgres_logs.sql:
--------------------------------------------------------------------------------

```sql
SELECT
  id,
  postgres_logs.timestamp,
  event_message,
  identifier,
  parsed.error_severity,
  parsed.query,
  parsed.application_name
FROM postgres_logs
CROSS JOIN unnest(metadata) AS m
CROSS JOIN unnest(m.parsed) AS parsed
{where_clause}
ORDER BY timestamp DESC
LIMIT {limit};

```

--------------------------------------------------------------------------------
/supabase_mcp/services/database/sql/queries/logs/edge_logs.sql:
--------------------------------------------------------------------------------

```sql
SELECT
  id,
  edge_logs.timestamp,
  event_message,
  identifier,
  request.method,
  request.path,
  response.status_code,
  request.url,
  response.origin_time
FROM edge_logs
CROSS JOIN unnest(metadata) AS m
CROSS JOIN unnest(m.request) AS request
CROSS JOIN unnest(m.response) AS response
{where_clause}
ORDER BY timestamp DESC
LIMIT {limit};

```

--------------------------------------------------------------------------------
/supabase_mcp/services/database/sql/queries/init_migrations.sql:
--------------------------------------------------------------------------------

```sql
-- Initialize migrations infrastructure
-- Create the migrations schema if it doesn't exist
CREATE SCHEMA IF NOT EXISTS supabase_migrations;

-- Create the migrations table if it doesn't exist
CREATE TABLE IF NOT EXISTS supabase_migrations.schema_migrations (
    version TEXT PRIMARY KEY,
    statements TEXT[] NOT NULL,
    name TEXT NOT NULL
);

```

--------------------------------------------------------------------------------
/supabase_mcp/services/database/sql/queries/logs/cron_logs.sql:
--------------------------------------------------------------------------------

```sql
SELECT
  id,
  postgres_logs.timestamp,
  event_message,
  identifier,
  parsed.error_severity,
  parsed.query,
  parsed.application_name
FROM postgres_logs
CROSS JOIN unnest(metadata) AS m
CROSS JOIN unnest(m.parsed) AS parsed
WHERE (parsed.application_name = 'pg_cron' OR event_message LIKE '%cron job%')
{and_where_clause}
ORDER BY timestamp DESC
LIMIT {limit};

```

--------------------------------------------------------------------------------
/supabase_mcp/services/database/sql/queries/logs/realtime_logs.sql:
--------------------------------------------------------------------------------

```sql
SELECT
  id,
  realtime_logs.timestamp,
  event_message,
  metadata.level,
  measurements.connected,
  measurements.connected_cluster,
  measurements.limit,
  measurements.sum,
  metadata.external_id
FROM realtime_logs
CROSS JOIN unnest(metadata) AS metadata
CROSS JOIN unnest(metadata.measurements) AS measurements
{where_clause}
ORDER BY timestamp DESC
LIMIT {limit};

```

--------------------------------------------------------------------------------
/Dockerfile:
--------------------------------------------------------------------------------

```dockerfile
FROM python:3.12-slim-bookworm

WORKDIR /app

# Prepare the basic dependencies
RUN apt-get update && apt-get install -y \
    && rm -rf /var/lib/apt/lists/*

# Install pipx
RUN pip install --no-cache-dir pipx && \
    pipx ensurepath && \
    pipx install supabase-mcp-server

# Add pipx bin directory to PATH
ENV PATH="/root/.local/bin:$PATH"

CMD ["supabase-mcp-server"]

```

--------------------------------------------------------------------------------
/supabase_mcp/services/database/sql/queries/logs/function_edge_logs.sql:
--------------------------------------------------------------------------------

```sql
SELECT
  id,
  function_edge_logs.timestamp,
  event_message,
  m.deployment_id,
  m.execution_time_ms,
  m.function_id,
  m.project_ref,
  request.method,
  request.pathname,
  request.url,
  request.host,
  response.status_code,
  m.version
FROM function_edge_logs
CROSS JOIN unnest(metadata) AS m
CROSS JOIN unnest(m.request) AS request
CROSS JOIN unnest(m.response) AS response
{where_clause}
ORDER BY timestamp DESC
LIMIT {limit};

```

--------------------------------------------------------------------------------
/supabase_mcp/services/database/sql/queries/get_migrations.sql:
--------------------------------------------------------------------------------

```sql
SELECT
    version,
    name,
    CASE
        WHEN '{include_full_queries}' = 'true' THEN statements
        ELSE NULL
    END AS statements,
    array_length(statements, 1) AS statement_count,
    CASE
        WHEN version ~ '^[0-9]+$' THEN 'numbered'
        ELSE 'named'
    END AS version_type
FROM supabase_migrations.schema_migrations
WHERE
    -- Filter by name if provided
    ('{name_pattern}' = '' OR name ILIKE '%' || '{name_pattern}' || '%')
ORDER BY
    -- Order by version (timestamp) descending
    version DESC
LIMIT {limit} OFFSET {offset};

```
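
The `{placeholder}` tokens above are filled in before execution. Below is a minimal sketch of that substitution, assuming the `SQLLoader.load_sql` helper covered by the tests at the end of this document and plain `str.format` as the mechanism (the exact wiring inside the query manager is not shown here):

```python
from supabase_mcp.services.database.sql.loader import SQLLoader

# Load the raw template, then substitute the tokens it declares.
# str.format is an assumption for illustration; it matches the
# {limit}/{offset} token style used in the query above.
template = SQLLoader.load_sql("get_migrations")
query = template.format(
    include_full_queries="false",
    name_pattern="",
    limit=20,
    offset=0,
)
```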

--------------------------------------------------------------------------------
/.github/ISSUE_TEMPLATE/roadmap_item.md:
--------------------------------------------------------------------------------

```markdown
---
name: Roadmap Item (only for maintainers)
about: Reserved for maintainers, used to track roadmap items
title: "[target version] Brief description of the feature"
labels: roadmap
assignees: alexander-zuev

---

## Context
What is the context of this item?

## Motivation
Why should this feature be added? What problems does it solve? For whom?

## Expected Impact
- **Users**: How will this benefit users?
- **Development**: How does this improve the codebase or development process?

## Implementation Ideas
Any initial thoughts on how this could be implemented.

```

--------------------------------------------------------------------------------
/supabase_mcp/services/safety/models.py:
--------------------------------------------------------------------------------

```python
from enum import Enum, IntEnum


class OperationRiskLevel(IntEnum):
    """Universal operation risk level mapping.

    Higher number reflects higher risk levels with 4 being the highest."""

    LOW = 1
    MEDIUM = 2
    HIGH = 3
    EXTREME = 4


class SafetyMode(str, Enum):
    """Universal safety mode of a client (database, api, etc).
    Clients should always default to safe mode."""

    SAFE = "safe"
    UNSAFE = "unsafe"


class ClientType(str, Enum):
    """Types of clients that can be managed by the safety system."""

    DATABASE = "database"
    API = "api"

```
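
Since `OperationRiskLevel` is an `IntEnum`, members compare like integers, which is what makes threshold-style safety rules cheap to express. A minimal sketch, consistent with the expectations in `test_sql_safety_config.py` further below (the function name is illustrative, not the server's API):

```python
from supabase_mcp.services.safety.models import OperationRiskLevel, SafetyMode

assert OperationRiskLevel.EXTREME > OperationRiskLevel.HIGH  # IntEnum ordering

def is_allowed(risk: OperationRiskLevel, mode: SafetyMode) -> bool:
    """Illustrative threshold rule: LOW always passes, EXTREME never,
    everything in between requires unsafe mode."""
    if risk == OperationRiskLevel.LOW:
        return True
    if risk == OperationRiskLevel.EXTREME:
        return False
    return mode == SafetyMode.UNSAFE
```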

--------------------------------------------------------------------------------
/.github/ISSUE_TEMPLATE/feature_request.md:
--------------------------------------------------------------------------------

```markdown
---
name: Feature Request
about: Suggest an idea to improve the Supabase MCP server
title: "I want X so that I can do Y and gain Z"
labels: ''
assignees: ''

---

## Is your feature request related to a problem?
A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]

## Describe the solution you'd like
A clear and concise description of what you want to happen.

## Describe alternatives you've considered
A clear and concise description of any alternative solutions or features you've considered.

## Additional context
Add any other context or screenshots about the feature request here.

```

--------------------------------------------------------------------------------
/.github/PULL_REQUEST_TEMPLATE.md:
--------------------------------------------------------------------------------

```markdown
# Description

<!-- Provide a clear and concise description of what this PR accomplishes -->

## Type of Change

- [ ] Bug fix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected)
- [ ] Documentation update
- [ ] Performance improvement
- [ ] Code refactoring (no functional changes)
- [ ] Test updates
- [ ] CI/CD or build process changes
- [ ] Other (please describe):

## Checklist
- [ ] I have performed a self-review of my own code
- [ ] I have made corresponding changes to the documentation
- [ ] New and existing unit tests pass locally with my changes

```

--------------------------------------------------------------------------------
/codecov.yml:
--------------------------------------------------------------------------------

```yaml
codecov:
  require_ci_to_pass: true
  notify:
    wait_for_ci: true

coverage:
  precision: 1
  round: down
  range: "60...90"

  status:
    # For overall project coverage
    project:
      default:
        target: 80%       # Target coverage of 80%
        threshold: 5%     # Allow a 5% decrease without failing
        informational: true  # Start as informational until coverage improves

    # For coverage of new/changed code in PRs
    patch:
      default:
        target: 80%
        threshold: 5%     # Be a bit more lenient on PR patches
        informational: true  # Start as informational until coverage improves

    # Don't fail if there are no changes
    changes: no

# Configure PR comment
comment:
  layout: "reach, diff, flags, files"
  behavior: default
  require_changes: false  # Comment even if coverage doesn't change
  require_base: false
  require_head: true
  hide_project_coverage: false  # Show both project and diff coverage

# Ignore certain paths
ignore:
  - "tests/**/*"          # Don't count test files in coverage
  - "supabase_mcp/_version.py"  # Auto-generated version file

```

--------------------------------------------------------------------------------
/supabase_mcp/services/database/sql/queries/get_schemas.sql:
--------------------------------------------------------------------------------

```sql
WITH t AS (
    -- Regular tables
    SELECT
        schemaname AS schema_name,
        tablename AS table_name,
        'regular' AS table_type
    FROM pg_tables

    UNION ALL

    -- Foreign tables
    SELECT
        foreign_table_schema AS schema_name,
        foreign_table_name AS table_name,
        'foreign' AS table_type
    FROM information_schema.foreign_tables
)

SELECT
    s.schema_name,
    COALESCE(PG_SIZE_PRETTY(SUM(
        COALESCE(
            CASE
                WHEN t.table_type = 'regular'
                    THEN PG_TOTAL_RELATION_SIZE(
                        QUOTE_IDENT(t.schema_name) || '.' || QUOTE_IDENT(t.table_name)
                    )
                ELSE 0
            END, 0
        )
    )), '0 B') AS total_size,
    COUNT(t.table_name) AS table_count
FROM information_schema.schemata AS s
LEFT JOIN t ON s.schema_name = t.schema_name
WHERE
    s.schema_name NOT IN ('pg_catalog', 'information_schema')
    AND s.schema_name NOT LIKE 'pg_%'
    AND s.schema_name NOT LIKE 'pg_toast%'
GROUP BY s.schema_name
ORDER BY
    COUNT(t.table_name) DESC,           -- Schemas with most tables first
    total_size DESC,                    -- Then by size
    s.schema_name ASC;                      -- Then alphabetically

```

--------------------------------------------------------------------------------
/supabase_mcp/logger.py:
--------------------------------------------------------------------------------

```python
import logging
import logging.handlers
from pathlib import Path


def setup_logger() -> logging.Logger:
    """Configure logging for the MCP server with log rotation."""
    logger = logging.getLogger("supabase-mcp")

    # Remove existing handlers to avoid duplicate logs
    if logger.hasHandlers():
        logger.handlers.clear()

    # Define a consistent log directory in the user's home folder
    log_dir = Path.home() / ".local" / "share" / "supabase-mcp"
    log_dir.mkdir(parents=True, exist_ok=True)  # Ensure the directory exists

    # Define the log file path
    log_file = log_dir / "mcp_server.log"

    # Create a rotating file handler
    # - Rotate when log reaches 5MB
    # - Keep 3 backup files
    file_handler = logging.handlers.RotatingFileHandler(
        log_file,
        maxBytes=5 * 1024 * 1024,  # 5MB
        backupCount=3,
        encoding="utf-8",
    )

    # Create formatter
    formatter = logging.Formatter("[%(asctime)s] %(levelname)-8s %(message)s", datefmt="%y/%m/%d %H:%M:%S")

    # Add formatter to file handler
    file_handler.setFormatter(formatter)

    # Add handler to logger
    logger.addHandler(file_handler)

    # Set level
    logger.setLevel(logging.DEBUG)

    return logger


logger = setup_logger()

```
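
The module exposes a single configured instance, so callers import it rather than building their own. Typical usage:

```python
from supabase_mcp.logger import logger

# Both land in ~/.local/share/supabase-mcp/mcp_server.log;
# the logger level is DEBUG, so debug messages are kept too.
logger.info("Connected to project")
logger.debug("Connection pool details: ...")
```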

--------------------------------------------------------------------------------
/.github/workflows/docs/release-checklist.md:
--------------------------------------------------------------------------------

```markdown
# Release Checklist

Pre-release
1. Tests pass
2. CI passes
3. Build succeeds
4. Clean install succeeds
5. Documentation is up to date
6. Changelog is up to date
7. Tag and release on GitHub
8. Release is published to PyPI
9. Update dockerfile
10. Update .env.example (if necessary)

Post-release
- Clean install from PyPI works


## v0.3.12 - 2025-03-12

Pre-release
1. Tests pass - [X]
2. CI passes - [X]
3. Build succeeds - [X]
4. Documentation is up to date - [X]
5. Changelog is up to date - [X]
6. Tag and release on GitHub - []
7. Release is published to PyPI - []
8. Update dockerfile - [X]
9. Update .env.example (if necessary) - [X]

Post-release
10. Clean install from PyPI works - []


## v0.3.8 - 2025-03-07

Pre-release
1. Tests pass - [X]
2. CI passes - [X]
3. Build succeeds - [X]
4. Documentation is up to date - [X]
5. Changelog is up to date - [X]
6. Tag and release on GitHub
7. Release is published to PyPI
8. Update dockerfile - [X]
9. Update .env.example (if necessary) - [X]

Post-release
10. Clean install from PyPi works - [X]



## v0.3.0 - 2025-02-22

1. Tests pass - [X]
2. CI passes - [X]
3. Build succeeds - [X]
4. Clean install succeeds - [X]
5. Documentation is up to date - [X]
6. Changelog is up to date - [X]
7. Tag and release on GitHub - [X]
8. Release is published to PyPI - [X]
9. Clean install from PyPI works - [X]

```

--------------------------------------------------------------------------------
/.github/ISSUE_TEMPLATE/bug_report.md:
--------------------------------------------------------------------------------

````markdown
---
name: Bug Report
about: Report an issue with the Supabase MCP server
title: "An issue with doing X when Y under conditions Z"
labels: bug
assignees: alexander-zuev

---

## ⚠️ IMPORTANT NOTE
The following types of reports will be closed immediately without investigation:
- Vague reports like "Something is not working" without clear explanations
- Issues missing reproduction steps or necessary context
- Reports without essential information (logs, environment details)
- Issues already covered in the README

Please provide complete information as outlined below.

## Describe the bug
A clear and concise description of what the bug is.

## Steps to Reproduce
1.
2.
3.

## Connection Details
- Connection type: (Local or Remote)
- Using password with special characters? (Yes/No)

## Screenshots
If applicable, add screenshots to help explain your problem.

## Logs
HIGHLY USEFUL: Attach server logs from:
- macOS/Linux: ~/.local/share/supabase-mcp/mcp_server.log
- Windows: %USERPROFILE%\.local\share\supabase-mcp\mcp_server.log

You can get the last 50 lines with:
```
tail -n 50 ~/.local/share/supabase-mcp/mcp_server.log
```

## Additional context
Add any other context about the problem here.

## Checklist
Mark completed items with an [x]:
- [ ] I've included the server logs
- [ ] I've checked the README troubleshooting section
- [ ] I've verified my connection settings are correct

````

--------------------------------------------------------------------------------
/smithery.yaml:
--------------------------------------------------------------------------------

```yaml
# Smithery configuration file: https://smithery.ai/docs/config#smitheryyaml

build:
  dockerBuildPath: .

startCommand:
  type: stdio
  configSchema:
    type: object
    required:
      - supabaseProjectRef
      - supabaseDbPassword
      - supabaseRegion
      - queryApiKey
    properties:
      queryApiKey:
        type: string
        description: "(required) - Your Query API key"
      supabaseProjectRef:
        type: string
        description: "(required) - Supabase project reference ID. Defaults to: 127.0.0.1:54322"
      supabaseDbPassword:
        type: string
        description: "(required) - Database password"
      supabaseRegion:
        type: string
        description: "(required) - AWS region where your Supabase project is hosted - Default: us-east-1"
      supabaseAccessToken:
        type: string
        description: "(optional) - Personal access token for Supabase Management API - Default: none"
      supabaseServiceRoleKey:
        type: string
        description: "(optional) - Project Service Role Key for Auth Admin SDK - Default: none"
  commandFunction:
    |-
    (config) => ({
      command: 'supabase-mcp-server',
      args: [],
      env: {
        QUERY_API_KEY: config.queryApiKey,
        SUPABASE_PROJECT_REF: config.supabaseProjectRef,
        SUPABASE_DB_PASSWORD: config.supabaseDbPassword,
        SUPABASE_REGION: config.supabaseRegion,
        SUPABASE_ACCESS_TOKEN: config.supabaseAccessToken,
        SUPABASE_SERVICE_ROLE_KEY: config.supabaseServiceRoleKey
      }
    })

```

--------------------------------------------------------------------------------
/supabase_mcp/main.py:
--------------------------------------------------------------------------------

```python
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager

from mcp.server.fastmcp import FastMCP

from supabase_mcp.core.container import ServicesContainer
from supabase_mcp.logger import logger
from supabase_mcp.settings import settings
from supabase_mcp.tools.registry import ToolRegistry


# Create lifespan for the MCP server
@asynccontextmanager
async def lifespan(app: FastMCP) -> AsyncGenerator[FastMCP, None]:
    try:
        logger.info("Initializing services")

        # Initialize services
        services_container = ServicesContainer.get_instance()
        services_container.initialize_services(settings)

        # Register tools
        mcp = ToolRegistry(mcp=app, services_container=services_container).register_tools()
        yield mcp
    finally:
        logger.info("Shutting down services")
        services_container = ServicesContainer.get_instance()
        await services_container.shutdown_services()
        # Force kill the entire process - doesn't care about async contexts
        import os

        os._exit(0)  # Use 0 for successful termination


# Create mcp instance
mcp = FastMCP("supabase", lifespan=lifespan)


def run_server() -> None:
    logger.info("Starting Supabase MCP server")
    mcp.run()
    logger.info("This code runs only if I don't exit in lifespan")


def run_inspector() -> None:
    """Inspector mode - same as mcp dev"""
    logger.info("Starting Supabase MCP server inspector")

    from mcp.cli.cli import dev

    return dev(__file__)


if __name__ == "__main__":
    logger.info("Starting Supabase MCP server")
    run_server()

```

--------------------------------------------------------------------------------
/supabase_mcp/services/database/sql/queries/get_table_schema.sql:
--------------------------------------------------------------------------------

```sql
WITH pk AS (
    SELECT ccu.column_name
    FROM information_schema.table_constraints AS tc
    INNER JOIN information_schema.constraint_column_usage AS ccu
        ON tc.constraint_name = ccu.constraint_name
    WHERE
        tc.table_schema = '{schema_name}'
        AND tc.table_name = '{table}'
        AND tc.constraint_type = 'PRIMARY KEY'
),

fk AS (
    SELECT
        kcu.column_name,
        ccu.table_name AS foreign_table_name,
        ccu.column_name AS foreign_column_name
    FROM information_schema.table_constraints AS tc
    INNER JOIN information_schema.key_column_usage AS kcu
        ON tc.constraint_name = kcu.constraint_name
    INNER JOIN information_schema.constraint_column_usage AS ccu
        ON tc.constraint_name = ccu.constraint_name
    WHERE
        tc.table_schema = '{schema_name}'
        AND tc.table_name = '{table}'
        AND tc.constraint_type = 'FOREIGN KEY'
)

SELECT DISTINCT
    c.column_name,
    c.data_type,
    c.is_nullable,
    c.column_default,
    c.ordinal_position,
    fk.foreign_table_name,
    fk.foreign_column_name,
    col_description(pc.oid, c.ordinal_position) AS column_description,
    coalesce(pk.column_name IS NOT NULL, FALSE) AS is_primary_key
FROM information_schema.columns AS c
INNER JOIN pg_class AS pc
    ON
        pc.relname = '{table}'
        AND pc.relnamespace = (
            SELECT oid FROM pg_namespace
            WHERE nspname = '{schema_name}'
        )
LEFT JOIN pk ON c.column_name = pk.column_name
LEFT JOIN fk ON c.column_name = fk.column_name
WHERE
    c.table_schema = '{schema_name}'
    AND c.table_name = '{table}'
ORDER BY c.ordinal_position;

```

--------------------------------------------------------------------------------
/.github/workflows/publish.yaml:
--------------------------------------------------------------------------------

```yaml
name: Publish to PyPI

on:
  release:
    types: [published]  # releases are cut from main; release events don't support branch filters

env:
  UV_VERSION: "0.6.0" # Pin uv version to avoid breaking changes

jobs:
  build-and-publish:
    runs-on: ubuntu-latest
    environment:
      name: pypi
      url: https://pypi.org/project/supabase-mcp-server/
    permissions:
      id-token: write  # Required for trusted publishing
      contents: read

    steps:
      - uses: actions/checkout@v4
        with:
          fetch-depth: 0  # Required for proper version detection

      - name: Install uv
        uses: astral-sh/setup-uv@v5
        with:
          version: ${{ env.UV_VERSION }}

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.12"

      - name: Build package
        run: uv build --no-sources

      - name: Verify package installation and entry points
        env:
          SUPABASE_PROJECT_REF: ${{ secrets.SUPABASE_PROJECT_REF }}
          SUPABASE_DB_PASSWORD: ${{ secrets.SUPABASE_DB_PASSWORD }}
        run: |
          # Create a new venv for testing
          uv venv
          source .venv/bin/activate

          # Install the built wheel
          uv pip install dist/*.whl

          echo "Testing supabase-mcp-server entry point..."
          # Run with --help to test basic functionality without needing actual connection
          if ! uv run supabase-mcp-server --help; then
            echo "❌ supabase-mcp-server --help failed"
            exit 1
          fi
          echo "✅ supabase-mcp-server --help succeeded"

      - name: Publish to PyPI
        uses: pypa/gh-action-pypi-publish@release/v1

```

--------------------------------------------------------------------------------
/.github/workflows/ci.yaml:
--------------------------------------------------------------------------------

```yaml
name: CI

on:
  push:
    branches: [ main ]
  pull_request:
    branches: [ main ]


env:
  UV_VERSION: "0.6.1" # Pin uv version to avoid breaking changes

jobs:
  test:
    runs-on: ubuntu-latest
    env:
      # Test environment variables - no real secrets needed anymore
      SUPABASE_PROJECT_REF: "abcdefghij1234567890"
      SUPABASE_DB_PASSWORD: "test-password-123"
      SUPABASE_ACCESS_TOKEN: "test-access-token"
      SUPABASE_SERVICE_ROLE_KEY: "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c"
    steps:
      - uses: actions/checkout@v4

      - name: Set up Python 3.12
        uses: actions/setup-python@v5
        with:
          python-version: "3.12"

      - name: Install uv
        uses: astral-sh/setup-uv@v5
        with:
          version: ${{ env.UV_VERSION }}

      - name: Create venv and install dependencies
        run: |
          # Create venv and install dependencies
          uv venv
          source .venv/bin/activate
          uv sync --group dev --frozen

      - name: Run tests
        run: |
          source .venv/bin/activate # necessary for pytest
          pytest --cov=supabase_mcp --cov-report=xml --cov-report=term

      - name: Upload coverage to Codecov
        uses: codecov/codecov-action@v3
        with:
          token: ${{ secrets.CODECOV_TOKEN }}
          files: ./coverage.xml
          fail_ci_if_error: false

      - name: Build distribution packages
        run: |
          uv build --no-sources
          # Verify dist contains both wheel and tar.gz
          test -f dist/*.whl
          test -f dist/*.tar.gz

```

--------------------------------------------------------------------------------
/tests/test_container.py:
--------------------------------------------------------------------------------

```python
from typing import Any, cast

from mcp.server.fastmcp import FastMCP

from supabase_mcp.core.container import ServicesContainer
from supabase_mcp.settings import Settings


class TestContainer:
    """Tests for the Container class functionality."""

    def test_container_initialization(self, container_integration: ServicesContainer):
        """Test that the container is properly initialized with all services."""
        # Verify all services are properly initialized
        assert container_integration.postgres_client is not None
        assert container_integration.api_client is not None
        assert container_integration.sdk_client is not None
        assert container_integration.api_manager is not None
        assert container_integration.safety_manager is not None
        assert container_integration.query_manager is not None
        assert container_integration.tool_manager is not None
        assert container_integration.mcp_server is not None

    def test_container_initialize_method(self, settings_integration: Settings, mock_mcp_server: Any):
        """Test the initialize method creates all services properly."""
        # Create empty container
        container = ServicesContainer(mcp_server=cast(FastMCP, mock_mcp_server))

        # Initialize with settings
        container.initialize_services(settings_integration)

        # Verify all services were created
        assert container.postgres_client is not None
        assert container.api_client is not None
        assert container.sdk_client is not None
        assert container.api_manager is not None
        assert container.safety_manager is not None
        assert container.query_manager is not None
        assert container.tool_manager is not None
        assert container.mcp_server is not None

```

--------------------------------------------------------------------------------
/supabase_mcp/services/database/sql/queries/get_tables.sql:
--------------------------------------------------------------------------------

```sql
(
-- Regular tables & views: full metadata available
    SELECT
        t.table_name,
        t.table_type,
        obj_description(pc.oid) AS description,
        pg_total_relation_size(format('%I.%I', t.table_schema, t.table_name))::bigint AS size_bytes,
        pg_stat_get_live_tuples(pc.oid)::bigint AS row_count,
        (
            SELECT count(*) FROM information_schema.columns AS c
            WHERE
                c.table_schema = t.table_schema
                AND c.table_name = t.table_name
        ) AS column_count,
        (
            SELECT count(*) FROM pg_indexes AS i
            WHERE
                i.schemaname = t.table_schema
                AND i.tablename = t.table_name
        ) AS index_count
    FROM information_schema.tables AS t
    INNER JOIN pg_class AS pc
        ON
            t.table_name = pc.relname
            AND pc.relnamespace = (
                SELECT oid FROM pg_namespace
                WHERE nspname = '{schema_name}'
            )
    WHERE
        t.table_schema = '{schema_name}'
        AND t.table_type IN ('BASE TABLE', 'VIEW')
)
UNION ALL
(
-- Foreign tables: limited metadata (size & row count functions don't apply)
    SELECT
        ft.foreign_table_name AS table_name,
        'FOREIGN TABLE' AS table_type,
        (
            SELECT obj_description(
                (quote_ident(ft.foreign_table_schema) || '.' || quote_ident(ft.foreign_table_name))::regclass
            )
        ) AS description,
        0::bigint AS size_bytes,
        NULL::bigint AS row_count,
        (
            SELECT count(*) FROM information_schema.columns AS c
            WHERE
                c.table_schema = ft.foreign_table_schema
                AND c.table_name = ft.foreign_table_name
        ) AS column_count,
        0 AS index_count
    FROM information_schema.foreign_tables AS ft
    WHERE ft.foreign_table_schema = '{schema_name}'
)
ORDER BY size_bytes DESC;

```

--------------------------------------------------------------------------------
/supabase_mcp/tools/descriptions/sdk_tools.yaml:
--------------------------------------------------------------------------------

```yaml
# Python SDK tools descriptions

get_auth_admin_methods_spec: |
  Get Python SDK methods specification for Auth Admin.

  Returns a comprehensive dictionary of all Auth Admin methods available in the Supabase Python SDK, including:
  - Method names and descriptions
  - Required and optional parameters for each method
  - Parameter types and constraints
  - Return value information

  This tool is useful for exploring the capabilities of the Auth Admin SDK and understanding
  how to properly format parameters for the call_auth_admin_method tool.

  No parameters required.

call_auth_admin_method: |
  Call an Auth Admin method from Supabase Python SDK.

  This tool provides a safe, validated interface to the Supabase Auth Admin SDK, allowing you to:
  - Manage users (create, update, delete)
  - List and search users
  - Generate authentication links
  - Manage multi-factor authentication
  - And more

  IMPORTANT NOTES:
  - Request bodies must adhere to the Python SDK specification
  - Some methods may have nested parameter structures
  - The tool validates all parameters against Pydantic models
  - Extra fields not defined in the models will be rejected

  AVAILABLE METHODS:
  - get_user_by_id: Retrieve a user by their ID
  - list_users: List all users with pagination
  - create_user: Create a new user
  - delete_user: Delete a user by their ID
  - invite_user_by_email: Send an invite link to a user's email
  - generate_link: Generate an email link for various authentication purposes
  - update_user_by_id: Update user attributes by ID
  - delete_factor: Delete a factor on a user

  EXAMPLES:
  1. Get user by ID:
     method: "get_user_by_id"
     params: {"uid": "user-uuid-here"}

  2. Create user:
     method: "create_user"
     params: {
       "email": "[email protected]",
       "password": "secure-password"
     }

  3. Update user by ID:
     method: "update_user_by_id"
     params: {
       "uid": "user-uuid-here",
       "attributes": {
         "email": "[email protected]"
       }
     }

  For complete documentation of all methods and their parameters, use the get_auth_admin_methods_spec tool.

```
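
The validation promised above is backed by the Pydantic models in `auth_admin_models.py` (shown later in this document). A minimal sketch of validating a call's params; the surrounding dispatch and SDK invocation are assumed:

```python
from supabase_mcp.services.sdk.auth_admin_models import PARAM_MODELS

method = "create_user"
params = {"email": "user@example.com", "password": "secure-password"}

# Look up the parameter model for the method and validate the raw dict.
# Pydantic enforces required fields and types; per the description above,
# the server additionally rejects fields the models don't define.
model_cls = PARAM_MODELS[method]
validated = model_cls.model_validate(params)
```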

--------------------------------------------------------------------------------
/supabase_mcp/services/sdk/auth_admin_models.py:
--------------------------------------------------------------------------------

```python
from typing import Any, Literal

from pydantic import BaseModel, model_validator


class GetUserByIdParams(BaseModel):
    uid: str


class ListUsersParams(BaseModel):
    page: int | None = 1
    per_page: int | None = 50


class UserMetadata(BaseModel):
    email: str | None = None
    email_verified: bool | None = None
    phone_verified: bool | None = None
    sub: str | None = None


class AdminUserAttributes(BaseModel):
    email: str | None = None
    password: str | None = None
    email_confirm: bool | None = False
    phone: str | None = None
    phone_confirm: bool | None = False
    user_metadata: UserMetadata | None = None
    app_metadata: dict[str, Any] | None = None
    role: str | None = None
    ban_duration: str | None = None
    nonce: str | None = None


class CreateUserParams(AdminUserAttributes):
    @model_validator(mode="after")
    def check_email_or_phone(self) -> "CreateUserParams":
        if not self.email and not self.phone:
            raise ValueError("Either email or phone must be provided")
        return self


class DeleteUserParams(BaseModel):
    id: str
    should_soft_delete: bool | None = False


class InviteUserByEmailParams(BaseModel):
    email: str
    options: dict[str, Any] | None = None


class GenerateLinkParams(BaseModel):
    type: Literal[
        "signup", "invite", "magiclink", "recovery", "email_change_current", "email_change_new", "phone_change"
    ]
    email: str
    password: str | None = None
    new_email: str | None = None
    options: dict[str, Any] | None = None

    @model_validator(mode="after")
    def validate_required_fields(self) -> "GenerateLinkParams":
        # Check password for signup
        if self.type == "signup" and not self.password:
            raise ValueError("Password is required for signup links")

        # Check new_email for email change
        if self.type in ["email_change_current", "email_change_new"] and not self.new_email:
            raise ValueError("new_email is required for email change links")

        return self


class UpdateUserByIdParams(BaseModel):
    uid: str
    attributes: AdminUserAttributes


class DeleteFactorParams(BaseModel):
    id: str
    user_id: str


# Map method names to their parameter models
PARAM_MODELS = {
    "get_user_by_id": GetUserByIdParams,
    "list_users": ListUsersParams,
    "create_user": CreateUserParams,
    "delete_user": DeleteUserParams,
    "invite_user_by_email": InviteUserByEmailParams,
    "generate_link": GenerateLinkParams,
    "update_user_by_id": UpdateUserByIdParams,
    "delete_factor": DeleteFactorParams,
}

```
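
The `model_validator(mode="after")` hooks encode cross-field rules that plain field types can't. A quick illustration with `GenerateLinkParams`:

```python
import pydantic

from supabase_mcp.services.sdk.auth_admin_models import GenerateLinkParams

# Valid: signup links require a password.
GenerateLinkParams(type="signup", email="user@example.com", password="pw")

# Invalid: the after-validator raises, which Pydantic wraps as a ValidationError.
try:
    GenerateLinkParams(type="signup", email="user@example.com")
except pydantic.ValidationError as exc:
    print(exc.errors()[0]["msg"])  # mentions "Password is required for signup links"
```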

--------------------------------------------------------------------------------
/supabase_mcp/clients/api_client.py:
--------------------------------------------------------------------------------

```python
import httpx
from pydantic import BaseModel

from supabase_mcp.clients.base_http_client import AsyncHTTPClient
from supabase_mcp.logger import logger
from supabase_mcp.settings import settings


class ApiRoutes:
    """Routes for the Query API"""

    FEATURES_ACCESS = "/features/{feature_name}/access"


class FeatureAccessRequest(BaseModel):
    """Request for feature access. Later can be extended with additional metadata."""

    feature_name: str


class FeatureAccessResponse(BaseModel):
    """Response for feature access. Later can be extended with additional metadata."""

    access_granted: bool


class ApiClient(AsyncHTTPClient):
    """Client for communicating with the Query API server for premium features.

    To preserve backwards compatibility and ensure a smooth UX for existing users,
    API key is not required as of now.
    """

    def __init__(
        self,
        query_api_key: str | None = None,
        query_api_url: str | None = None,
    ):
        """Initialize the Query API client"""
        self.query_api_key = query_api_key or settings.query_api_key
        self.query_api_url = query_api_url or settings.query_api_url
        self._check_api_key_set()
        self.client: httpx.AsyncClient | None = None
        logger.info(
            f"✔️ Query API client initialized successfully with URL: {self.query_api_url}, with key: {bool(self.query_api_key)}"
        )

    async def _ensure_client(self) -> httpx.AsyncClient:
        """Ensure client exists and is ready for use.

        Creates the client if it doesn't exist yet.
        Returns the client instance.
        """
        if self.client is None:
            logger.info("Creating new Query API client")
            self.client = httpx.AsyncClient(
                base_url=self.query_api_url,
                headers={"X-API-Key": f"{self.query_api_key}"},
                timeout=30.0,
            )
        else:
            logger.debug("Reusing existing Query API client")
        return self.client

    async def close(self) -> None:
        """Close the client and release resources."""
        if self.client:
            await self.client.aclose()
            logger.info("Query API client closed")

    def _check_api_key_set(self) -> None:
        """Check if the API key is set"""
        if not self.query_api_key:
            logger.warning("Query API key is not set. Only free features will be available.")
            return

    async def check_feature_access(self, feature_name: str) -> FeatureAccessResponse:
        """Check if the feature is available for the user"""

        try:
            result = await self.execute_request(
                method="GET",
                path=ApiRoutes.FEATURES_ACCESS.format(feature_name=feature_name),
            )
            logger.debug(f"Feature access response: {result}")
            return FeatureAccessResponse.model_validate(result)
        except Exception as e:
            logger.error(f"Error checking feature access: {e}")
            raise

```
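
A short sketch of exercising the client directly; the feature name is a placeholder, and `execute_request` comes from the `AsyncHTTPClient` base class:

```python
import asyncio

from supabase_mcp.clients.api_client import ApiClient

async def main() -> None:
    client = ApiClient()
    try:
        # "analytics" is an illustrative feature name, not a known endpoint.
        response = await client.check_feature_access("analytics")
        print(response.access_granted)
    finally:
        await client.close()

asyncio.run(main())
```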

--------------------------------------------------------------------------------
/supabase_mcp/tools/manager.py:
--------------------------------------------------------------------------------

```python
from __future__ import annotations

from enum import Enum
from pathlib import Path

import yaml

from supabase_mcp.logger import logger


class ToolName(str, Enum):
    """Enum of all available tools in the Supabase MCP server."""

    # Database tools
    GET_SCHEMAS = "get_schemas"
    GET_TABLES = "get_tables"
    GET_TABLE_SCHEMA = "get_table_schema"
    EXECUTE_POSTGRESQL = "execute_postgresql"
    RETRIEVE_MIGRATIONS = "retrieve_migrations"

    # Safety tools
    LIVE_DANGEROUSLY = "live_dangerously"
    CONFIRM_DESTRUCTIVE_OPERATION = "confirm_destructive_operation"

    # Management API tools
    SEND_MANAGEMENT_API_REQUEST = "send_management_api_request"
    GET_MANAGEMENT_API_SPEC = "get_management_api_spec"

    # Auth Admin tools
    GET_AUTH_ADMIN_METHODS_SPEC = "get_auth_admin_methods_spec"
    CALL_AUTH_ADMIN_METHOD = "call_auth_admin_method"

    # Logs & Analytics tools
    RETRIEVE_LOGS = "retrieve_logs"


class ToolManager:
    """Manager for tool descriptions and registration.

    This class is responsible for loading tool descriptions from YAML files
    and providing them to the main application.
    """

    _instance: ToolManager | None = None  # Singleton instance

    def __init__(self) -> None:
        """Initialize the tool manager."""
        self.descriptions: dict[str, str] = {}
        self._load_descriptions()

    @classmethod
    def get_instance(cls) -> ToolManager:
        """Get or create the singleton instance of ToolManager."""
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

    @classmethod
    def reset(cls) -> None:
        """Reset the singleton instance of ToolManager."""
        if cls._instance is not None:
            cls._instance = None
            logger.info("ToolManager instance reset complete")

    def _load_descriptions(self) -> None:
        """Load tool descriptions from YAML files."""
        # Path to the descriptions directory
        descriptions_dir = Path(__file__).parent / "descriptions"

        # Check if the directory exists
        if not descriptions_dir.exists():
            raise FileNotFoundError(f"Tool descriptions directory not found: {descriptions_dir}")

        # Load all YAML files in the directory
        for yaml_file in descriptions_dir.glob("*.yaml"):
            try:
                with open(yaml_file) as f:
                    tool_descriptions = yaml.safe_load(f)
                    if tool_descriptions:
                        self.descriptions.update(tool_descriptions)
            except Exception as e:
                print(f"Error loading tool descriptions from {yaml_file}: {e}")

    def get_description(self, tool_name: str) -> str:
        """Get the description for a specific tool.

        Args:
            tool_name: The name of the tool to get the description for.

        Returns:
            The description of the tool, or an empty string if not found.
        """
        return self.descriptions.get(tool_name, "")

```
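
In practice `registry.py` pulls these descriptions when registering tools, but the same lookup can be done directly. Since `ToolName` is a `str` enum, its members work as plain string keys:

```python
from supabase_mcp.tools.manager import ToolManager, ToolName

manager = ToolManager.get_instance()
# Falls back to "" if the YAML files don't define this tool.
description = manager.get_description(ToolName.GET_SCHEMAS)
print(description.splitlines()[0] if description else "<no description>")
```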

--------------------------------------------------------------------------------
/supabase_mcp/exceptions.py:
--------------------------------------------------------------------------------

```python
from typing import Any


class DatabaseError(Exception):
    """Base class for database-related errors."""

    pass


class ConnectionError(DatabaseError):
    """Raised when a database connection fails."""

    pass


class PermissionError(DatabaseError):
    """Raised when a database operation is not permitted."""

    pass


class QueryError(DatabaseError):
    """Raised when a database query fails."""

    pass


class TimeoutError(DatabaseError):
    """Raised when a database operation times out."""

    pass


class ValidationError(Exception):
    """Raised when validation fails."""

    pass


class SafetyError(Exception):
    """Raised when a safety check fails."""

    pass


class OperationNotAllowedError(SafetyError):
    """Raised when an operation is not allowed in the current safety mode."""

    pass


class ConfirmationRequiredError(SafetyError):
    """Raised when a user needs to confirm destructive SQL operation"""

    pass


class APIError(Exception):
    """Base class for API-related errors."""

    def __init__(
        self,
        message: str,
        status_code: int | None = None,
        response_body: dict[str, Any] | None = None,
    ):
        self.status_code = status_code
        self.response_body = response_body
        super().__init__(message)


class APIConnectionError(APIError):
    """Raised when an API connection fails."""

    pass


class PythonSDKError(Exception):
    """Raised when a Python SDK operation fails."""

    pass


class APIResponseError(APIError):
    """Raised when an API response is invalid."""

    pass


class APIClientError(APIError):
    """Raised when an API client error occurs."""

    pass


class APIServerError(APIError):
    """Raised when an API server error occurs."""

    pass


class UnexpectedError(APIError):
    """Raised when an unexpected error occurs."""

    pass


class FeatureAccessError(APIError):
    """Raised when a user does not have access to a premium feature."""

    def __init__(
        self,
        feature_name: str,
        status_code: int | None = None,
        response_body: dict[str, Any] | None = None,
    ):
        message = (
            f"This feature '{feature_name}' is available in our Pro plan. "
            f"Upgrade at https://thequery.dev to unlock advanced capabilities "
            f"and take your database experience to the next level!"
        )
        super().__init__(message, status_code, response_body)


class FeatureTemporaryError(APIError):
    """Raised when a feature check encounters a temporary error."""

    def __init__(
        self,
        feature_name: str,
        status_code: int | None = None,
        response_body: dict[str, Any] | None = None,
    ):
        message = (
            f"We couldn't verify access to '{feature_name}' at the moment. "
            f"Please try again in a few moments. If this persists, "
            f"check your connection or visit https://thequery.dev/status for updates."
        )
        super().__init__(message, status_code, response_body)

```
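
Call sites can branch on the two feature-gate errors, since one is permanent and the other retryable. A hedged sketch (the `check` callable stands in for any feature-gated call):

```python
from supabase_mcp.exceptions import FeatureAccessError, FeatureTemporaryError

def run_gated(check) -> None:
    """Run a feature-gated callable and route the two failure modes."""
    try:
        check()
    except FeatureAccessError as e:
        print(f"Upgrade required: {e}")   # permanent: surface the upgrade message
    except FeatureTemporaryError as e:
        print(f"Try again shortly: {e}")  # transient: safe to retry later
```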

--------------------------------------------------------------------------------
/tests/services/safety/test_sql_safety_config.py:
--------------------------------------------------------------------------------

```python
"""
Unit tests for the SQLSafetyConfig class.

This file contains unit test cases for the SQLSafetyConfig class, which is responsible for
determining the risk level of SQL operations and whether they are allowed or require confirmation.
"""

from unittest.mock import MagicMock

import pytest

from supabase_mcp.services.safety.models import OperationRiskLevel, SafetyMode
from supabase_mcp.services.safety.safety_configs import SQLSafetyConfig


@pytest.mark.unit
class TestSQLSafetyConfig:
    """Unit tests for the SQLSafetyConfig class."""

    def test_get_risk_level(self):
        """Test that get_risk_level returns the highest_risk_level from the operation."""
        config = SQLSafetyConfig()

        # Create mock QueryValidationResults objects with different risk levels
        low_risk_op = MagicMock()
        low_risk_op.highest_risk_level = OperationRiskLevel.LOW

        medium_risk_op = MagicMock()
        medium_risk_op.highest_risk_level = OperationRiskLevel.MEDIUM

        high_risk_op = MagicMock()
        high_risk_op.highest_risk_level = OperationRiskLevel.HIGH

        extreme_risk_op = MagicMock()
        extreme_risk_op.highest_risk_level = OperationRiskLevel.EXTREME

        # Test that the risk level is correctly returned
        assert config.get_risk_level(low_risk_op) == OperationRiskLevel.LOW
        assert config.get_risk_level(medium_risk_op) == OperationRiskLevel.MEDIUM
        assert config.get_risk_level(high_risk_op) == OperationRiskLevel.HIGH
        assert config.get_risk_level(extreme_risk_op) == OperationRiskLevel.EXTREME

    def test_is_operation_allowed(self):
        """Test if operations are allowed based on risk level and safety mode.

        This tests the behavior inherited from SafetyConfigBase.
        """
        config = SQLSafetyConfig()

        # Low risk operations should be allowed in both safe and unsafe modes
        assert config.is_operation_allowed(OperationRiskLevel.LOW, SafetyMode.SAFE) is True
        assert config.is_operation_allowed(OperationRiskLevel.LOW, SafetyMode.UNSAFE) is True

        # Medium/high risk operations should only be allowed in unsafe mode
        assert config.is_operation_allowed(OperationRiskLevel.MEDIUM, SafetyMode.SAFE) is False
        assert config.is_operation_allowed(OperationRiskLevel.MEDIUM, SafetyMode.UNSAFE) is True
        assert config.is_operation_allowed(OperationRiskLevel.HIGH, SafetyMode.SAFE) is False
        assert config.is_operation_allowed(OperationRiskLevel.HIGH, SafetyMode.UNSAFE) is True

        # Extreme risk operations are never allowed
        assert config.is_operation_allowed(OperationRiskLevel.EXTREME, SafetyMode.SAFE) is False
        assert config.is_operation_allowed(OperationRiskLevel.EXTREME, SafetyMode.UNSAFE) is False

    def test_needs_confirmation(self):
        """Test if operations need confirmation based on risk level.

        This tests the behavior inherited from SafetyConfigBase.
        """
        config = SQLSafetyConfig()

        # Low and medium risk operations should not need confirmation
        assert config.needs_confirmation(OperationRiskLevel.LOW) is False
        assert config.needs_confirmation(OperationRiskLevel.MEDIUM) is False

        # High risk operations should need confirmation
        assert config.needs_confirmation(OperationRiskLevel.HIGH) is True

        # Extreme risk operations should need confirmation
        assert config.needs_confirmation(OperationRiskLevel.EXTREME) is True

```

--------------------------------------------------------------------------------
/supabase_mcp/tools/descriptions/logs_and_analytics_tools.yaml:
--------------------------------------------------------------------------------

```yaml
# Tools related to logs and Advisor analytics

retrieve_logs: |
  Retrieve logs from your Supabase project's services for debugging and monitoring.

  Returns log entries from various Supabase services with timestamps, messages, and metadata.
  This tool provides access to the same logs available in the Supabase dashboard's Logs & Analytics section.

  AVAILABLE LOG COLLECTIONS:
  - postgres: Database server logs including queries, errors, warnings, and system messages
  - api_gateway: API requests, responses, and errors processed by the Kong API gateway
  - auth: Authentication and authorization logs for sign-ups, logins, and token operations
  - postgrest: Logs from the RESTful API service that exposes your PostgreSQL database
  - pooler: Connection pooling logs from pgbouncer and supavisor services
  - storage: Object storage service logs for file uploads, downloads, and permissions
  - realtime: Logs from the real-time subscription service for WebSocket connections
  - edge_functions: Serverless function execution logs including invocations and errors
  - cron: Scheduled job logs (can be queried through postgres logs with specific filters)
  - pgbouncer: Connection pooler logs

  PARAMETERS:
  - collection: The log collection to query (required, one of the values listed above)
  - limit: Maximum number of log entries to return (default: 20)
  - hours_ago: Retrieve logs from the last N hours (default: 1)
  - filters: List of filter objects with field, operator, and value (default: [])
    Format: [{"field": "field_name", "operator": "=", "value": "value"}]
  - search: Text to search for in event messages (default: "")
  - custom_query: Complete custom SQL query to execute instead of the pre-built queries (default: "")

  HOW IT WORKS:
  This tool makes a request to the Supabase Management API endpoint for logs, sending
  either a pre-built optimized query for the selected collection or your custom query.
  Each log collection has a specific table structure and metadata format that requires
  appropriate CROSS JOIN UNNEST operations to access nested fields.

  EXAMPLES:
  1. Using pre-built parameters:
     collection: "postgres"
     limit: 20
     hours_ago: 24
     filters: [{"field": "parsed.error_severity", "operator": "=", "value": "ERROR"}]
     search: "connection"

  2. Using a custom query:
     collection: "edge_functions"
     custom_query: "SELECT id, timestamp, event_message, m.function_id, m.execution_time_ms
                   FROM function_edge_logs
                   CROSS JOIN unnest(metadata) AS m
                   WHERE m.execution_time_ms > 1000
                   ORDER BY timestamp DESC LIMIT 10"

  METADATA STRUCTURE:
  The metadata structure is important because it determines how to access nested fields in filters:
  - postgres_logs: Use "parsed.field_name" for fields like error_severity, query, application_name
  - edge_logs: Use "request.field_name" or "response.field_name" for HTTP details
  - function_edge_logs: Use "function_id", "execution_time_ms" for function metrics

  NOTE FOR LLM CLIENTS:
  When encountering errors with field access, examine the error message to see what fields are
  actually available in the structure. Start with basic fields before accessing nested metadata.

  SAFETY CONSIDERATIONS:
  - This is a low-risk read operation that can be executed in SAFE mode
  - Requires a valid Supabase Personal Access Token to be configured
  - Not available for local Supabase instances (requires cloud deployment)


retrieve_advisor_analytics: |
  Get advisor analytics from the database.

```
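
For orientation, here are the documented parameters from example 1 assembled as a Python dict; the shape follows the filter format above, while the actual tool invocation depends on your MCP client:

```python
# Mirrors example 1 above: postgres errors from the last 24 hours
# whose event message mentions "connection".
params = {
    "collection": "postgres",
    "limit": 20,
    "hours_ago": 24,
    "filters": [
        {"field": "parsed.error_severity", "operator": "=", "value": "ERROR"},
    ],
    "search": "connection",
}
```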

--------------------------------------------------------------------------------
/supabase_mcp/services/database/sql/models.py:
--------------------------------------------------------------------------------

```python
from enum import Enum

from pydantic import BaseModel, Field

from supabase_mcp.services.safety.models import OperationRiskLevel


class SQLQueryCategory(str, Enum):
    """Category of the SQL query tracked by the SQL validator"""

    DQL = "DQL"  # Data Query Language (SELECT)
    DML = "DML"  # Data Manipulation Language (INSERT, UPDATE, DELETE)
    DDL = "DDL"  # Data Definition Language (CREATE, ALTER, DROP)
    TCL = "TCL"  # Transaction Control Language (BEGIN, COMMIT, ROLLBACK)
    DCL = "DCL"  # Data Control Language (GRANT, REVOKE)
    POSTGRES_SPECIFIC = "POSTGRES_SPECIFIC"  # PostgreSQL-specific commands
    OTHER = "OTHER"  # Other commands not fitting into the categories above


class SQLQueryCommand(str, Enum):
    """Command of the SQL query tracked by the SQL validator"""

    # DQL Commands
    SELECT = "SELECT"

    # DML Commands
    INSERT = "INSERT"
    UPDATE = "UPDATE"
    DELETE = "DELETE"
    MERGE = "MERGE"

    # DDL Commands
    CREATE = "CREATE"
    ALTER = "ALTER"
    DROP = "DROP"
    TRUNCATE = "TRUNCATE"
    COMMENT = "COMMENT"
    RENAME = "RENAME"

    # DCL Commands
    GRANT = "GRANT"
    REVOKE = "REVOKE"

    # TCL Commands (for tracking, not query types)
    BEGIN = "BEGIN"
    COMMIT = "COMMIT"
    ROLLBACK = "ROLLBACK"
    SAVEPOINT = "SAVEPOINT"

    # PostgreSQL-specific Commands
    VACUUM = "VACUUM"
    ANALYZE = "ANALYZE"
    EXPLAIN = "EXPLAIN"
    COPY = "COPY"
    LISTEN = "LISTEN"
    NOTIFY = "NOTIFY"
    PREPARE = "PREPARE"
    EXECUTE = "EXECUTE"
    DEALLOCATE = "DEALLOCATE"

    # Other/Unknown
    UNKNOWN = "UNKNOWN"


class ValidatedStatement(BaseModel):
    """Result of the query validation for a single SQL statement."""

    category: SQLQueryCategory = Field(
        ..., description="The category of SQL statement (DQL, DML, DDL, etc.) derived from pglast parse tree"
    )
    risk_level: OperationRiskLevel = Field(
        ..., description="The risk level associated with this statement based on category and command"
    )
    command: SQLQueryCommand = Field(
        ..., description="The specific SQL command (SELECT, INSERT, CREATE, etc.) extracted from parse tree"
    )
    object_type: str | None = Field(
        None, description="The type of object being operated on (TABLE, INDEX, etc.) when available"
    )
    schema_name: str | None = Field(None, description="The schema name for the objects in the statement when available")
    needs_migration: bool = Field(
        ..., description="Whether this statement requires a migration based on statement type and safety rules"
    )
    query: str | None = Field(None, description="The actual SQL text for this statement extracted from original query")


class QueryValidationResults(BaseModel):
    """Result of the batch validation for one or more SQL statements."""

    statements: list[ValidatedStatement] = Field(
        default_factory=list, description="List of validated statements from the query built during validation"
    )
    highest_risk_level: OperationRiskLevel = Field(
        default=OperationRiskLevel.LOW, description="The highest risk level among all statements in the batch"
    )
    has_transaction_control: bool = Field(
        default=False, description="Whether the query contains transaction control statements (BEGIN, COMMIT, etc.)"
    )
    original_query: str = Field(..., description="The original SQL query text as provided by the user")

    def needs_migration(self) -> bool:
        """Check if any statement in the batch needs migration."""
        return any(stmt.needs_migration for stmt in self.statements)

```
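
A minimal sketch of what a validator might emit for a single `SELECT`, with values chosen to match the field descriptions above:

```python
from supabase_mcp.services.database.sql.models import (
    QueryValidationResults,
    SQLQueryCategory,
    SQLQueryCommand,
    ValidatedStatement,
)
from supabase_mcp.services.safety.models import OperationRiskLevel

stmt = ValidatedStatement(
    category=SQLQueryCategory.DQL,
    command=SQLQueryCommand.SELECT,
    risk_level=OperationRiskLevel.LOW,
    needs_migration=False,  # plain reads never require a migration
    query="SELECT 1;",
)
result = QueryValidationResults(statements=[stmt], original_query="SELECT 1;")
assert result.needs_migration() is False
```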

--------------------------------------------------------------------------------
/supabase_mcp/tools/descriptions/safety_tools.yaml:
--------------------------------------------------------------------------------

```yaml
# Safety tools descriptions

live_dangerously: |
  Toggle unsafe mode for either Management API or Database operations.

  WHAT THIS TOOL DOES:
  This tool switches between safe (default) and unsafe operation modes for either the Management API or Database operations.

  SAFETY MODES EXPLAINED:
  1. Database Safety Modes:
     - SAFE mode (default): Only low-risk operations like SELECT queries are allowed
     - UNSAFE mode: Higher-risk operations including INSERT, UPDATE, DELETE, and schema changes are permitted

  2. API Safety Modes:
     - SAFE mode (default): Only low-risk operations that don't modify state are allowed
     - UNSAFE mode: Higher-risk state-changing operations are permitted (except those explicitly blocked for safety)

  OPERATION RISK LEVELS:
  The system categorizes operations by risk level:
  - LOW: Safe read operations with minimal impact
  - MEDIUM: Write operations that modify data but don't change structure
  - HIGH: Operations that modify database structure or important system settings
  - EXTREME: Destructive operations that could cause data loss or service disruption

  WHEN TO USE THIS TOOL:
  - Use this tool BEFORE attempting write operations or schema changes
  - Enable unsafe mode only when you need to perform data modifications
  - Always return to safe mode after completing write operations

  USAGE GUIDELINES:
  - Start in safe mode by default for exploration and analysis
  - Switch to unsafe mode only when you need to make changes
  - Be specific about which service you're enabling unsafe mode for
  - Consider the risks before enabling unsafe mode, especially for database operations
  - For database operations requiring schema changes, you'll need to enable unsafe mode first

  Parameters:
  - service: Which service to toggle ("api" or "database")
  - enable_unsafe_mode: True to enable unsafe mode, False for safe mode (default: False)

  Examples:
  1. Enable database unsafe mode:
     live_dangerously(service="database", enable_unsafe_mode=True)

  2. Return to safe mode after operations:
     live_dangerously(service="database", enable_unsafe_mode=False)

  3. Enable API unsafe mode:
     live_dangerously(service="api", enable_unsafe_mode=True)

  Note: This tool affects ALL subsequent operations for the specified service until changed again.

confirm_destructive_operation: |
  Execute a destructive database or API operation after confirmation. Use this only after reviewing the risks with the user.

  HOW IT WORKS:
  - This tool executes a previously rejected high-risk operation using its confirmation ID
  - The operation will be exactly the same as the one that generated the ID
  - No need to retype the query or api request params - the system remembers it

  STEPS:
  1. Explain the risks to the user and get their approval
  2. Use this tool with the confirmation ID from the error message
  3. The original query will be executed as-is

  PARAMETERS:
  - operation_type: Type of operation ("api" or "database")
  - confirmation_id: The ID provided in the error message (required)
  - user_confirmation: Set to true to confirm execution (default: false)

  NOTE: Confirmation IDs expire after 5 minutes for security

get_management_api_safety_rules: |
  Get all safety rules for the Supabase Management API.

  Returns a comprehensive overview of all safety rules applied to the Management API, including:
  - Blocked operations (never allowed)
  - Unsafe operations (allowed only in unsafe mode)
  - Safe operations (always allowed)

  Each rule includes:
  - The HTTP method and path pattern
  - A human-readable explanation of why the operation has its safety designation
  - The safety level assigned to the operation

  This information helps you understand which operations require unsafe mode and why certain operations might be completely blocked for safety reasons.

```

--------------------------------------------------------------------------------
/supabase_mcp/core/container.py:
--------------------------------------------------------------------------------

```python
from __future__ import annotations

from mcp.server.fastmcp import FastMCP

from supabase_mcp.clients.api_client import ApiClient
from supabase_mcp.clients.management_client import ManagementAPIClient
from supabase_mcp.clients.sdk_client import SupabaseSDKClient
from supabase_mcp.core.feature_manager import FeatureManager
from supabase_mcp.logger import logger
from supabase_mcp.services.api.api_manager import SupabaseApiManager
from supabase_mcp.services.database.postgres_client import PostgresClient
from supabase_mcp.services.database.query_manager import QueryManager
from supabase_mcp.services.logs.log_manager import LogManager
from supabase_mcp.services.safety.safety_manager import SafetyManager
from supabase_mcp.settings import Settings
from supabase_mcp.tools import ToolManager


class ServicesContainer:
    """Container for all services"""

    _instance: ServicesContainer | None = None

    def __init__(
        self,
        mcp_server: FastMCP | None = None,
        postgres_client: PostgresClient | None = None,
        api_client: ManagementAPIClient | None = None,
        sdk_client: SupabaseSDKClient | None = None,
        api_manager: SupabaseApiManager | None = None,
        safety_manager: SafetyManager | None = None,
        query_manager: QueryManager | None = None,
        tool_manager: ToolManager | None = None,
        log_manager: LogManager | None = None,
        query_api_client: ApiClient | None = None,
        feature_manager: FeatureManager | None = None,
    ) -> None:
        """Create a new container container reference"""
        self.postgres_client = postgres_client
        self.api_client = api_client
        self.api_manager = api_manager
        self.sdk_client = sdk_client
        self.safety_manager = safety_manager
        self.query_manager = query_manager
        self.tool_manager = tool_manager
        self.log_manager = log_manager
        self.query_api_client = query_api_client
        self.feature_manager = feature_manager
        self.mcp_server = mcp_server

    @classmethod
    def get_instance(cls) -> ServicesContainer:
        """Get the singleton instance of the container"""
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

    def initialize_services(self, settings: Settings) -> None:
        """Initializes all services in a synchronous manner to satisfy MCP runtime requirements"""
        # Create clients
        self.postgres_client = PostgresClient.get_instance(settings=settings)
        self.api_client = ManagementAPIClient(settings=settings)  # not a singleton, simple
        self.sdk_client = SupabaseSDKClient.get_instance(settings=settings)

        # Create managers
        self.safety_manager = SafetyManager.get_instance()
        self.api_manager = SupabaseApiManager.get_instance(
            api_client=self.api_client,
            safety_manager=self.safety_manager,
        )
        self.query_manager = QueryManager(
            postgres_client=self.postgres_client,
            safety_manager=self.safety_manager,
        )
        self.tool_manager = ToolManager.get_instance()

        # Register safety configs
        self.safety_manager.register_safety_configs()

        # Create query api client
        self.query_api_client = ApiClient()
        self.feature_manager = FeatureManager(self.query_api_client)

        logger.info("✓ All services initialized successfully.")

    async def shutdown_services(self) -> None:
        """Properly close all relevant clients and connections"""
        # Postgres client
        if self.postgres_client:
            await self.postgres_client.close()

        # API clients
        if self.query_api_client:
            await self.query_api_client.close()

        if self.api_client:
            await self.api_client.close()

        # SDK client
        if self.sdk_client:
            await self.sdk_client.close()

```
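
A minimal lifecycle sketch for the container above, assuming `Settings` resolves from environment variables or its documented local defaults; the server name mirrors the one asserted in tests/test_main.py.

```python
import asyncio

from mcp.server.fastmcp import FastMCP

from supabase_mcp.core.container import ServicesContainer
from supabase_mcp.settings import Settings

settings = Settings()  # assumed to pick up env vars or local defaults

container = ServicesContainer.get_instance()
container.mcp_server = FastMCP("supabase")

# Synchronous on purpose, to satisfy MCP runtime requirements.
container.initialize_services(settings)

# ... register tools and run the server ...

# Shutdown is async because the clients hold network resources.
asyncio.run(container.shutdown_services())
```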

--------------------------------------------------------------------------------
/tests/services/database/sql/test_loader.py:
--------------------------------------------------------------------------------

```python
import os
from pathlib import Path
from unittest.mock import mock_open, patch

import pytest

from supabase_mcp.services.database.sql.loader import SQLLoader


@pytest.mark.unit
class TestSQLLoader:
    """Unit tests for the SQLLoader class."""

    def test_load_sql_with_extension(self):
        """Test loading SQL with file extension provided."""
        mock_sql = "SELECT * FROM test;"

        with patch("builtins.open", mock_open(read_data=mock_sql)):
            with patch.object(Path, "exists", return_value=True):
                result = SQLLoader.load_sql("test.sql")

        assert result == mock_sql

    def test_load_sql_without_extension(self):
        """Test loading SQL without file extension provided."""
        mock_sql = "SELECT * FROM test;"

        with patch("builtins.open", mock_open(read_data=mock_sql)):
            with patch.object(Path, "exists", return_value=True):
                result = SQLLoader.load_sql("test")

        assert result == mock_sql

    def test_load_sql_file_not_found(self):
        """Test loading SQL when file doesn't exist."""
        with patch.object(Path, "exists", return_value=False):
            with pytest.raises(FileNotFoundError):
                SQLLoader.load_sql("nonexistent")

    def test_get_schemas_query(self):
        """Test getting schemas query."""
        mock_sql = "SELECT * FROM schemas;"

        with patch.object(SQLLoader, "load_sql", return_value=mock_sql):
            result = SQLLoader.get_schemas_query()

        assert result == mock_sql

    def test_get_tables_query(self):
        """Test getting tables query with schema replacement."""
        mock_sql = "SELECT * FROM {schema_name}.tables;"
        expected = "SELECT * FROM test_schema.tables;"

        with patch.object(SQLLoader, "load_sql", return_value=mock_sql):
            result = SQLLoader.get_tables_query("test_schema")

        assert result == expected

    def test_get_table_schema_query(self):
        """Test getting table schema query with replacements."""
        mock_sql = "SELECT * FROM {schema_name}.{table};"
        expected = "SELECT * FROM test_schema.test_table;"

        with patch.object(SQLLoader, "load_sql", return_value=mock_sql):
            result = SQLLoader.get_table_schema_query("test_schema", "test_table")

        assert result == expected

    def test_get_migrations_query(self):
        """Test getting migrations query with all parameters."""
        mock_sql = "SELECT * FROM migrations WHERE name LIKE '%{name_pattern}%' LIMIT {limit} OFFSET {offset} AND include_queries = {include_full_queries};"
        expected = "SELECT * FROM migrations WHERE name LIKE '%test%' LIMIT 10 OFFSET 5 AND include_queries = true;"

        with patch.object(SQLLoader, "load_sql", return_value=mock_sql):
            result = SQLLoader.get_migrations_query(limit=10, offset=5, name_pattern="test", include_full_queries=True)

        assert result == expected

    def test_get_init_migrations_query(self):
        """Test getting init migrations query."""
        mock_sql = "CREATE SCHEMA IF NOT EXISTS migrations;"

        with patch.object(SQLLoader, "load_sql", return_value=mock_sql):
            result = SQLLoader.get_init_migrations_query()

        assert result == mock_sql

    def test_get_create_migration_query(self):
        """Test getting create migration query with replacements."""
        mock_sql = "INSERT INTO migrations VALUES ('{version}', '{name}', ARRAY['{statements}']);"
        expected = "INSERT INTO migrations VALUES ('20230101', 'test_migration', ARRAY['SELECT 1;']);"

        with patch.object(SQLLoader, "load_sql", return_value=mock_sql):
            result = SQLLoader.get_create_migration_query(
                version="20230101", name="test_migration", statements="SELECT 1;"
            )

        assert result == expected

    def test_sql_dir_path(self):
        """Test that SQL_DIR points to the correct location."""
        expected_path = Path(SQLLoader.__module__.replace(".", os.sep)).parent / "queries"
        assert str(SQLLoader.SQL_DIR).endswith(str(expected_path))

```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
[build-system]
requires = ["hatchling", "hatch-vcs"]
build-backend = "hatchling.build"

[project]
name = "supabase-mcp-server"
dynamic = ["version"]
description = "Community Supabase MCP server that enables Cursor and Windsurf to manage your Supabase project end-to-end, execute SQL queries, and more."
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
    "asyncpg>=0.30.0",
    "logfire[system-metrics]>=3.12.0",
    "mcp[cli]>=1.4.1",
    "pglast>=7.3",
    "pyyaml>=6.0.2",
    "supabase>=2.13.0",
    "tenacity>=9.0.0",
]
authors = [
    {name = "Alexander Zuev", email = "[email protected]"}
]
keywords = ["supabase", "mcp", "cursor", "windsurf", "model-context-protocol", "claude", "cline"]
license = "Apache-2.0"
classifiers = [
    "Development Status :: 4 - Beta",
    "Intended Audience :: Developers",
    "License :: OSI Approved :: Apache Software License",
    "Programming Language :: Python :: 3.12",
    "Topic :: Software Development :: Libraries :: Python Modules",
    "Topic :: Database :: Database Engines/Servers",
]

[project.urls]
Homepage = "https://github.com/alexander-zuev/supabase-mcp-server"
Repository = "https://github.com/alexander-zuev/supabase-mcp-server.git"
Changelog = "https://github.com/alexander-zuev/supabase-mcp-server/blob/main/CHANGELOG.MD"
Documentation = "https://github.com/alexander-zuev/supabase-mcp-server#readme"



[tool.hatch.build.targets.wheel]
packages = ["supabase_mcp"]

[tool.uv]
package = true

[tool.hatch.version]
source = "vcs"
raw-options = { version_scheme = "no-guess-dev" }

[tool.hatch.build.hooks.vcs]
version-file = "supabase_mcp/_version.py"

[project.scripts]
supabase-mcp-server = "supabase_mcp.main:run_server"
supabase-mcp-inspector = "supabase_mcp.main:run_inspector"


# Configure PyPI publishing
[[tool.uv.index]]
name = "pypi"
url = "https://pypi.org/simple/"
publish-url = "https://upload.pypi.org/legacy/"

[tool.ruff]
target-version = "py312"
line-length = 120
select = [
    "E",   # pycodestyle errors
    "W",   # pycodestyle warnings
    "F",   # pyflakes
    "I",   # isort
    "B",   # flake8-bugbear
    "C4",  # flake8-comprehensions
    "UP",  # pyupgrade
]
ignore = []

[tool.ruff.format]
quote-style = "double"
indent-style = "space"
skip-magic-trailing-comma = false
line-ending = "auto"


[tool.mypy]
python_version = "3.12"
strict = true
ignore_missing_imports = true
disallow_untyped_defs = true
disallow_incomplete_defs = true
check_untyped_defs = true
disallow_untyped_decorators = true
no_implicit_optional = true
warn_redundant_casts = true
warn_unused_ignores = true
warn_return_any = true
warn_unreachable = true

# Relaxed rules for tests
[[tool.mypy.overrides]]
module = "tests.*"
disallow_untyped_defs = false
disallow_incomplete_defs = false
disallow_untyped_decorators = false
check_untyped_defs = false
warn_return_any = false
warn_unreachable = false

[tool.pytest.ini_options]
testpaths = ["tests"]
python_files = ["test_*.py"]
python_classes = ["Test*"]
python_functions = ["test_*"]
addopts = "-v --no-header --tb=short"
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "module"

markers = [
    "unit: marks a test as a unit test",
    "integration: marks a test as an integration test that requires database access"
]

[dependency-groups]
dev = [
    "asyncpg-stubs>=0.30.0",
    "mypy>=1.15.0",
    "pytest>=8.3.4",
    "pytest-asyncio>=0.25.3",
    "pytest-cov>=6.0.0",
    "pytest-mock>=3.14.0",
    "ruff>=0.9.9",
    "sqlfluff>=3.3.1",
]


[tool.sqlfluff.core]
dialect = "postgres"
templater = "jinja"
max_line_length = 120

[tool.sqlfluff.indentation]
tab_space_size = 4

[tool.sqlfluff.rules]
exclude_rules = [
    "L016",  # Line length rules
    "L031",  # Table aliasing
    "L034",  # Column order in GROUP BY
    "L036",  # Select targets should be on a new line
    "L037",  # Ambiguous ordering directions
    "L042",  # Join condition required
    "L047",  # DISTINCT used with parentheses
    "LT02",  # Layout indent
    "LT12",  # Files must end with a single trailing newline
    "LT14",  # Keyword newline
    "AL01",  # Aliasing of table
    "AM05",  # Join clauses should be fully qualified
    "ST09",  # Join order
    "CP03"   # Function name capitalization
]

```

--------------------------------------------------------------------------------
/supabase_mcp/services/logs/log_manager.py:
--------------------------------------------------------------------------------

```python
from typing import Any

from supabase_mcp.logger import logger
from supabase_mcp.services.database.sql.loader import SQLLoader


class LogManager:
    """Manager for retrieving logs from Supabase services."""

    # Map collection names to table names
    COLLECTION_TO_TABLE = {
        "postgres": "postgres_logs",
        "api_gateway": "edge_logs",
        "auth": "auth_logs",
        "postgrest": "postgrest_logs",
        "pooler": "supavisor_logs",
        "storage": "storage_logs",
        "realtime": "realtime_logs",
        "edge_functions": "function_edge_logs",
        "cron": "postgres_logs",
        "pgbouncer": "pgbouncer_logs",
    }

    def __init__(self) -> None:
        """Initialize the LogManager."""
        self.sql_loader = SQLLoader()

    def _build_where_clause(
        self,
        collection: str,
        hours_ago: int | None = None,
        filters: list[dict[str, Any]] | None = None,
        search: str | None = None,
    ) -> str:
        """Build the WHERE clause for a log query.

        Args:
            collection: The log collection name
            hours_ago: Number of hours to look back
            filters: List of filter objects with field, operator, and value
            search: Text to search for in event messages

        Returns:
            The WHERE clause as a string
        """
        logger.debug(
            f"Building WHERE clause for collection={collection}, hours_ago={hours_ago}, filters={filters}, search={search}"
        )

        clauses = []

        # Get the table name for this collection
        table_name = self.COLLECTION_TO_TABLE.get(collection, collection)

        # Add time filter using BigQuery's TIMESTAMP_SUB function
        if hours_ago:
            # Qualify the timestamp column with the table name to avoid ambiguity
            clauses.append(f"{table_name}.timestamp > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {hours_ago} HOUR)")

        # Add search filter
        if search:
            # Escape single quotes in search text
            search_escaped = search.replace("'", "''")
            clauses.append(f"event_message LIKE '%{search_escaped}%'")

        # Add custom filters
        if filters:
            for filter_obj in filters:
                field = filter_obj["field"]
                operator = filter_obj["operator"]
                value = filter_obj["value"]

                # Handle string values
                if isinstance(value, str) and not value.isdigit():
                    value = f"'{value.replace("'", "''")}'"

                clauses.append(f"{field} {operator} {value}")

        # For cron logs, we already have a WHERE clause in the template
        if collection == "cron":
            if clauses:
                where_clause = f"AND {' AND '.join(clauses)}"
            else:
                where_clause = ""
        else:
            if clauses:
                where_clause = f"WHERE {' AND '.join(clauses)}"
            else:
                where_clause = ""

        logger.debug(f"Built WHERE clause: {where_clause}")
        return where_clause

    def build_logs_query(
        self,
        collection: str,
        limit: int = 20,
        hours_ago: int | None = 1,
        filters: list[dict[str, Any]] | None = None,
        search: str | None = None,
        custom_query: str | None = None,
    ) -> str:
        """Build a query for retrieving logs from a Supabase service.

        Args:
            collection: The log collection to query
            limit: Maximum number of log entries to return
            hours_ago: Retrieve logs from the last N hours
            filters: List of filter objects with field, operator, and value
            search: Text to search for in event messages
            custom_query: Complete custom SQL query to execute

        Returns:
            The SQL query string

        Raises:
            ValueError: If the collection is unknown
        """
        if custom_query:
            return custom_query

        # Build the WHERE clause
        where_clause = self._build_where_clause(
            collection=collection, hours_ago=hours_ago, filters=filters, search=search
        )

        # Get the SQL query
        return self.sql_loader.get_logs_query(collection=collection, where_clause=where_clause, limit=limit)

```
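
A quick sketch of what the query builder produces, assuming the package (and its bundled SQL templates) is installed; the filter field `parsed.error_severity` is illustrative, not a documented schema.

```python
from supabase_mcp.services.logs.log_manager import LogManager

lm = LogManager()

# Assembles the WHERE clause, then fills the packaged template for the
# postgres collection with it and the limit.
query = lm.build_logs_query(
    collection="postgres",
    limit=50,
    hours_ago=24,
    search="connection",
    filters=[{"field": "parsed.error_severity", "operator": "=", "value": "ERROR"}],
)
print(query)
```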

--------------------------------------------------------------------------------
/tests/services/database/sql/conftest.py:
--------------------------------------------------------------------------------

```python
import pytest


@pytest.fixture
def sample_dql_queries() -> dict[str, str]:
    """Sample DQL (SELECT) queries for testing."""
    return {
        "simple_select": "SELECT * FROM users",
        "select_with_where": "SELECT id, name FROM users WHERE age > 18",
        "select_with_join": "SELECT u.id, p.title FROM users u JOIN posts p ON u.id = p.user_id",
        "select_with_subquery": "SELECT * FROM users WHERE id IN (SELECT user_id FROM posts)",
        "select_with_cte": "WITH active_users AS (SELECT * FROM users WHERE active = true) SELECT * FROM active_users",
    }


@pytest.fixture
def sample_dml_queries() -> dict[str, str]:
    """Sample DML (INSERT, UPDATE, DELETE) queries for testing."""
    return {
        "simple_insert": "INSERT INTO users (name, email) VALUES ('John', '[email protected]')",
        "insert_with_select": "INSERT INTO user_backup SELECT * FROM users",
        "simple_update": "UPDATE users SET active = true WHERE id = 1",
        "simple_delete": "DELETE FROM users WHERE id = 1",
        "merge_statement": "MERGE INTO users u USING temp_users t ON (u.id = t.id) WHEN MATCHED THEN UPDATE SET name = t.name",
    }


@pytest.fixture
def sample_ddl_queries() -> dict[str, str]:
    """Sample DDL (CREATE, ALTER, DROP) queries for testing."""
    return {
        "create_table": "CREATE TABLE users (id SERIAL PRIMARY KEY, name TEXT, email TEXT UNIQUE)",
        "alter_table": "ALTER TABLE users ADD COLUMN active BOOLEAN DEFAULT false",
        "drop_table": "DROP TABLE users",
        "truncate_table": "TRUNCATE TABLE users",
        "create_index": "CREATE INDEX idx_user_email ON users (email)",
    }


@pytest.fixture
def sample_dcl_queries() -> dict[str, str]:
    """Sample DCL (GRANT, REVOKE) queries for testing."""
    return {
        "grant_select": "GRANT SELECT ON users TO read_role",
        "grant_all": "GRANT ALL PRIVILEGES ON users TO admin_role",
        "revoke_select": "REVOKE SELECT ON users FROM read_role",
        "create_role": "CREATE ROLE read_role",
        "drop_role": "DROP ROLE read_role",
    }


@pytest.fixture
def sample_tcl_queries() -> dict[str, str]:
    """Sample TCL (BEGIN, COMMIT, ROLLBACK) queries for testing."""
    return {
        "begin_transaction": "BEGIN",
        "commit_transaction": "COMMIT",
        "rollback_transaction": "ROLLBACK",
        "savepoint": "SAVEPOINT my_savepoint",
        "mixed_case_transaction": "Begin Transaction",
    }


@pytest.fixture
def sample_postgres_specific_queries() -> dict[str, str]:
    """Sample PostgreSQL-specific queries for testing."""
    return {
        "vacuum": "VACUUM users",
        "analyze": "ANALYZE users",
        "copy_to": "COPY users TO '/tmp/users.csv' WITH CSV",
        "copy_from": "COPY users FROM '/tmp/users.csv' WITH CSV",
        "explain": "EXPLAIN ANALYZE SELECT * FROM users",
    }


@pytest.fixture
def sample_invalid_queries() -> dict[str, str]:
    """Sample invalid SQL queries for testing error handling."""
    return {
        "syntax_error": "SELECT * FORM users",
        "missing_parenthesis": "SELECT * FROM users WHERE id IN (1, 2, 3",
        "invalid_column": "SELECT nonexistent_column FROM users",
        "incomplete_statement": "SELECT * FROM",
        "invalid_table": "SELECT * FROM nonexistent_table",
    }


@pytest.fixture
def sample_multiple_statements() -> dict[str, str]:
    """Sample SQL with multiple statements for testing batch processing."""
    return {
        "multiple_safe": "SELECT * FROM users; SELECT * FROM posts;",
        "safe_and_write": "SELECT * FROM users; INSERT INTO logs (message) VALUES ('queried users');",
        "write_and_destructive": "INSERT INTO logs (message) VALUES ('dropping users'); DROP TABLE users;",
        "with_transaction": "BEGIN; INSERT INTO users (name) VALUES ('John'); COMMIT;",
        "mixed_categories": "SELECT * FROM users; UPDATE users SET active = true; DROP TABLE old_users;",
    }


@pytest.fixture
def sample_edge_cases() -> dict[str, str]:
    """Sample edge cases for testing."""
    return {
        "with_comments": "SELECT * FROM users; -- This is a comment\n/* Multi-line\ncomment */",
        "quoted_identifiers": 'SELECT * FROM "user table" WHERE "first name" = \'John\'',
        "special_characters": "SELECT * FROM users WHERE name LIKE 'O''Brien%'",
        "schema_qualified": "SELECT * FROM public.users",
        "with_dollar_quotes": "SELECT $$This is a dollar-quoted string with 'quotes'$$ AS message",
    }

```

--------------------------------------------------------------------------------
/supabase_mcp/services/database/sql/loader.py:
--------------------------------------------------------------------------------

```python
from pathlib import Path

from supabase_mcp.logger import logger


class SQLLoader:
    """Responsible for loading SQL queries from files."""

    # Path to SQL files directory
    SQL_DIR = Path(__file__).parent / "queries"

    @classmethod
    def load_sql(cls, filename: str) -> str:
        """
        Load SQL from a file in the sql directory.

        Args:
            filename: Name of the SQL file (with or without .sql extension)

        Returns:
            str: The SQL query from the file

        Raises:
            FileNotFoundError: If the SQL file doesn't exist
        """
        # Ensure the filename has .sql extension
        if not filename.endswith(".sql"):
            filename = f"{filename}.sql"

        file_path = cls.SQL_DIR / filename

        if not file_path.exists():
            logger.error(f"SQL file not found: {file_path}")
            raise FileNotFoundError(f"SQL file not found: {file_path}")

        with open(file_path) as f:
            sql = f.read().strip()
            logger.debug(f"Loaded SQL file: {filename} ({len(sql)} chars)")
            return sql

    @classmethod
    def get_schemas_query(cls) -> str:
        """Get a query to list all schemas."""
        return cls.load_sql("get_schemas")

    @classmethod
    def get_tables_query(cls, schema_name: str) -> str:
        """Get a query to list all tables in a schema."""
        query = cls.load_sql("get_tables")
        return query.replace("{schema_name}", schema_name)

    @classmethod
    def get_table_schema_query(cls, schema_name: str, table: str) -> str:
        """Get a query to get the schema of a table."""
        query = cls.load_sql("get_table_schema")
        return query.replace("{schema_name}", schema_name).replace("{table}", table)

    @classmethod
    def get_migrations_query(
        cls, limit: int = 50, offset: int = 0, name_pattern: str = "", include_full_queries: bool = False
    ) -> str:
        """Get a query to list migrations."""
        query = cls.load_sql("get_migrations")
        return (
            query.replace("{limit}", str(limit))
            .replace("{offset}", str(offset))
            .replace("{name_pattern}", name_pattern)
            .replace("{include_full_queries}", str(include_full_queries).lower())
        )

    @classmethod
    def get_init_migrations_query(cls) -> str:
        """Get a query to initialize the migrations schema and table."""
        return cls.load_sql("init_migrations")

    @classmethod
    def get_create_migration_query(cls, version: str, name: str, statements: str) -> str:
        """Get a query to create a migration.

        Args:
            version: The migration version (timestamp)
            name: The migration name
            statements: The SQL statements (escaped)

        Returns:
            str: The SQL query to create a migration
        """
        query = cls.load_sql("create_migration")
        return query.replace("{version}", version).replace("{name}", name).replace("{statements}", statements)

    @classmethod
    def get_logs_query(cls, collection: str, where_clause: str = "", limit: int = 20) -> str:
        """Get a query to retrieve logs from a specific collection.

        Args:
            collection: The log collection name (e.g., postgres, api_gateway, auth)
            where_clause: The WHERE clause to filter logs
            limit: Maximum number of log entries to return

        Returns:
            str: The SQL query to retrieve logs

        Raises:
            FileNotFoundError: If the log collection SQL file doesn't exist
        """
        # Map collection names to SQL files
        collection_map = {
            "postgres": "logs/postgres_logs",
            "api_gateway": "logs/edge_logs",
            "auth": "logs/auth_logs",
            "postgrest": "logs/postgrest_logs",
            "pooler": "logs/supavisor_logs",
            "storage": "logs/storage_logs",
            "realtime": "logs/realtime_logs",
            "edge_functions": "logs/function_edge_logs",
            "cron": "logs/cron_logs",
            "pgbouncer": "logs/pgbouncer_logs",
        }

        # Get the SQL file path
        sql_file = collection_map.get(collection)
        if not sql_file:
            raise ValueError(f"Unknown log collection: {collection}")

        # Load the SQL template
        query = cls.load_sql(sql_file)

        # Handle special case for cron logs
        if collection == "cron":
            return query.replace("{and_where_clause}", where_clause).replace("{limit}", str(limit))
        else:
            return query.replace("{where_clause}", where_clause).replace("{limit}", str(limit))

```
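
A short usage sketch, assuming the package is installed so the bundled files under `queries/` are present; the `where_clause` literal is illustrative.

```python
from supabase_mcp.services.database.sql.loader import SQLLoader

# Placeholder replacement is plain string substitution on the templates.
tables_sql = SQLLoader.get_tables_query(schema_name="public")
logs_sql = SQLLoader.get_logs_query(
    collection="auth",
    where_clause="WHERE auth_logs.timestamp > '2025-01-01'",  # illustrative
    limit=50,
)
print(tables_sql)
print(logs_sql)
```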

--------------------------------------------------------------------------------
/supabase_mcp/tools/descriptions/api_tools.yaml:
--------------------------------------------------------------------------------

```yaml
# API tools descriptions

send_management_api_request: |
  Execute a Supabase Management API request.

  This tool allows you to make direct calls to the Supabase Management API, which provides
  programmatic access to manage your Supabase project settings, resources, and configurations.

  REQUEST FORMATTING:
  - Use paths exactly as defined in the API specification
  - The {ref} parameter will be automatically injected from settings
  - Format request bodies according to the API specification

  PARAMETERS:
  - method: HTTP method (GET, POST, PUT, PATCH, DELETE)
  - path: API path (e.g. /v1/projects/{ref}/functions)
  - path_params: Path parameters as dict (e.g. {"function_slug": "my-function"}) - use empty dict {} if not needed
  - request_params: Query parameters as dict (e.g. {"key": "value"}) - use empty dict {} if not needed
  - request_body: Request body as dict (e.g. {"name": "test"}) - use empty dict {} if not needed

  PATH PARAMETERS HANDLING:
  - The {ref} placeholder (project reference) is automatically injected - you don't need to provide it
  - All other path placeholders must be provided in the path_params dictionary
  - Common placeholders include:
    * {function_slug}: For Edge Functions operations
    * {id}: For operations on specific resources (API keys, auth providers, etc.)
    * {slug}: For organization operations
    * {branch_id}: For database branch operations
    * {provider_id}: For SSO provider operations
    * {tpa_id}: For third-party auth operations

  EXAMPLES:
  1. GET request with path and query parameters:
     method: "GET"
     path: "/v1/projects/{ref}/functions/{function_slug}"
     path_params: {"function_slug": "my-function"}
     request_params: {"version": "1"}
     request_body: {}

  2. POST request with body:
     method: "POST"
     path: "/v1/projects/{ref}/functions"
     path_params: {}
     request_params: {}
     request_body: {"name": "test-function", "slug": "test-function"}

  SAFETY SYSTEM:
  API operations are categorized by risk level:
  - LOW RISK: Read operations (GET) - allowed in SAFE mode
  - MEDIUM/HIGH RISK: Write operations (POST, PUT, PATCH, DELETE) - require UNSAFE mode
  - EXTREME RISK: Destructive operations - require UNSAFE mode and confirmation
  - BLOCKED: Some operations are completely blocked for safety reasons

  SAFETY CONSIDERATIONS:
  - By default, the API client starts in SAFE mode, allowing only read operations
  - To perform write operations, first use live_dangerously(service="api", enable_unsafe_mode=True)
  - High-risk operations will be rejected with a confirmation ID
  - Use confirm_destructive_operation with the provided ID after reviewing risks
  - Some operations may be completely blocked for safety reasons

  For a complete list of available API endpoints and their parameters, use the get_management_api_spec tool.
  For details on safety rules, use the get_management_api_safety_rules tool.

get_management_api_spec: |
  Get the complete Supabase Management API specification.

  Returns the full OpenAPI specification for the Supabase Management API, including:
  - All available endpoints and operations
  - Required and optional parameters for each operation
  - Request and response schemas
  - Authentication requirements
  - Safety information for each operation

  This tool can be used in four different ways:
  1. Without parameters: Returns all domains (default)
  2. With path and method: Returns the full specification for a specific API endpoint
  3. With domain only: Returns all paths and methods within that domain
  4. With all_paths=True: Returns all paths and methods

  Parameters:
  - params: Dictionary containing optional parameters:
      - path: Optional API path (e.g., "/v1/projects/{ref}/functions")
      - method: Optional HTTP method (e.g., "GET", "POST")
      - domain: Optional domain/tag name (e.g., "Auth", "Storage")
      - all_paths: Optional boolean, if True returns all paths and methods

  Available domains:
  - Analytics: Analytics-related endpoints
  - Auth: Authentication and authorization endpoints
  - Database: Database management endpoints
  - Domains: Custom domain configuration endpoints
  - Edge Functions: Serverless function management endpoints
  - Environments: Environment configuration endpoints
  - OAuth: OAuth integration endpoints
  - Organizations: Organization management endpoints
  - Projects: Project management endpoints
  - Rest: RESTful API endpoints
  - Secrets: Secret management endpoints
  - Storage: Storage management endpoints

  This specification is useful for understanding:
  - What operations are available through the Management API
  - How to properly format requests for each endpoint
  - Which operations require unsafe mode
  - What data structures to expect in responses

  SAFETY: This is a low-risk read operation that can be executed in SAFE mode.

```
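
The same GET example from the description above, expressed as a call payload; `call_tool` is a hypothetical stand-in for your MCP client's invocation helper.

```python
def call_tool(name: str, **params: object) -> None:
    print(name, params)


call_tool(
    "send_management_api_request",
    method="GET",
    path="/v1/projects/{ref}/functions/{function_slug}",  # {ref} auto-injected
    path_params={"function_slug": "my-function"},
    request_params={"version": "1"},
    request_body={},
)
```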

--------------------------------------------------------------------------------
/tests/test_settings.py:
--------------------------------------------------------------------------------

```python
from unittest.mock import patch

import pytest
from pydantic import ValidationError

from supabase_mcp.settings import SUPPORTED_REGIONS, Settings


@pytest.fixture(autouse=True)
def reset_settings_singleton() -> None:
    """Reset the Settings singleton before each test"""
    # Clear singleton instance if it exists
    if hasattr(Settings, "_instance"):
        delattr(Settings, "_instance")
    yield
    # Clean up after test
    if hasattr(Settings, "_instance"):
        delattr(Settings, "_instance")


class TestSettings:
    """Integration tests for Settings."""

    @pytest.mark.integration
    def test_settings_default_values(self, clean_environment: None) -> None:
        """Test default values (no config file, no env vars)"""
        settings = Settings.with_config()  # No config file
        assert settings.supabase_project_ref == "127.0.0.1:54322"
        assert settings.supabase_db_password == "postgres"
        assert settings.supabase_region == "us-east-1"
        assert settings.supabase_access_token is None
        assert settings.supabase_service_role_key is None

    @pytest.mark.integration
    def test_settings_from_env_test(self, clean_environment: None) -> None:
        """Test loading from .env.test"""
        import os

        settings = Settings.with_config(".env.test")

        # In CI, we expect default values since .env.test might not be properly set up
        if os.environ.get("CI") == "true":
            assert settings.supabase_project_ref == "127.0.0.1:54322"  # Default value in CI
            assert settings.supabase_db_password == "postgres"  # Default value in CI
        else:
            # In local dev, we expect .env.test to override defaults
            assert settings.supabase_project_ref != "127.0.0.1:54322"  # Should be overridden by .env.test
            assert settings.supabase_db_password != "postgres"  # Should be overridden by .env.test

        # Check that the values are not empty
        assert settings.supabase_project_ref, "Project ref should not be empty"
        assert settings.supabase_db_password, "DB password should not be empty"

    @pytest.mark.integration
    def test_settings_from_env_vars(self, clean_environment: None) -> None:
        """Test env vars take precedence over config file"""
        env_values = {
            "SUPABASE_PROJECT_REF": "abcdefghij1234567890",  # Valid 20-char project ref
            "SUPABASE_DB_PASSWORD": "env-password",
        }
        with patch.dict("os.environ", env_values, clear=False):
            settings = Settings.with_config(".env.test")  # Even with config file
            assert settings.supabase_project_ref == "abcdefghij1234567890"
            assert settings.supabase_db_password == "env-password"

    @pytest.mark.integration
    def test_settings_integration_fixture(self, settings_integration: Settings) -> None:
        """Test the settings_integration fixture provides valid settings."""
        # The settings_integration fixture should load from .env.test or environment variables
        assert settings_integration.supabase_project_ref, "Project ref should not be empty"
        assert settings_integration.supabase_db_password, "DB password should not be empty"
        assert settings_integration.supabase_region, "Region should not be empty"

    @pytest.mark.integration
    def test_settings_region_validation(self) -> None:
        """Test region validation."""
        # Test default region
        settings = Settings()
        assert settings.supabase_region == "us-east-1"

        # Test valid region from environment
        env_values = {"SUPABASE_REGION": "ap-southeast-1"}
        with patch.dict("os.environ", env_values, clear=True):
            settings = Settings()
            assert settings.supabase_region == "ap-southeast-1"

        # Test invalid region
        with pytest.raises(ValidationError) as exc_info:
            env_values = {"SUPABASE_REGION": "invalid-region"}
            with patch.dict("os.environ", env_values, clear=True):
                Settings()
        assert "Region 'invalid-region' is not supported" in str(exc_info.value)

    @pytest.mark.integration
    def test_supported_regions(self) -> None:
        """Test that all supported regions are valid."""
        for region in SUPPORTED_REGIONS.__args__:
            env_values = {"SUPABASE_REGION": region}
            with patch.dict("os.environ", env_values, clear=True):
                settings = Settings()
                assert settings.supabase_region == region

    @pytest.mark.integration
    def test_settings_access_token_and_service_role(self) -> None:
        """Test access token and service role key settings."""
        # Test with environment variables
        env_values = {
            "SUPABASE_ACCESS_TOKEN": "test-access-token",
            "SUPABASE_SERVICE_ROLE_KEY": "test-service-role-key",
        }
        with patch.dict("os.environ", env_values, clear=True):
            settings = Settings()
            assert settings.supabase_access_token == "test-access-token"
            assert settings.supabase_service_role_key == "test-service-role-key"

        # Test defaults (should be None)
        with patch.dict("os.environ", {}, clear=True):
            settings = Settings()
            assert settings.supabase_access_token is None
            assert settings.supabase_service_role_key is None

```

--------------------------------------------------------------------------------
/supabase_mcp/tools/descriptions/database_tools.yaml:
--------------------------------------------------------------------------------

```yaml
# Database tool descriptions

get_db_schemas: |
  List all database schemas with their sizes and table counts.

  Returns a comprehensive overview of all schemas in the database, including:
  - Schema names
  - Total size of each schema
  - Number of tables in each schema
  - Owner information

  This is useful for getting a high-level understanding of the database structure.

  SAFETY: This is a low-risk read operation that can be executed in SAFE mode.

get_tables: |
  List all tables, foreign tables, and views in a schema with their sizes, row counts, and metadata.

  Provides detailed information about all database objects in the specified schema:
  - Table/view names
  - Object types (table, view, foreign table)
  - Row counts
  - Size on disk
  - Column counts
  - Index information
  - Last vacuum/analyze times

  Parameters:
  - schema_name: Name of the schema to inspect (e.g., 'public', 'auth', etc.)

  SAFETY: This is a low-risk read operation that can be executed in SAFE mode.

get_table_schema: |
  Get detailed table structure including columns, keys, and relationships.

  Returns comprehensive information about a specific table's structure:
  - Column definitions (names, types, constraints)
  - Primary key information
  - Foreign key relationships
  - Indexes
  - Constraints
  - Triggers

  Parameters:
  - schema_name: Name of the schema (e.g., 'public', 'auth')
  - table: Name of the table to inspect

  SAFETY: This is a low-risk read operation that can be executed in SAFE mode.

execute_postgresql: |
  Execute PostgreSQL statements against your Supabase database.

  IMPORTANT: All SQL statements must end with a semicolon (;).

  OPERATION TYPES AND REQUIREMENTS:
  1. READ Operations (SELECT, EXPLAIN, etc.):
     - Can be executed directly without special requirements
     - Example: SELECT * FROM public.users LIMIT 10;

  2. WRITE Operations (INSERT, UPDATE, DELETE):
     - Require UNSAFE mode (use live_dangerously('database', True) first)
     - Example:
       INSERT INTO public.users (email) VALUES ('[email protected]');

  3. SCHEMA Operations (CREATE, ALTER, DROP):
     - Require UNSAFE mode (use live_dangerously('database', True) first)
     - Destructive operations (DROP, TRUNCATE) require additional confirmation
     - Example:
       CREATE TABLE public.test_table (id SERIAL PRIMARY KEY, name TEXT);

  MIGRATION HANDLING:
  All queries that modify the database will be automatically version controlled by the server. You can provide an optional migration name if you want to name the migration.
   - Respect the following format: verb_noun_detail. Be descriptive and concise.
   - Examples:
     - create_users_table
     - add_email_to_profiles
     - enable_rls_on_users
   - If you don't provide a migration name, the server will generate one based on the SQL statement
   - The system will sanitize your provided name to ensure compatibility with database systems
   - Migration names are prefixed with a timestamp in the format YYYYMMDDHHMMSS

  SAFETY SYSTEM:
  Operations are categorized by risk level:
  - LOW RISK: Read operations (SELECT, EXPLAIN) - allowed in SAFE mode
  - MEDIUM RISK: Write operations (INSERT, UPDATE, DELETE) - require UNSAFE mode
  - HIGH RISK: Schema operations (CREATE, ALTER) - require UNSAFE mode
  - EXTREME RISK: Destructive operations (DROP, TRUNCATE) - require UNSAFE mode and confirmation

  TRANSACTION HANDLING:
  - DO NOT use transaction control statements (BEGIN, COMMIT, ROLLBACK)
  - The database client automatically wraps queries in transactions
  - The SQL validator will reject queries containing transaction control statements
  - This ensures atomicity and provides rollback capability for data modifications

  MULTIPLE STATEMENTS:
  - You can send multiple SQL statements in a single query
  - Each statement will be executed in order within the same transaction
  - Example:
    CREATE TABLE public.test_table (id SERIAL PRIMARY KEY, name TEXT);
    INSERT INTO public.test_table (name) VALUES ('test');

  CONFIRMATION FLOW FOR HIGH-RISK OPERATIONS:
  - High-risk operations (DROP TABLE, TRUNCATE, etc.) will be rejected with a confirmation ID
  - The error message will explain what happened and provide a confirmation ID
  - Review the risks with the user before proceeding
  - Use the confirm_destructive_operation tool with the provided ID to execute the operation

  IMPORTANT GUIDELINES:
  - The database client starts in SAFE mode by default for safety
  - Only enable UNSAFE mode when you need to modify data or schema
  - Never mix READ and WRITE operations in the same transaction
  - For destructive operations, be prepared to confirm with the confirm_destructive_operation tool

  WHEN TO USE OTHER TOOLS INSTEAD:
  - For Auth operations (users, authentication, etc.): Use call_auth_admin_method instead of direct SQL
    The Auth Admin SDK provides safer, validated methods for user management
  - For project configuration, functions, storage, etc.: Use send_management_api_request
    The Management API handles Supabase platform features that aren't directly in the database

  Note: This tool operates on the PostgreSQL database only. API operations use separate safety controls.

retrieve_migrations: |
  Retrieve a list of all migrations recorded in your Supabase project.

  Returns a list of migrations with the following information:
  - Version (timestamp)
  - Name
  - SQL statements (if requested)
  - Statement count
  - Version type (named or numbered)

  Parameters:
  - limit: Maximum number of migrations to return (default: 50, max: 100)
  - offset: Number of migrations to skip for pagination (default: 0)
  - name_pattern: Optional pattern to filter migrations by name. Uses SQL ILIKE pattern matching (case-insensitive).
    The pattern is automatically wrapped with '%' wildcards, so "users" will match "create_users_table",
    "add_email_to_users", etc. To search for an exact match, use the complete name.
  - include_full_queries: Whether to include the full SQL statements in the result (default: false)

  SAFETY: This is a low-risk read operation that can be executed in SAFE mode.

```
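
A hedged sketch of the unsafe-mode flow for schema changes described above; `call_tool` is a hypothetical client helper, and `migration_name` is assumed to be the server-side parameter for the optional migration name.

```python
def call_tool(name: str, **params: object) -> None:
    print(name, params)


call_tool("live_dangerously", service="database", enable_unsafe_mode=True)
call_tool(
    "execute_postgresql",
    query=(
        "CREATE TABLE public.test_table (id SERIAL PRIMARY KEY, name TEXT);\n"
        "INSERT INTO public.test_table (name) VALUES ('test');"
    ),
    migration_name="create_test_table",  # verb_noun_detail convention
)
call_tool("live_dangerously", service="database", enable_unsafe_mode=False)
```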

--------------------------------------------------------------------------------
/tests/test_main.py:
--------------------------------------------------------------------------------

```python
import asyncio
from unittest.mock import patch

import pytest

from supabase_mcp.core.container import ServicesContainer
from supabase_mcp.logger import logger
from supabase_mcp.main import run_inspector, run_server
from supabase_mcp.services.safety.models import ClientType
from supabase_mcp.tools.manager import ToolName

# === UNIT TESTS ===


class TestMain:
    """Tests for the main application functionality."""

    @pytest.mark.unit
    def test_mcp_server_initializes(self, container_integration: ServicesContainer):
        """Test that the MCP server initializes correctly."""
        # Verify server name
        mcp = container_integration.mcp_server
        assert mcp.name == "supabase"

        # Verify MCP server is created but not yet initialized with tools
        tools = asyncio.run(mcp.list_tools())
        logger.info(f"Found {len(tools)} MCP tools registered in basic container")

        # At this point, no tools should be registered yet
        assert len(tools) == 0, f"Expected 0 tools in basic container, but got {len(tools)}"

    @pytest.mark.unit
    def test_services_container_initialization(
        self,
        initialized_container_integration: ServicesContainer,
    ):
        """Test that the services container is correctly initialized."""
        # Verify container has all required services
        container = initialized_container_integration
        assert container.postgres_client is not None
        assert container.api_client is not None
        assert container.sdk_client is not None
        assert container.api_manager is not None
        assert container.safety_manager is not None
        assert container.query_manager is not None
        assert container.tool_manager is not None

        # Verify the container is fully initialized
        # Check that safety manager has been initialized by verifying it has configs registered
        safety_manager = container.safety_manager
        assert safety_manager.get_safety_mode(ClientType.DATABASE) is not None
        assert safety_manager.get_safety_mode(ClientType.API) is not None

    @pytest.mark.unit
    def test_tool_registration(self, tools_registry_integration: ServicesContainer):
        """Test that tools are registered correctly using ToolManager's tool names."""

        # Get the tool manager from the container
        tool_manager = tools_registry_integration.tool_manager
        assert tool_manager is not None, "Tool manager should be initialized"

        # Get the MCP server from the container
        mcp = tools_registry_integration.mcp_server

        # Get expected tools from ToolName enum
        expected_tools = [
            ToolName.GET_SCHEMAS,
            ToolName.GET_TABLES,
            ToolName.GET_TABLE_SCHEMA,
            ToolName.EXECUTE_POSTGRESQL,
            ToolName.CONFIRM_DESTRUCTIVE_OPERATION,
            ToolName.RETRIEVE_MIGRATIONS,
            ToolName.LIVE_DANGEROUSLY,
            ToolName.SEND_MANAGEMENT_API_REQUEST,
            ToolName.GET_MANAGEMENT_API_SPEC,
            ToolName.GET_AUTH_ADMIN_METHODS_SPEC,
            ToolName.CALL_AUTH_ADMIN_METHOD,
            ToolName.RETRIEVE_LOGS,
        ]

        # Verify tools are registered in MCP
        registered_tools = asyncio.run(mcp.list_tools())
        registered_tool_names = {tool.name for tool in registered_tools}

        # We should have exactly 12 tools (all the tools defined in ToolName enum)
        assert len(registered_tools) == 12, f"Expected 12 tools, but got {len(registered_tools)}"

        # Log the actual number of tools for reference
        logger.info(f"Found {len(registered_tools)} MCP tools registered")

        # Verify each tool has proper MCP protocol structure
        for tool in registered_tools:
            assert tool.name, "Tool must have a name"
            assert tool.description, "Tool must have a description"
            assert tool.inputSchema, "Tool must have an input schema"

        # Check that each expected tool is registered by its string value
        for tool_name in expected_tools:
            # Convert enum to string value (e.g., 'get_schemas' instead of ToolName.GET_SCHEMAS)
            tool_str_value = str(tool_name.value)
            assert tool_str_value in registered_tool_names, f"Tool {tool_name} not registered"

        # Verify we have tools for core functionality categories
        # Instead of checking specific names, check for categories of functionality
        tool_names = {tool.name for tool in registered_tools}

        # Log all available tools for debugging
        tool_list = ", ".join(sorted(tool_names))
        logger.info(f"Available MCP tools: {tool_list}")

    @pytest.mark.unit
    def test_run_server_starts(self):
        """Test that run_server starts the server."""
        # Patch the global mcp instance and its run method
        with patch("supabase_mcp.main.mcp") as mock_mcp:
            # Call run_server which should use the global mcp instance
            run_server()
            mock_mcp.run.assert_called_once()

    @pytest.mark.unit
    def test_inspector_mode(self):
        """Test that inspector mode initializes correctly"""
        # This test is fine as is since it's testing the global function
        # and mocking an external dependency
        with patch("mcp.cli.cli.dev") as mock_dev:
            # Patch __file__ in the main module to match what we expect
            with patch("supabase_mcp.main.__file__", __file__):
                run_inspector()
                mock_dev.assert_called_once_with(__file__)

    @pytest.mark.unit
    def test_server_command_exists(self):
        """Test that the server command exists and is executable"""
        import os
        import shutil

        # Skip this test in CI environments
        if os.environ.get("CI") == "true":
            pytest.skip("Skipping server command test in CI environment")

        # Check if the command exists in PATH
        server_path = shutil.which("supabase-mcp-server")
        assert server_path is not None, "supabase-mcp-server command not found in PATH"

        # Check if the file is executable
        assert os.access(server_path, os.X_OK), "supabase-mcp-server is not executable"

```

--------------------------------------------------------------------------------
/CHANGELOG.MD:
--------------------------------------------------------------------------------

```markdown
# Changelog

All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).


## [0.3.12] - 2025-03-12
### Added
- Implemented a new `retrieve_logs` tool that allows retrieval of logs from any Supabase log collection - Postgres, PostgREST, Auth, Edge Functions and others. Provides a way to query and filter log entries.
- Implemented log rotation to prevent unbounded log file growth (5MB limit with 3 backup files)

### Changed
- Improved region configuration with clearer error messages for region mismatches
- Updated smithery.yaml to reduce configuration error rate (Tenant not found)
- Improved PostgreSQL client connection error handling with specific guidance for "Tenant or user not found" errors


## [0.3.11] - 2025-03-10
### Fixed
- Fixed an error with creating a migration file when a user doesn't have `supabase_migrations` schema


## [0.3.10] - 2025-03-09
### Added
- Enhanced migration naming system with improved object type detection for procedures, functions, and views.
- Expanded `retrieve_migrations` tool with pagination, name pattern filtering, and option to include full SQL queries.
- Added a check to validate personal access token and service role key are set before calling API or SDK methods

### Changed
- Updated setup instructions for Claude Desktop
- Updated setup instructions for Cline
- Updated and fixed Smithery.ai setup

### Removed
- Removed redundant `get_api_safety_rules` tool since exceptions already provide enough information to the client


## [0.3.9] - 2025-03-08
### Fixed
- Fixed an issue with api spec tool that prevented spec retrieval


## [0.3.8] - 2025-03-07
### Added
- SQL query validation using PostgreSQL's parser (pglast v7.3+)
- Automatic migration script generation for schema changes
- Universal safety system with standardized risk levels (Low/Medium/High/Extreme)
- Switched to asyncpg v0.30.0+ from psycopg2
- Enhanced API spec tool with multiple query modes and risk assessment
- Connection retry logic for database and API operations
- Code coverage with pytest-cov
- SQL linting with SQLFluff
- Added pyyaml v6.0.2+ for configuration

### Changed
- Refactored to use dependency injection pattern
- Standardized service initialization to synchronous pattern
- Improved SQL safety categorization:
  - `safe`: Read-only operations (always allowed)
  - `write`: Data modification (requires unsafe mode)
  - `destructive`: Schema changes (requires unsafe mode + confirmation)
- Updated Ruff to v0.9.9
- Added asyncpg-stubs and pytest-mock for testing

## [0.3.7] - 2025-03-02
### Fixed
- Documentation inaccuracies

### Added
- Auth admin SDK support for local Supabase instances


## [0.3.6] - 2025-02-26
### Added
- Added `call_auth_admin_method` which enables MCP server to manage users in your database (create, update, delete, confirm). All Auth SDK methods are supported
- Added `get_auth_admin_methods_spec` to retrieve documentation for all available Auth Admin methods. Response objects now use attribute access (dot notation) instead of dictionary access.

### Fixed
- Fixed an issue with improper encoding of database passwords. Previously passwords containing "%" symbol led to connection failures


## [0.3.5] - 2025-02-26
### Fixed
- Fixed an issue with `get_tables` so that it reliably returns foreign tables and views
- Updated docs to describe how to setup mcp.json with project-specific MCPs
- Expanded and improved test suite to cover each MCP tool


## [0.3.4] - 2025-02-25
### Fixed
- Improved `get_tables` to return foreign data tables


## [0.3.3] - 2025-02-25
### Fixed
- Fixed a bug with `readonly` scope being incorrectly managed in db client

## [0.3.2] - 2025-02-25
### Fixed
- Fixed a bug preventing execution of DDL commands (create, alter tables, etc.)

## [0.3.1] - 2025-02-23
### Changed
- Significantly improved docs to make install, configuration, usage instructions super clear


## [0.3.0] - 2025-02-23
### Added
- Full support for read-write SQL operations:
  - Implemented safety mode system with read-only (default) and read-write modes
  - Added mode switching with automatic reset to read-only
  - Enhanced transaction support for testing write operations
  - Improved error handling for read-only violations
- Support for Supabase Management API
  - Introduces Supabase Management API integration with safe (enabled by default) and yolo modes
  - Includes the following tools:
    - `send_management_api_request` to send arbitrary requests to Supabase Management API, with auto-injection of project ref and safety mode control.
    - `get_management_api_spec` to get the enriched API specification with safety information
    - `get_management_api_safety_rules` to get all safety rules including blocked and unsafe operations with human-readable explanations
    - `live_dangerously` to switch to yolo mode
  - Safety features:
    - Divides API methods into `safe`, `unsafe` and `blocked` categories based on the risk of the operation
    - Allows to switch between safe and yolo modes dynamically
    - Blocked operations (delete project, delete database) are not allowed regardless of the mode


## [0.2.2] - 2025-02-20
### Added
- Support for different Supabase regions:
  - Configuration via `SUPABASE_REGION` environment variable
  - Validation for all 16 supported AWS regions
  - Default to `us-east-1` for backward compatibility
  - Enhanced logging for region information
  - Comprehensive documentation and examples

## [0.2.1] - 2025-02-19
### Added
- Package distribution support:
  - PyPI package publishing setup
  - Installation via `pipx` and `uv`
  - Entry point scripts for direct execution
- Smithery.ai deployment configuration

### Changed
- BREAKING: Installation and execution methods:
  - Switched from direct script execution to proper module structure
  - Updated Cursor/Windsurf configuration for package-based execution
- Improved setup instructions in README

## [0.2.0] - 2025-02-18
Intermediary release for package distribution support

## [0.1.0] - 2025-02-16
### Added
- Initial release
- Basic MCP server functionality
- Supabase database connection support
- Integration with Cursor and Windsurf IDEs

[0.3.0]: https://github.com/alexander-zuev/supabase-mcp-server/releases/tag/v0.3.0
[0.2.2]: https://github.com/alexander-zuev/supabase-mcp-server/releases/tag/v0.2.2
[0.2.1]: https://github.com/alexander-zuev/supabase-mcp-server/releases/tag/v0.2.1
[0.2.0]: https://github.com/alexander-zuev/supabase-mcp-server/releases/tag/v0.2.0-dev0
[0.1.0]: https://github.com/alexander-zuev/supabase-mcp-server/releases/tag/v0.1.0

```

--------------------------------------------------------------------------------
/tests/services/api/test_api_manager.py:
--------------------------------------------------------------------------------

```python
from typing import Any
from unittest.mock import AsyncMock, MagicMock, patch

import pytest

from supabase_mcp.exceptions import SafetyError
from supabase_mcp.services.api.api_manager import SupabaseApiManager
from supabase_mcp.services.safety.models import ClientType


class TestApiManager:
    """Tests for the API Manager."""

    @pytest.mark.unit
    def test_path_parameter_replacement(self, mock_api_manager: SupabaseApiManager):
        """
        Test that path parameters are correctly replaced in API paths.

        This test verifies that the API Manager correctly replaces path placeholders
        with actual values, handling both required and optional parameters.
        """
        # Use the mock_api_manager fixture instead of creating one manually
        api_manager = mock_api_manager

        # Test with a simple path and required parameters (avoiding 'ref' which is auto-injected)
        path = "/v1/organizations/{slug}/members"
        path_params = {"slug": "example-org"}

        result = api_manager.replace_path_params(path, path_params)
        expected = "/v1/organizations/example-org/members"
        assert result == expected, f"Expected {expected}, got {result}"

        # Test with missing required parameters
        path = "/v1/organizations/{slug}/members/{id}"
        path_params = {"slug": "example-org"}

        with pytest.raises(ValueError) as excinfo:
            api_manager.replace_path_params(path, path_params)
        assert "Missing path parameters" in str(excinfo.value)

        # Test with extra parameters (should raise an error for unknown placeholders)
        path = "/v1/organizations/{slug}"
        path_params = {"slug": "example-org", "extra": "should-be-ignored"}

        with pytest.raises(ValueError) as excinfo:
            api_manager.replace_path_params(path, path_params)
        assert "Unknown path parameter" in str(excinfo.value)

        # Test with no parameters
        path = "/v1/organizations"
        result = api_manager.replace_path_params(path)
        expected = "/v1/organizations"
        assert result == expected, f"Expected {expected}, got {result}"

    @pytest.mark.asyncio
    @pytest.mark.unit
    @patch("supabase_mcp.services.api.api_manager.logger")
    async def test_safety_validation(self, mock_logger: MagicMock, mock_api_manager: SupabaseApiManager):
        """
        Test that API operations are properly validated through the safety manager.

        This test verifies that the API Manager correctly validates operations
        before executing them, and handles safety errors appropriately.
        """
        # Use the mock_api_manager fixture instead of creating one manually
        api_manager = mock_api_manager

        # Mock the replace_path_params method to return the path unchanged
        api_manager.replace_path_params = MagicMock(return_value="/v1/organizations/example-org")

        # Mock the client's execute_request method with an awaitable returning a simple response
        mock_response = {"success": True}

        async def mock_execute_request(*args: Any, **kwargs: Any) -> dict[str, Any]:
            return mock_response

        api_manager.client.execute_request = mock_execute_request

        # Test a successful operation
        method = "GET"
        path = "/v1/organizations/{slug}"
        path_params = {"slug": "example-org"}

        result = await api_manager.execute_request(method, path, path_params)

        # Verify that the safety manager was called with the correct parameters
        api_manager.safety_manager.validate_operation.assert_called_once_with(
            ClientType.API, (method, path, path_params, None, None), has_confirmation=False
        )

        # Verify that the result is what we expected
        assert result == {"success": True}

        # Test an operation that fails safety validation
        api_manager.safety_manager.validate_operation.reset_mock()

        # Make the safety manager raise a SafetyError
        def raise_safety_error(*args: Any, **kwargs: Any) -> None:
            raise SafetyError("Operation not allowed")

        api_manager.safety_manager.validate_operation.side_effect = raise_safety_error

        # The execute_request method should raise the SafetyError
        with pytest.raises(SafetyError) as excinfo:
            await api_manager.execute_request("DELETE", "/v1/organizations/{slug}", {"slug": "example-org"})

        assert "Operation not allowed" in str(excinfo.value)

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_retrieve_logs_basic(self, mock_api_manager: SupabaseApiManager):
        """
        Test that the retrieve_logs method correctly builds and executes a logs query.

        This test verifies that the API Manager correctly builds a logs query using
        the LogManager and executes it through the Management API.
        """
        # Mock the log_manager's build_logs_query method
        mock_api_manager.log_manager.build_logs_query = MagicMock(return_value="SELECT * FROM postgres_logs LIMIT 10")

        # Mock the execute_request method to return a simple response
        mock_response = {"result": [{"id": "123", "event_message": "test"}]}

        async def mock_execute_request(*args: Any, **kwargs: Any) -> dict[str, Any]:
            return mock_response

        mock_api_manager.execute_request = mock_execute_request

        # Execute the method
        result = await mock_api_manager.retrieve_logs(
            collection="postgres",
            limit=10,
            hours_ago=24,
        )

        # Verify that the log_manager was called with the correct parameters
        mock_api_manager.log_manager.build_logs_query.assert_called_once_with(
            collection="postgres",
            limit=10,
            hours_ago=24,
            filters=None,
            search=None,
            custom_query=None,
        )

        # Verify that the result is what we expected
        assert result == {"result": [{"id": "123", "event_message": "test"}]}

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_retrieve_logs_error_handling(self, mock_api_manager: SupabaseApiManager):
        """
        Test that the retrieve_logs method correctly handles errors.

        This test verifies that the API Manager correctly handles errors that occur
        during log retrieval and propagates them to the caller.
        """
        # Mock the log_manager's build_logs_query method
        mock_api_manager.log_manager.build_logs_query = MagicMock(return_value="SELECT * FROM postgres_logs LIMIT 10")

        # Mock the execute_request method to raise an exception
        async def mock_execute_request_error(*args: Any, **kwargs: Any) -> dict[str, Any]:
            raise Exception("API error")

        mock_api_manager.execute_request = mock_execute_request_error

        # The retrieve_logs method should propagate the exception
        with pytest.raises(Exception) as excinfo:
            await mock_api_manager.retrieve_logs(collection="postgres")

        assert "API error" in str(excinfo.value)

```
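
For reference, a minimal sketch of what `replace_path_params` could look like, reconstructed only from the behavior the tests above assert (unreplaced placeholders raise a "Missing path parameters" error, unexpected extras raise an "Unknown path parameter" error); the actual implementation in `api_manager.py` may differ.

```python
import re
from typing import Any


def replace_path_params(path: str, path_params: dict[str, Any] | None = None) -> str:
    """Hypothetical reimplementation matching the test expectations above."""
    path_params = path_params or {}
    placeholders = set(re.findall(r"{(\w+)}", path))

    # Every supplied parameter must correspond to a placeholder in the path
    unknown = set(path_params) - placeholders
    if unknown:
        raise ValueError(f"Unknown path parameters: {sorted(unknown)}")

    # Every placeholder in the path must be supplied
    missing = placeholders - set(path_params)
    if missing:
        raise ValueError(f"Missing path parameters: {sorted(missing)}")

    for name, value in path_params.items():
        path = path.replace("{" + name + "}", str(value))
    return path


assert replace_path_params("/v1/organizations/{slug}", {"slug": "example-org"}) == "/v1/organizations/example-org"
```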

--------------------------------------------------------------------------------
/tests/test_tool_manager.py:
--------------------------------------------------------------------------------

```python
from unittest.mock import MagicMock, mock_open, patch

from supabase_mcp.tools.manager import ToolManager, ToolName


class TestToolManager:
    """Tests for the ToolManager class."""

    def test_singleton_pattern(self):
        """Test that ToolManager follows the singleton pattern."""
        # Get two instances
        manager1 = ToolManager.get_instance()
        manager2 = ToolManager.get_instance()

        # They should be the same object
        assert manager1 is manager2

        # Reset the singleton for other tests
        # pylint: disable=protected-access
        # We need to reset the singleton for test isolation
        ToolManager._instance = None  # type: ignore

    @patch("supabase_mcp.tools.manager.Path")
    @patch("supabase_mcp.tools.manager.yaml.safe_load")
    def test_load_descriptions(self, mock_yaml_load: MagicMock, mock_path: MagicMock):
        """Test that descriptions are loaded correctly from YAML files."""
        # Setup mock directory structure
        mock_file_path = MagicMock()
        mock_dir = MagicMock()

        # Mock the Path(__file__) call
        mock_path.return_value = mock_file_path
        mock_file_path.parent = mock_dir
        mock_dir.__truediv__.return_value = mock_dir  # For the / operator

        # Mock directory existence check
        mock_dir.exists.return_value = True

        # Mock the glob to return some YAML files
        mock_file1 = MagicMock()
        mock_file1.name = "database_tools.yaml"
        mock_file2 = MagicMock()
        mock_file2.name = "api_tools.yaml"
        mock_dir.glob.return_value = [mock_file1, mock_file2]

        # Mock the file open and YAML load
        mock_yaml_data = {"get_schemas": "Description for get_schemas", "get_tables": "Description for get_tables"}
        mock_yaml_load.return_value = mock_yaml_data

        # Create a new instance to trigger _load_descriptions
        with patch("builtins.open", mock_open(read_data="dummy yaml content")):
            # We need to create the manager to trigger _load_descriptions
            ToolManager()

        # Verify the descriptions were loaded
        assert mock_dir.glob.call_count > 0
        assert mock_dir.glob.call_args[0][0] == "*.yaml"
        assert mock_yaml_load.call_count >= 1

        # Reset the singleton for other tests
        # pylint: disable=protected-access
        ToolManager._instance = None  # type: ignore

    def test_get_description_valid_tool(self):
        """Test getting a description for a valid tool."""
        # Setup
        manager = ToolManager.get_instance()

        # Force the descriptions to have a known value for testing
        # pylint: disable=protected-access
        # We need to set the descriptions directly for testing
        manager.descriptions = {
            ToolName.GET_SCHEMAS.value: "Description for get_schemas",
            ToolName.GET_TABLES.value: "Description for get_tables",
        }

        # Test
        description = manager.get_description(ToolName.GET_SCHEMAS.value)

        # Verify
        assert description == "Description for get_schemas"

        # Reset the singleton for other tests
        # pylint: disable=protected-access
        ToolManager._instance = None  # type: ignore

    def test_get_description_invalid_tool(self):
        """Test getting a description for an invalid tool."""
        # Setup
        manager = ToolManager.get_instance()

        # Force the descriptions to have a known value for testing
        # pylint: disable=protected-access
        # We need to set the descriptions directly for testing
        manager.descriptions = {
            ToolName.GET_SCHEMAS.value: "Description for get_schemas",
            ToolName.GET_TABLES.value: "Description for get_tables",
        }

        # Test and verify
        description = manager.get_description("nonexistent_tool")
        assert description == ""  # The method returns an empty string for unknown tools

        # Reset the singleton for other tests
        # pylint: disable=protected-access
        ToolManager._instance = None  # type: ignore

    def test_all_tool_names_have_descriptions(self):
        """Test that all tools defined in ToolName enum have descriptions."""
        # Setup - get a fresh instance
        # Reset the singleton first to ensure we get a clean instance
        # pylint: disable=protected-access
        ToolManager._instance = None  # type: ignore

        # Get a fresh instance that will load the real YAML files
        manager = ToolManager.get_instance()

        # Print the loaded descriptions for debugging
        print(f"\nLoaded descriptions: {manager.descriptions}")

        # Verify that we have at least some descriptions loaded
        assert len(manager.descriptions) > 0, "No descriptions were loaded"

        # Check that descriptions are not empty
        empty_descriptions: list[str] = []
        for tool_name, description in manager.descriptions.items():
            if not description or len(description.strip()) == 0:
                empty_descriptions.append(tool_name)

        # Fail if we found any empty descriptions
        assert len(empty_descriptions) == 0, f"Found empty descriptions for tools: {empty_descriptions}"

        # Check that at least some of the tool names have descriptions
        found_descriptions = 0
        missing_descriptions: list[str] = []

        for tool_name in ToolName:
            description = manager.get_description(tool_name.value)
            if description:
                found_descriptions += 1
            else:
                missing_descriptions.append(tool_name.value)

        # Print missing descriptions for debugging
        if missing_descriptions:
            print(f"\nMissing descriptions for: {missing_descriptions}")

        # We should have at least some descriptions
        assert found_descriptions > 0, "No tool has a description"

        # Reset the singleton for other tests
        # pylint: disable=protected-access
        ToolManager._instance = None  # type: ignore

    @patch.object(ToolManager, "_load_descriptions")
    def test_initialization_loads_descriptions(self, mock_load_descriptions: MagicMock):
        """Test that descriptions are loaded during initialization."""
        # Create a new instance
        # We need to create the manager to trigger __init__
        ToolManager()

        # Verify _load_descriptions was called
        assert mock_load_descriptions.call_count > 0

        # Reset the singleton for other tests
        # pylint: disable=protected-access
        ToolManager._instance = None  # type: ignore

    def test_tool_enum_completeness(self):
        """Test that the ToolName enum contains all expected tools."""
        # Get all tool values from the enum
        tool_values = [tool.value for tool in ToolName]

        # Verify the total number of tools
        # Update this number when new tools are added
        expected_tool_count = 12
        assert len(tool_values) == expected_tool_count, f"Expected {expected_tool_count} tools, got {len(tool_values)}"

        # Verify specific tools are included
        assert "retrieve_logs" in tool_values, "retrieve_logs tool is missing from ToolName enum"

        # Reset the singleton for other tests
        # pylint: disable=protected-access
        ToolManager._instance = None  # type: ignore

```
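
The repeated `ToolManager._instance = None` teardown in these tests follows from a classic lazily-created singleton. Below is a minimal sketch of that pattern, assuming only that the instance is cached on a class attribute named `_instance` (the attribute the tests reset); `ToolManager` itself may add more to this.

```python
class LazySingleton:
    """Toy singleton illustrating the pattern the tests above rely on."""

    _instance: "LazySingleton | None" = None

    @classmethod
    def get_instance(cls) -> "LazySingleton":
        # Create the shared instance lazily on first access
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance


a = LazySingleton.get_instance()
b = LazySingleton.get_instance()
assert a is b  # the property test_singleton_pattern checks for ToolManager

# Resetting the cached instance restores isolation between tests:
LazySingleton._instance = None
assert LazySingleton.get_instance() is not a
```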

--------------------------------------------------------------------------------
/tests/services/safety/test_api_safety_config.py:
--------------------------------------------------------------------------------

```python
"""
Unit tests for the APISafetyConfig class.

This file contains unit test cases for the APISafetyConfig class, which is responsible for
determining the risk level of API operations and whether they are allowed or require confirmation.
"""

import pytest

from supabase_mcp.services.safety.models import OperationRiskLevel, SafetyMode
from supabase_mcp.services.safety.safety_configs import APISafetyConfig, HTTPMethod


@pytest.mark.unit
class TestAPISafetyConfig:
    """Unit tests for the APISafetyConfig class."""

    def test_get_risk_level_low_risk(self):
        """Test getting risk level for low-risk operations (GET requests)."""
        config = APISafetyConfig()
        # API operations are tuples of (method, path, path_params, query_params, request_body)
        operation = ("GET", "/v1/projects/{ref}/functions", {}, {}, {})
        risk_level = config.get_risk_level(operation)
        assert risk_level == OperationRiskLevel.LOW

    def test_get_risk_level_medium_risk(self):
        """Test getting risk level for medium-risk operations (POST/PUT/PATCH)."""
        config = APISafetyConfig()

        # Test POST request
        operation = ("POST", "/v1/projects/{ref}/functions", {}, {}, {})
        risk_level = config.get_risk_level(operation)
        assert risk_level == OperationRiskLevel.MEDIUM

        # Test PUT request
        operation = ("PUT", "/v1/projects/{ref}/functions", {}, {}, {})
        risk_level = config.get_risk_level(operation)
        assert risk_level == OperationRiskLevel.MEDIUM

        # Test PATCH request
        operation = ("PATCH", "/v1/projects/{ref}/functions/{function_slug}", {}, {}, {})
        risk_level = config.get_risk_level(operation)
        assert risk_level == OperationRiskLevel.MEDIUM

    def test_get_risk_level_high_risk(self):
        """Test getting risk level for high-risk operations."""
        config = APISafetyConfig()

        # Test DELETE request for a function
        operation = ("DELETE", "/v1/projects/{ref}/functions/{function_slug}", {}, {}, {})
        risk_level = config.get_risk_level(operation)
        assert risk_level == OperationRiskLevel.HIGH

        # Test other high-risk operations
        high_risk_paths = [
            "/v1/projects/{ref}/branches/{branch_id}",
            "/v1/projects/{ref}/custom-hostname",
            "/v1/projects/{ref}/network-bans",
        ]

        for path in high_risk_paths:
            operation = ("DELETE", path, {}, {}, {})
            risk_level = config.get_risk_level(operation)
            assert risk_level == OperationRiskLevel.HIGH, f"Path {path} should be HIGH risk"

    def test_get_risk_level_extreme_risk(self):
        """Test getting risk level for extreme-risk operations."""
        config = APISafetyConfig()

        # Test DELETE request for a project
        operation = ("DELETE", "/v1/projects/{ref}", {}, {}, {})
        risk_level = config.get_risk_level(operation)
        assert risk_level == OperationRiskLevel.EXTREME

    def test_is_operation_allowed(self):
        """Test if operations are allowed based on risk level and safety mode."""
        config = APISafetyConfig()

        # Low risk operations should be allowed in both safe and unsafe modes
        assert config.is_operation_allowed(OperationRiskLevel.LOW, SafetyMode.SAFE) is True
        assert config.is_operation_allowed(OperationRiskLevel.LOW, SafetyMode.UNSAFE) is True

        # Medium/high risk operations should only be allowed in unsafe mode
        assert config.is_operation_allowed(OperationRiskLevel.MEDIUM, SafetyMode.SAFE) is False
        assert config.is_operation_allowed(OperationRiskLevel.MEDIUM, SafetyMode.UNSAFE) is True
        assert config.is_operation_allowed(OperationRiskLevel.HIGH, SafetyMode.SAFE) is False
        assert config.is_operation_allowed(OperationRiskLevel.HIGH, SafetyMode.UNSAFE) is True

        # Extreme risk operations should not be allowed in safe mode
        assert config.is_operation_allowed(OperationRiskLevel.EXTREME, SafetyMode.SAFE) is False
        # In the current implementation, extreme risk operations are never allowed
        assert config.is_operation_allowed(OperationRiskLevel.EXTREME, SafetyMode.UNSAFE) is False

    def test_needs_confirmation(self):
        """Test if operations need confirmation based on risk level."""
        config = APISafetyConfig()

        # Low and medium risk operations should not need confirmation
        assert config.needs_confirmation(OperationRiskLevel.LOW) is False
        assert config.needs_confirmation(OperationRiskLevel.MEDIUM) is False

        # High and extreme risk operations should need confirmation
        assert config.needs_confirmation(OperationRiskLevel.HIGH) is True
        assert config.needs_confirmation(OperationRiskLevel.EXTREME) is True

    def test_path_matching(self):
        """Test that path patterns are correctly matched."""
        config = APISafetyConfig()

        # Test exact path matching
        operation = ("GET", "/v1/projects/{ref}/functions", {}, {}, {})
        assert config.get_risk_level(operation) == OperationRiskLevel.LOW

        # Test path with parameters
        operation = ("GET", "/v1/projects/abc123/functions", {}, {}, {})
        assert config.get_risk_level(operation) == OperationRiskLevel.LOW

        # Test path with multiple parameters
        operation = ("DELETE", "/v1/projects/abc123/functions/my-function", {}, {}, {})
        assert config.get_risk_level(operation) == OperationRiskLevel.HIGH

        # Test a non-GET path that doesn't match any pattern (currently defaults to LOW)
        operation = ("DELETE", "/v1/some/unknown/path", {}, {}, {})
        assert config.get_risk_level(operation) == OperationRiskLevel.LOW

        # Test path that doesn't match any pattern (should default to LOW for GET)
        operation = ("GET", "/v1/some/unknown/path", {}, {}, {})
        assert config.get_risk_level(operation) == OperationRiskLevel.LOW

    def test_method_case_insensitivity(self):
        """Test that HTTP method matching is case-insensitive."""
        config = APISafetyConfig()

        # Test with lowercase method
        operation = ("get", "/v1/projects/{ref}/functions", {}, {}, {})
        assert config.get_risk_level(operation) == OperationRiskLevel.LOW

        # Test with uppercase method
        operation = ("GET", "/v1/projects/{ref}/functions", {}, {}, {})
        assert config.get_risk_level(operation) == OperationRiskLevel.LOW

        # Test with mixed case method
        operation = ("GeT", "/v1/projects/{ref}/functions", {}, {}, {})
        assert config.get_risk_level(operation) == OperationRiskLevel.LOW

    def test_path_safety_config_structure(self):
        """Test that the PATH_SAFETY_CONFIG structure is correctly defined."""
        config = APISafetyConfig()

        # Check that the config has the expected structure
        assert hasattr(config, "PATH_SAFETY_CONFIG")

        # Check that risk levels are represented as keys
        assert OperationRiskLevel.MEDIUM in config.PATH_SAFETY_CONFIG
        assert OperationRiskLevel.HIGH in config.PATH_SAFETY_CONFIG
        assert OperationRiskLevel.EXTREME in config.PATH_SAFETY_CONFIG

        # Check that each risk level has a dictionary of methods to paths
        for risk_level, methods_dict in config.PATH_SAFETY_CONFIG.items():
            assert isinstance(methods_dict, dict)
            for method, paths in methods_dict.items():
                assert isinstance(method, HTTPMethod)
                assert isinstance(paths, list)
                for path in paths:
                    assert isinstance(path, str)

```
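
A hedged sketch of the shape `test_path_safety_config_structure` asserts: risk levels keyed to per-method lists of path patterns. The enum members and patterns below are illustrative stand-ins, not the real definitions from `safety_configs.py` and `models.py`.

```python
from enum import Enum


class HTTPMethod(str, Enum):
    GET = "GET"
    POST = "POST"
    DELETE = "DELETE"


class OperationRiskLevel(int, Enum):
    LOW = 1
    MEDIUM = 2
    HIGH = 3
    EXTREME = 4


# Illustrative only: risk level -> HTTP method -> list of matching path patterns
PATH_SAFETY_CONFIG: dict[OperationRiskLevel, dict[HTTPMethod, list[str]]] = {
    OperationRiskLevel.MEDIUM: {
        HTTPMethod.POST: ["/v1/projects/{ref}/functions"],
    },
    OperationRiskLevel.HIGH: {
        HTTPMethod.DELETE: ["/v1/projects/{ref}/functions/{function_slug}"],
    },
    OperationRiskLevel.EXTREME: {
        HTTPMethod.DELETE: ["/v1/projects/{ref}"],
    },
}

# The structure test walks exactly this nesting:
for risk_level, methods in PATH_SAFETY_CONFIG.items():
    for method, paths in methods.items():
        assert isinstance(method, HTTPMethod) and all(isinstance(p, str) for p in paths)
```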

--------------------------------------------------------------------------------
/supabase_mcp/settings.py:
--------------------------------------------------------------------------------

```python
import os
from pathlib import Path
from typing import Literal

from pydantic import Field, ValidationInfo, field_validator
from pydantic_settings import BaseSettings, SettingsConfigDict

from supabase_mcp.logger import logger

SUPPORTED_REGIONS = Literal[
    "us-west-1",  # West US (North California)
    "us-east-1",  # East US (North Virginia)
    "us-east-2",  # East US (Ohio)
    "ca-central-1",  # Canada (Central)
    "eu-west-1",  # West EU (Ireland)
    "eu-west-2",  # West Europe (London)
    "eu-west-3",  # West EU (Paris)
    "eu-central-1",  # Central EU (Frankfurt)
    "eu-central-2",  # Central Europe (Zurich)
    "eu-north-1",  # North EU (Stockholm)
    "ap-south-1",  # South Asia (Mumbai)
    "ap-southeast-1",  # Southeast Asia (Singapore)
    "ap-northeast-1",  # Northeast Asia (Tokyo)
    "ap-northeast-2",  # Northeast Asia (Seoul)
    "ap-southeast-2",  # Oceania (Sydney)
    "sa-east-1",  # South America (São Paulo)
]


def find_config_file(env_file: str = ".env") -> str | None:
    """Find the specified env file in order of precedence:
    1. Current working directory (where command is run)
    2. Global config:
       - Windows: %APPDATA%/supabase-mcp/{env_file}
       - macOS/Linux: ~/.config/supabase-mcp/{env_file}

    Args:
        env_file: The name of the environment file to look for (default: ".env")

    Returns:
        The path to the found config file, or None if not found
    """
    # 1. Check current directory
    cwd_config = Path.cwd() / env_file
    if cwd_config.exists():
        return str(cwd_config)

    # 2. Check global config
    home = Path.home()
    if os.name == "nt":  # Windows
        global_config = Path(os.environ.get("APPDATA", "")) / "supabase-mcp" / env_file
    else:  # macOS/Linux
        global_config = home / ".config" / "supabase-mcp" / env_file

    if global_config.exists():
        logger.error(
            f"DEPRECATED: {global_config} is deprecated and will be removed in a future release. "
            "Use your IDE's native .json config file to configure access to MCP."
        )
        return str(global_config)

    return None


class Settings(BaseSettings):
    """Initializes settings for Supabase MCP server."""

    supabase_project_ref: str = Field(
        default="127.0.0.1:54322",  # Local Supabase default
        description="Supabase project ref - Must be 20 chars for remote projects, can be local address for development",
        alias="SUPABASE_PROJECT_REF",
    )
    supabase_db_password: str | None = Field(
        default=None,  # Will be validated based on project_ref
        description="Supabase database password - Required for remote projects, defaults to 'postgres' for local",
        alias="SUPABASE_DB_PASSWORD",
    )
    supabase_region: str = Field(
        default="us-east-1",  # East US (North Virginia) - Supabase's default region
        description="Supabase region for connection",
        alias="SUPABASE_REGION",
    )
    supabase_access_token: str | None = Field(
        default=None,
        description="Optional personal access token for accessing Supabase Management API",
        alias="SUPABASE_ACCESS_TOKEN",
    )
    supabase_service_role_key: str | None = Field(
        default=None,
        description="Optional service role key for accessing Python SDK",
        alias="SUPABASE_SERVICE_ROLE_KEY",
    )

    supabase_api_url: str = Field(
        default="https://api.supabase.com",
        description="Supabase API URL",
    )

    query_api_key: str = Field(
        default="test-key",
        description="TheQuery.dev API key",
        alias="QUERY_API_KEY",
    )

    query_api_url: str = Field(
        default="https://api.thequery.dev/v1",
        description="TheQuery.dev API URL",
        alias="QUERY_API_URL",
    )

    @field_validator("supabase_region")
    @classmethod
    def validate_region(cls, v: str, info: ValidationInfo) -> str:
        """Validate that the region is supported by Supabase."""
        # Get the project_ref from the values
        values = info.data
        project_ref = values.get("supabase_project_ref", "")

        # If this is a remote project and region is the default
        if not project_ref.startswith("127.0.0.1") and v == "us-east-1" and "SUPABASE_REGION" not in os.environ:
            logger.warning(
                "You're connecting to a remote Supabase project but haven't specified a region. "
                "Using default 'us-east-1', which may cause 'Tenant or user not found' errors if incorrect. "
                "Please set the correct SUPABASE_REGION in your configuration."
            )

        # Validate that the region is supported
        if v not in SUPPORTED_REGIONS.__args__:
            supported = "\n  - ".join([""] + list(SUPPORTED_REGIONS.__args__))
            raise ValueError(f"Region '{v}' is not supported. Supported regions are:{supported}")
        return v

    @field_validator("supabase_project_ref")
    @classmethod
    def validate_project_ref(cls, v: str) -> str:
        """Validate the project ref format."""
        if v.startswith("127.0.0.1"):
            # Local development - allow default format
            return v

        # Remote project - must be 20 chars
        if len(v) != 20:
            logger.error("Invalid Supabase project ref format")
            raise ValueError(
                "Invalid Supabase project ref format. "
                "Remote project refs must be exactly 20 characters long. "
                f"Got {len(v)} characters instead."
            )
        return v

    @field_validator("supabase_db_password")
    @classmethod
    def validate_db_password(cls, v: str | None, info: ValidationInfo) -> str:
        """Validate database password based on project type."""
        project_ref = info.data.get("supabase_project_ref", "")

        # For local development, allow default password
        if project_ref.startswith("127.0.0.1"):
            return v or "postgres"  # Default to postgres for local

        # For remote projects, password is required
        if not v:
            logger.error("SUPABASE_DB_PASSWORD is required when connecting to a remote instance")
            raise ValueError(
                "Database password is required for remote Supabase projects. "
                "Please set SUPABASE_DB_PASSWORD in your environment variables."
            )
        return v

    @classmethod
    def with_config(cls, config_file: str | None = None) -> "Settings":
        """Create Settings with a specific config file.

        Args:
            config_file: Path to .env file to use, or None for no config file
        """

        # Create a new Settings class with the specific config
        class SettingsWithConfig(cls):
            model_config = SettingsConfigDict(env_file=config_file, env_file_encoding="utf-8")

        instance = SettingsWithConfig()

        # Log configuration source and precedence - simplified to a single clear message
        env_vars_present = any(var in os.environ for var in ["SUPABASE_PROJECT_REF", "SUPABASE_DB_PASSWORD"])

        if env_vars_present and config_file:
            logger.info(f"Using environment variables (highest precedence) over config file: {config_file}")
        elif env_vars_present:
            logger.info("Using environment variables for configuration")
        elif config_file:
            logger.info(f"Using settings from config file: {config_file}")
        else:
            logger.info("Using default settings (local development)")

        return instance


# Module-level singleton - maintains existing interface
settings = Settings.with_config(find_config_file())

```
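
Configuration precedence in practice: environment variables take priority over a discovered `.env` file, which takes priority over the local-development defaults. A minimal sketch, assuming a shell with no config file present (the 20-character project ref below is made up):

```python
import os

from supabase_mcp.settings import Settings

# Highest precedence: environment variables (names match the field aliases above)
os.environ["SUPABASE_PROJECT_REF"] = "abcdefghij1234567890"  # exactly 20 chars, fake
os.environ["SUPABASE_DB_PASSWORD"] = "example-password"
os.environ["SUPABASE_REGION"] = "eu-central-1"

settings = Settings.with_config(config_file=None)
assert settings.supabase_project_ref == "abcdefghij1234567890"
assert settings.supabase_region == "eu-central-1"

# With no env vars and no config file, the defaults target local development:
# project ref "127.0.0.1:54322", db password "postgres", region "us-east-1".
```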

--------------------------------------------------------------------------------
/tests/services/api/test_api_client.py:
--------------------------------------------------------------------------------

```python
from unittest.mock import MagicMock, patch

import httpx
import pytest

from supabase_mcp.clients.management_client import ManagementAPIClient
from supabase_mcp.exceptions import APIClientError, APIConnectionError
from supabase_mcp.settings import Settings


@pytest.mark.asyncio(loop_scope="module")
class TestAPIClient:
    """Unit tests for the API client."""

    @pytest.fixture
    def mock_settings(self):
        """Create mock settings for testing."""
        settings = MagicMock(spec=Settings)
        settings.supabase_access_token = "test-token"
        settings.supabase_project_ref = "test-project-ref"
        settings.supabase_region = "us-east-1"
        settings.query_api_url = "https://api.test.com"
        settings.supabase_api_url = "https://api.supabase.com"
        return settings

    async def test_execute_get_request(self, mock_settings):
        """Test executing a GET request to the API."""
        # Create client but don't mock the httpx client yet
        client = ManagementAPIClient(settings=mock_settings)
        
        # Setup mock response
        mock_response = MagicMock(spec=httpx.Response)
        mock_response.status_code = 404
        mock_response.is_success = False
        mock_response.headers = {"content-type": "application/json"}
        mock_response.json.return_value = {"message": "Cannot GET /v1/health"}
        mock_response.text = '{"message": "Cannot GET /v1/health"}'
        mock_response.content = b'{"message": "Cannot GET /v1/health"}'
        
        # Mock the send_request method to return our mock response
        with patch.object(client, 'send_request', return_value=mock_response):
            path = "/v1/health"
            
            # Execute the request and expect a 404 error
            with pytest.raises(APIClientError) as exc_info:
                await client.execute_request(
                    method="GET",
                    path=path,
                )
            
            # Verify the error details
            assert exc_info.value.status_code == 404
            assert "Cannot GET /v1/health" in str(exc_info.value)

    async def test_request_preparation(self, mock_settings):
        """Test that requests are properly prepared with headers and parameters."""
        client = ManagementAPIClient(settings=mock_settings)
        
        # Prepare a request with parameters
        method = "GET"
        path = "/v1/health"
        request_params = {"param1": "value1", "param2": "value2"}

        # Prepare the request
        request = client.prepare_request(
            method=method,
            path=path,
            request_params=request_params,
        )

        # Verify the request
        assert request.method == method
        assert path in str(request.url)
        assert "param1=value1" in str(request.url)
        assert "param2=value2" in str(request.url)
        assert "Content-Type" in request.headers
        assert request.headers["Content-Type"] == "application/json"

    async def test_error_handling(self, mock_settings):
        """Test handling of API errors."""
        client = ManagementAPIClient(settings=mock_settings)
        
        # Setup mock response
        mock_response = MagicMock(spec=httpx.Response)
        mock_response.status_code = 404
        mock_response.is_success = False
        mock_response.headers = {"content-type": "application/json"}
        mock_response.json.return_value = {"message": "Cannot GET /v1/nonexistent-endpoint"}
        mock_response.text = '{"message": "Cannot GET /v1/nonexistent-endpoint"}'
        mock_response.content = b'{"message": "Cannot GET /v1/nonexistent-endpoint"}'
        
        with patch.object(client, 'send_request', return_value=mock_response):
            path = "/v1/nonexistent-endpoint"
            
            # Execute the request and expect an APIClientError
            with pytest.raises(APIClientError) as exc_info:
                await client.execute_request(
                    method="GET",
                    path=path,
                )
            
            # Verify the error details
            assert exc_info.value.status_code == 404
            assert "Cannot GET /v1/nonexistent-endpoint" in str(exc_info.value)

    async def test_request_with_body(self, mock_settings):
        """Test executing a request with a body."""
        client = ManagementAPIClient(settings=mock_settings)
        
        # Test the request preparation
        method = "POST"
        path = "/v1/health/check"
        request_body = {"test": "data", "nested": {"value": 123}}

        # Prepare the request
        request = client.prepare_request(
            method=method,
            path=path,
            request_body=request_body,
        )

        # Verify the request
        assert request.method == method
        assert path in str(request.url)
        assert request.content  # Should have content for the body
        assert "Content-Type" in request.headers
        assert request.headers["Content-Type"] == "application/json"

    async def test_response_parsing(self, mock_settings):
        """Test parsing API responses."""
        client = ManagementAPIClient(settings=mock_settings)
        
        # Setup mock response
        mock_response = MagicMock(spec=httpx.Response)
        mock_response.status_code = 200
        mock_response.is_success = True
        mock_response.headers = {"content-type": "application/json"}
        mock_response.json.return_value = [{"id": "project1", "name": "Test Project"}]
        mock_response.content = b'[{"id": "project1", "name": "Test Project"}]'
        
        with patch.object(client, 'send_request', return_value=mock_response):
            path = "/v1/projects"
            
            # Execute the request
            response = await client.execute_request(
                method="GET",
                path=path,
            )
            
            # Verify the response is parsed correctly
            assert isinstance(response, list)
            assert len(response) > 0
            assert "id" in response[0]

    async def test_request_retry_mechanism(self, mock_settings):
        """Test that the tenacity retry mechanism works correctly for API requests."""
        client = ManagementAPIClient(settings=mock_settings)
        
        # Create a mock request object for the NetworkError
        mock_request = MagicMock(spec=httpx.Request)
        mock_request.method = "GET"
        mock_request.url = "https://api.supabase.com/v1/projects"
        
        # Mock the client's send method to always raise a network error
        with patch.object(client.client, 'send', side_effect=httpx.NetworkError("Simulated network failure", request=mock_request)):
            # Execute a request - this should trigger retries and eventually fail
            with pytest.raises(APIConnectionError) as exc_info:
                await client.execute_request(
                    method="GET",
                    path="/v1/projects",
                )
            
            # Verify the error message indicates retries were attempted
            assert "Network error after 3 retry attempts" in str(exc_info.value)

    async def test_request_without_access_token(self, mock_settings):
        """Test that an exception is raised when attempting to send a request without an access token."""
        # Create client with no access token
        mock_settings.supabase_access_token = None
        client = ManagementAPIClient(settings=mock_settings)

        # Attempt to execute a request - should raise an exception
        with pytest.raises(APIClientError) as exc_info:
            await client.execute_request(
                method="GET",
                path="/v1/projects",
            )
        
        assert "Supabase access token is not configured" in str(exc_info.value)
```

--------------------------------------------------------------------------------
/supabase_mcp/clients/base_http_client.py:
--------------------------------------------------------------------------------

```python
from abc import ABC, abstractmethod
from json.decoder import JSONDecodeError
from typing import Any, TypeVar

import httpx
from pydantic import BaseModel
from tenacity import RetryCallState, retry, retry_if_exception_type, stop_after_attempt, wait_exponential

from supabase_mcp.exceptions import (
    APIClientError,
    APIConnectionError,
    APIResponseError,
    APIServerError,
    UnexpectedError,
)
from supabase_mcp.logger import logger

T = TypeVar("T")


# Helper function for retry decorator to safely log exceptions
def log_retry_attempt(retry_state: RetryCallState) -> None:
    """Log retry attempts with exception details if available."""
    exception = retry_state.outcome.exception() if retry_state.outcome and retry_state.outcome.failed else None
    exception_str = str(exception) if exception else "Unknown error"
    logger.warning(f"Network error, retrying ({retry_state.attempt_number}/3): {exception_str}")


class AsyncHTTPClient(ABC):
    """Abstract base class for async HTTP clients."""

    @abstractmethod
    async def _ensure_client(self) -> httpx.AsyncClient:
        """Ensure client exists and is ready for use.

        Creates the client if it doesn't exist yet.
        Returns the client instance.
        """
        pass

    @abstractmethod
    async def close(self) -> None:
        """Close the client and release resources.

        Should be called when the client is no longer needed.
        """
        pass

    def prepare_request(
        self,
        client: httpx.AsyncClient,
        method: str,
        path: str,
        request_params: dict[str, Any] | None = None,
        request_body: dict[str, Any] | None = None,
    ) -> httpx.Request:
        """
        Prepare an HTTP request.

        Args:
            client: The httpx client to use
            method: HTTP method (GET, POST, etc.)
            path: API path
            request_params: Query parameters
            request_body: Request body

        Returns:
            Prepared httpx.Request object

        Raises:
            APIClientError: If request preparation fails
        """
        try:
            return client.build_request(method=method, url=path, params=request_params, json=request_body)
        except Exception as e:
            raise APIClientError(
                message=f"Failed to build request: {str(e)}",
                status_code=None,
            ) from e

    @retry(
        retry=retry_if_exception_type(httpx.NetworkError),  # Covers ConnectError, ReadError, WriteError, CloseError (timeouts raise httpx.TimeoutException, a sibling class)
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10),
        reraise=True,  # Ensure the original exception is raised
        before_sleep=log_retry_attempt,
    )
    async def send_request(self, client: httpx.AsyncClient, request: httpx.Request) -> httpx.Response:
        """
        Send an HTTP request with retry logic for transient errors.

        Args:
            client: The httpx client to use
            request: Prepared httpx.Request object

        Returns:
            httpx.Response object

        Raises:
            APIConnectionError: For connection issues
            APIClientError: For other request errors
        """
        try:
            return await client.send(request)
        except httpx.NetworkError as e:
            # All NetworkErrors will be retried by the decorator
            # This will only be reached after all retries are exhausted
            logger.error(f"Network error after all retry attempts: {str(e)}")
            raise APIConnectionError(
                message=f"Network error after 3 retry attempts: {str(e)}",
                status_code=None,
            ) from e
        except Exception as e:
            # Other exceptions won't be retried
            raise APIClientError(
                message=f"Request failed: {str(e)}",
                status_code=None,
            ) from e

    def parse_response(self, response: httpx.Response) -> dict[str, Any]:
        """
        Parse an HTTP response as JSON.

        Args:
            response: httpx.Response object

        Returns:
            Parsed response body as dictionary

        Raises:
            APIResponseError: If response cannot be parsed as JSON
        """
        if not response.content:
            return {}

        try:
            return response.json()
        except JSONDecodeError as e:
            raise APIResponseError(
                message=f"Failed to parse response as JSON: {str(e)}",
                status_code=response.status_code,
                response_body={"raw_content": response.text},
            ) from e

    def handle_error_response(self, response: httpx.Response, parsed_body: dict[str, Any] | None = None) -> None:
        """
        Handle error responses based on status code.

        Args:
            response: httpx.Response object
            parsed_body: Parsed response body if available

        Raises:
            APIClientError: For client errors (4xx)
            APIServerError: For server errors (5xx)
            UnexpectedError: For unexpected status codes
        """
        # Extract error message
        error_message = f"API request failed: {response.status_code}"
        if parsed_body and "message" in parsed_body:
            error_message = parsed_body["message"]

        # Determine error type based on status code
        if 400 <= response.status_code < 500:
            raise APIClientError(
                message=error_message,
                status_code=response.status_code,
                response_body=parsed_body,
            )
        elif response.status_code >= 500:
            raise APIServerError(
                message=error_message,
                status_code=response.status_code,
                response_body=parsed_body,
            )
        else:
            # This should not happen, but just in case
            raise UnexpectedError(
                message=f"Unexpected status code: {response.status_code}",
                status_code=response.status_code,
                response_body=parsed_body,
            )

    async def execute_request(
        self,
        method: str,
        path: str,
        request_params: dict[str, Any] | None = None,
        request_body: dict[str, Any] | None = None,
    ) -> dict[str, Any] | BaseModel:
        """
        Execute an HTTP request.

        Args:
            method: HTTP method (GET, POST, etc.)
            path: API path
            request_params: Query parameters
            request_body: Request body

        Returns:
            API response as a dictionary

        Raises:
            APIClientError: For client errors (4xx)
            APIConnectionError: For connection issues
            APIResponseError: For response parsing errors
            UnexpectedError: For unexpected errors
        """
        # Log detailed request information
        logger.info(f"API Client: Executing {method} request to {path}")
        if request_params:
            logger.debug(f"Request params: {request_params}")
        if request_body:
            logger.debug(f"Request body: {request_body}")

        # Get client
        client = await self._ensure_client()

        # Prepare request
        request = self.prepare_request(client, method, path, request_params, request_body)

        # Send request
        response = await self.send_request(client, request)

        # Parse response (for both success and error cases)
        parsed_body = self.parse_response(response)

        # Check if successful
        if not response.is_success:
            logger.warning(f"Request failed: {method} {path} - Status {response.status_code}")
            self.handle_error_response(response, parsed_body)

        # Log success and return
        logger.info(f"Request successful: {method} {path} - Status {response.status_code}")
        return parsed_body

```
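
A minimal sketch of a concrete subclass, assuming only the abstract interface above; the real `ManagementAPIClient` in `clients/management_client.py` additionally injects auth headers and Supabase-specific settings. The echo endpoint used here is just a placeholder.

```python
import asyncio

import httpx

from supabase_mcp.clients.base_http_client import AsyncHTTPClient


class ExampleClient(AsyncHTTPClient):
    """Toy client demonstrating the two abstract methods subclasses must supply."""

    def __init__(self, base_url: str = "https://httpbin.org") -> None:
        self.base_url = base_url
        self._client: httpx.AsyncClient | None = None

    async def _ensure_client(self) -> httpx.AsyncClient:
        # Create the underlying httpx client lazily on first use
        if self._client is None:
            self._client = httpx.AsyncClient(base_url=self.base_url)
        return self._client

    async def close(self) -> None:
        if self._client is not None:
            await self._client.aclose()
            self._client = None


async def main() -> None:
    client = ExampleClient()
    try:
        # Inherits retries, error mapping, and JSON parsing from the base class
        body = await client.execute_request("GET", "/json")
        print(body)
    finally:
        await client.close()


if __name__ == "__main__":
    asyncio.run(main())
```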

--------------------------------------------------------------------------------
/tests/services/logs/test_log_manager.py:
--------------------------------------------------------------------------------

```python
from unittest.mock import patch

import pytest

from supabase_mcp.services.database.sql.loader import SQLLoader
from supabase_mcp.services.logs.log_manager import LogManager


class TestLogManager:
    """Tests for the LogManager class."""

    def test_init(self):
        """Test initialization of LogManager."""
        log_manager = LogManager()
        assert isinstance(log_manager.sql_loader, SQLLoader)
        assert log_manager.COLLECTION_TO_TABLE["postgres"] == "postgres_logs"
        assert log_manager.COLLECTION_TO_TABLE["api_gateway"] == "edge_logs"
        assert log_manager.COLLECTION_TO_TABLE["edge_functions"] == "function_edge_logs"

    @pytest.mark.parametrize(
        "collection,hours_ago,filters,search,expected_clause",
        [
            # Test with hours_ago only
            (
                "postgres",
                24,
                None,
                None,
                "WHERE postgres_logs.timestamp > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR)",
            ),
            # Test with search only
            (
                "auth",
                None,
                None,
                "error",
                "WHERE event_message LIKE '%error%'",
            ),
            # Test with filters only
            (
                "api_gateway",
                None,
                [{"field": "status_code", "operator": "=", "value": 500}],
                None,
                "WHERE status_code = 500",
            ),
            # Test with string value in filters
            (
                "api_gateway",
                None,
                [{"field": "method", "operator": "=", "value": "GET"}],
                None,
                "WHERE method = 'GET'",
            ),
            # Test with multiple filters
            (
                "postgres",
                None,
                [
                    {"field": "parsed.error_severity", "operator": "=", "value": "ERROR"},
                    {"field": "parsed.application_name", "operator": "LIKE", "value": "app%"},
                ],
                None,
                "WHERE parsed.error_severity = 'ERROR' AND parsed.application_name LIKE 'app%'",
            ),
            # Test with hours_ago and search
            (
                "storage",
                12,
                None,
                "upload",
                "WHERE storage_logs.timestamp > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 12 HOUR) AND event_message LIKE '%upload%'",
            ),
            # Test with all parameters
            (
                "edge_functions",
                6,
                [{"field": "response.status_code", "operator": ">", "value": 400}],
                "timeout",
                "WHERE function_edge_logs.timestamp > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 6 HOUR) AND event_message LIKE '%timeout%' AND response.status_code > 400",
            ),
            # Test with cron logs (special case)
            (
                "cron",
                24,
                None,
                None,
                "AND postgres_logs.timestamp > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR)",
            ),
            # Test with cron logs and other parameters
            (
                "cron",
                12,
                [{"field": "parsed.error_severity", "operator": "=", "value": "ERROR"}],
                "failed",
                "AND postgres_logs.timestamp > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 12 HOUR) AND event_message LIKE '%failed%' AND parsed.error_severity = 'ERROR'",
            ),
        ],
    )
    def test_build_where_clause(self, collection, hours_ago, filters, search, expected_clause):
        """Test building WHERE clauses for different scenarios."""
        log_manager = LogManager()
        where_clause = log_manager._build_where_clause(
            collection=collection, hours_ago=hours_ago, filters=filters, search=search
        )
        assert where_clause == expected_clause

    def test_build_where_clause_escapes_single_quotes(self):
        """Test that single quotes in search strings are properly escaped."""
        log_manager = LogManager()
        where_clause = log_manager._build_where_clause(collection="postgres", search="O'Reilly")
        assert where_clause == "WHERE event_message LIKE '%O''Reilly%'"

        # Test with filters containing single quotes
        where_clause = log_manager._build_where_clause(
            collection="postgres",
            filters=[{"field": "parsed.query", "operator": "LIKE", "value": "SELECT * FROM O'Reilly"}],
        )
        assert where_clause == "WHERE parsed.query LIKE 'SELECT * FROM O''Reilly'"

    @patch.object(SQLLoader, "get_logs_query")
    def test_build_logs_query_with_custom_query(self, mock_get_logs_query):
        """Test building a logs query with a custom query."""
        log_manager = LogManager()
        custom_query = "SELECT * FROM postgres_logs LIMIT 10"

        query = log_manager.build_logs_query(collection="postgres", custom_query=custom_query)

        assert query == custom_query
        # Ensure get_logs_query is not called when custom_query is provided
        mock_get_logs_query.assert_not_called()

    @patch.object(LogManager, "_build_where_clause")
    @patch.object(SQLLoader, "get_logs_query")
    def test_build_logs_query_standard(self, mock_get_logs_query, mock_build_where_clause):
        """Test building a standard logs query."""
        log_manager = LogManager()
        mock_build_where_clause.return_value = "WHERE timestamp > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR)"
        mock_get_logs_query.return_value = "SELECT * FROM postgres_logs WHERE timestamp > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR) LIMIT 20"

        query = log_manager.build_logs_query(
            collection="postgres",
            limit=20,
            hours_ago=24,
            filters=[{"field": "parsed.error_severity", "operator": "=", "value": "ERROR"}],
            search="connection",
        )

        mock_build_where_clause.assert_called_once_with(
            collection="postgres",
            hours_ago=24,
            filters=[{"field": "parsed.error_severity", "operator": "=", "value": "ERROR"}],
            search="connection",
        )

        mock_get_logs_query.assert_called_once_with(
            collection="postgres",
            where_clause="WHERE timestamp > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR)",
            limit=20,
        )

        assert (
            query
            == "SELECT * FROM postgres_logs WHERE timestamp > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR) LIMIT 20"
        )

    @patch.object(SQLLoader, "get_logs_query")
    def test_build_logs_query_integration(self, mock_get_logs_query, sql_loader):
        """Test building a logs query with integration between components."""
        # Setup
        log_manager = LogManager()
        log_manager.sql_loader = sql_loader

        # Mock the SQL loader to return a predictable result
        mock_get_logs_query.return_value = (
            "SELECT id, postgres_logs.timestamp, event_message FROM postgres_logs "
            "WHERE postgres_logs.timestamp > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR) "
            "ORDER BY timestamp DESC LIMIT 10"
        )

        # Execute
        query = log_manager.build_logs_query(
            collection="postgres",
            limit=10,
            hours_ago=24,
        )

        # Verify
        assert "SELECT id, postgres_logs.timestamp, event_message FROM postgres_logs" in query
        assert "LIMIT 10" in query
        mock_get_logs_query.assert_called_once()

    def test_unknown_collection(self):
        """Test handling of unknown collections."""
        log_manager = LogManager()

        # Test with a collection that doesn't exist in the mapping
        where_clause = log_manager._build_where_clause(
            collection="unknown_collection",
            hours_ago=24,
        )

        # Should use the collection name as the table name
        assert (
            where_clause == "WHERE unknown_collection.timestamp > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR)"
        )

```
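
A hedged partial reconstruction of `_build_where_clause`, derived only from the parametrized expectations above (timestamp window first, then escaped search, then filters; string filter values quoted with doubled single quotes; cron queries emit `AND` because they attach to a base query that already has a `WHERE`). The real method lives in `services/logs/log_manager.py` and may differ in detail.

```python
from typing import Any

# Subset of the collection-to-table mapping implied by the tests
COLLECTION_TO_TABLE = {
    "postgres": "postgres_logs",
    "api_gateway": "edge_logs",
    "edge_functions": "function_edge_logs",
    "auth": "auth_logs",
    "storage": "storage_logs",
    "cron": "postgres_logs",  # cron entries are read from postgres_logs
}


def build_where_clause(
    collection: str,
    hours_ago: int | None = None,
    filters: list[dict[str, Any]] | None = None,
    search: str | None = None,
) -> str:
    # Unknown collections fall back to using their own name as the table name
    table = COLLECTION_TO_TABLE.get(collection, collection)
    clauses: list[str] = []

    if hours_ago:
        clauses.append(f"{table}.timestamp > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {hours_ago} HOUR)")
    if search:
        escaped = search.replace("'", "''")  # double single quotes to escape them
        clauses.append(f"event_message LIKE '%{escaped}%'")
    if filters:
        for f in filters:
            value = f["value"]
            if isinstance(value, str):
                rendered = "'" + value.replace("'", "''") + "'"
            else:
                rendered = str(value)
            clauses.append(f"{f['field']} {f['operator']} {rendered}")

    if not clauses:
        return ""
    prefix = "AND" if collection == "cron" else "WHERE"
    return f"{prefix} " + " AND ".join(clauses)


assert build_where_clause("auth", search="error") == "WHERE event_message LIKE '%error%'"
assert build_where_clause("postgres", search="O'Reilly") == "WHERE event_message LIKE '%O''Reilly%'"
```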