# Directory Structure

```
├── .gitignore
├── commands
├── docker-compose.yml
├── docker-entrypoint.sh
├── Dockerfile
├── docs
│   └── developer-guide.md
├── README.md
├── requirements.txt
├── setup.py
├── src
│   ├── __init__.py
│   ├── api
│   │   └── metabase.py
│   ├── config
│   │   └── settings.py
│   ├── server
│   │   ├── mcp_server.py
│   │   └── web_interface.py
│   └── tools
│       ├── metabase_action_tools.py
│       └── metabase_tools.py
└── templates
    └── config.html
```

# Files

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
# Environment and configuration
.env
.env.*
!.env.example
venv/
env/
ENV/
.venv
.python-version

# Python cache files
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg

# Docker
.docker/
docker-compose.override.yml

# IDE specific files
.idea/
.vscode/
*.swp
*.swo
.DS_Store
.project
.classpath
.settings/
*.sublime-workspace
*.sublime-project

# Logs
logs/
*.log
npm-debug.log*
yarn-debug.log*
yarn-error.log*

# Testing
.coverage
htmlcov/
.pytest_cache/
.tox/
.nox/
coverage.xml
*.cover
.hypothesis/

# Metabase specific
metabase-data/

# Temporary files
tmp/
temp/

# Jupyter Notebooks
.ipynb_checkpoints
*.ipynb

# Local development
local_settings.py
db.sqlite3
db.sqlite3-journal
```


--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# Metabase MCP Server

A Model Context Protocol (MCP) server that enables AI assistants to interact with Metabase databases and actions.

## Overview

The Metabase MCP Server provides a bridge between AI assistants and Metabase, allowing AI models to:

- List and explore databases configured in Metabase
- Retrieve detailed metadata about database schemas, tables, and fields
- Visualize relationships between tables in a database
- List and execute Metabase actions
- Perform operations on Metabase data through a secure API

This server implements the Model Context Protocol (MCP) specification, making it compatible with AI assistants that support MCP tools.

## Features

- **Database Exploration**: List all databases and explore their schemas
- **Metadata Retrieval**: Get detailed information about tables, fields, and relationships
- **Relationship Visualization**: Generate visual representations of database relationships
- **Action Management**: List, view details, and execute Metabase actions
- **Secure API Key Handling**: Store API keys encrypted and prevent exposure
- **Web Interface**: Test and debug functionality through a user-friendly web interface
- **Docker Support**: Easy deployment with Docker and Docker Compose

## Prerequisites

- Metabase instance (v0.46.0 or higher recommended)
- Metabase API key with appropriate permissions
- Docker (for containerized deployment)
- Python 3.10+ (for local development)

## Installation

### Using Docker (Recommended)

1. Clone this repository:
   ```bash
   git clone https://github.com/yourusername/metabase-mcp.git
   cd metabase-mcp
   ```

2. Build and run the Docker container:
   ```bash
   docker-compose up -d
   ```

3. Access the configuration interface at http://localhost:5001

### Manual Installation

1. Clone this repository:
   ```bash
   git clone https://github.com/yourusername/metabase-mcp.git
   cd metabase-mcp
   ```

2. Install dependencies:
   ```bash
   pip install -r requirements.txt
   ```

3. Run the configuration interface:
   ```bash
   python -m src.server.web_interface
   ```

4. Access the configuration interface at http://localhost:5000

## Configuration

1. Open the web interface in your browser
2. Enter your Metabase URL (e.g., http://localhost:3000)
3. Enter your Metabase API key
4. Click "Save Configuration" and test the connection

### Obtaining a Metabase API Key

1. Log in to your Metabase instance as an administrator
2. Go to Settings > Admin settings > API Keys
3. Create a new API key with appropriate permissions
4. Copy the generated key for use in the MCP server

## Usage

### Running the MCP Server

After configuration, you can run the MCP server:

```bash
# Using Docker
docker run -p 5001:5000 metabase-mcp

# Manually
python -m src.server.mcp_server
```

### Available Tools

The MCP server provides the following tools to AI assistants:

1. **list_databases**: List all databases configured in Metabase
2. **get_database_metadata**: Get detailed metadata for a specific database
3. **db_overview**: Get a high-level overview of all tables in a database
4. **table_detail**: Get detailed information about a specific table
5. **visualize_database_relationships**: Generate a visual representation of database relationships
6. **run_database_query**: Execute a SQL query against a database
7. **list_actions**: List all actions configured in Metabase
8. **get_action_details**: Get detailed information about a specific action
9. **execute_action**: Execute a Metabase action with parameters

### Testing Tools via Web Interface

The web interface provides a testing area for each tool:

1. **List Databases**: View all databases configured in Metabase
2. **Get Database Metadata**: View detailed schema information for a database
3. **DB Overview**: View a concise list of all tables in a database
4. **Table Detail**: View detailed information about a specific table
5. **Visualize Database Relationships**: Generate a visual representation of table relationships
6. **Run Query**: Execute SQL queries against databases
7. **List Actions**: View all actions configured in Metabase
8. **Get Action Details**: View detailed information about a specific action
9. **Execute Action**: Test executing an action with parameters

## Security Considerations

- API keys are stored encrypted at rest
- The web interface never displays API keys in plain text
- All API requests use HTTPS when configured with a secure Metabase URL
- The server should be deployed behind a secure proxy in production environments

## Development

### Project Structure

```
metabase-mcp/
├── src/
│   ├── api/            # Metabase API client
│   ├── config/         # Configuration management
│   ├── server/         # MCP and web servers
│   └── tools/          # Tool implementations
├── templates/          # Web interface templates
├── docker-compose.yml  # Docker Compose configuration
├── Dockerfile          # Docker build configuration
├── requirements.txt    # Python dependencies
└── README.md           # Documentation
```

### Adding New Tools

To add a new tool:

1. Implement the tool function in `src/tools/`
2. Register the tool in `src/server/mcp_server.py`
3. Add a testing interface in `templates/config.html` (optional)
4. Add a route in `src/server/web_interface.py` (if adding a testing interface)

## Troubleshooting

### Common Issues

- **Connection Failed**: Ensure your Metabase URL is correct and accessible
- **Authentication Error**: Verify your API key has the necessary permissions
- **Docker Network Issues**: When using Docker, ensure proper network configuration

### Logs

Check the logs for detailed error information:

```bash
# Docker logs
docker logs metabase-mcp

# Manual execution logs
# Logs are printed to the console
```

## Contributing

Contributions are welcome! Please feel free to submit a Pull Request.



```
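
The "Adding New Tools" steps in the README can be sketched as follows. This is a minimal illustration, not code from the repository: `list_collections` is a hypothetical tool, the `collection` endpoint is an assumption about the target Metabase instance, and the injected `get_request` argument stands in for `MetabaseAPI.get_request` so the example runs without a server.

```python
import asyncio
from typing import Any, Awaitable, Callable

# Stand-in type for MetabaseAPI.get_request so the sketch runs without Metabase
GetRequest = Callable[[str], Awaitable[Any]]


async def list_collections(get_request: GetRequest) -> str:
    """Hypothetical tool: list Metabase collections as Markdown."""
    response = await get_request("collection")

    # Same error convention as the existing tools: None or a dict with "error"
    if response is None or "error" in response:
        message = response.get("message", "Unknown error") if response else "No response"
        return f"Error fetching collections: {message}"

    result = "## Collections in Metabase\n\n"
    for collection in response:
        result += f"- **ID**: {collection.get('id')}\n"
        result += f"  **Name**: {collection.get('name')}\n\n"
    return result


async def fake_get_request(endpoint: str) -> Any:
    """Canned response used only for this demonstration."""
    return [{"id": 1, "name": "Analytics"}]


print(asyncio.run(list_collections(fake_get_request)))
```

In the repository itself, such a function would presumably take no injected argument, call `MetabaseAPI.get_request(...)` directly, and be registered in `create_mcp_server()` the same way as the existing tools: `mcp.tool(description="...")(list_collections)`.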

--------------------------------------------------------------------------------
/src/__init__.py:
--------------------------------------------------------------------------------

```python
# This file can be empty or contain package-level imports 
```

--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------

```
mcp>=1.2.0
httpx>=0.24.0
flask[async]>=3.1.0
python-dotenv>=0.19.0
cryptography>=41.0.0 
```

--------------------------------------------------------------------------------
/docker-compose.yml:
--------------------------------------------------------------------------------

```yaml
version: '3'

services:
  metabase-mcp:
    build: .
    ports:
      - "5001:5000"
    volumes:
      - ./data:/app/data
    environment:
      - METABASE_URL=http://host.docker.internal:3000
      - FLASK_DEBUG=False
    command: config 
```

--------------------------------------------------------------------------------
/setup.py:
--------------------------------------------------------------------------------

```python
from setuptools import setup, find_packages

setup(
    name="metabase-mcp",
    version="0.1.0",
    packages=find_packages(),
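    install_requires=[
        "mcp>=1.2.0",
        "httpx>=0.24.0",
        "flask[async]>=3.1.0",
        "python-dotenv>=0.19.0",
        "cryptography>=41.0.0",  # imported by src/config/settings.py; matches requirements.txt
    ],
    python_requires=">=3.10",  # README prerequisite and Dockerfile base image are both 3.10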
) 
```

--------------------------------------------------------------------------------
/docker-entrypoint.sh:
--------------------------------------------------------------------------------

```bash
#!/bin/bash

# Default behavior: Start both MCP server and Web Interface
echo "Starting MCP server in background..."
python -m src.server.mcp_server &
MCP_PID=$! # Get the Process ID of the backgrounded MCP server

echo "Starting Web Interface in foreground..."
# The FLASK_PORT environment variable should be set (e.g., in your .env file or Dockerfile)
# to the port exposed in the Dockerfile (5000).
# The Flask app in web_interface.py must be configured to bind to 0.0.0.0.
exec python -m src.server.web_interface

# Optional: A more robust script might wait for the MCP_PID to exit
# and handle cleanup, but for now, this keeps the web interface as the main process. 
```

--------------------------------------------------------------------------------
/Dockerfile:
--------------------------------------------------------------------------------

```dockerfile
FROM python:3.10-slim

WORKDIR /app

# Copy requirements and install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Install cryptography package
RUN pip install --no-cache-dir cryptography

# Copy application code
COPY src/ ./src/
COPY templates/ ./templates/
COPY setup.py .

# Install the package in development mode
RUN pip install -e .

# Expose port for web interface
EXPOSE 5000

# Set environment variables
ENV METABASE_URL=http://localhost:3000
ENV METABASE_API_KEY=""
ENV PYTHONUNBUFFERED=1

# Use entrypoint script to allow different commands
COPY docker-entrypoint.sh .
RUN chmod +x docker-entrypoint.sh

ENTRYPOINT ["/app/docker-entrypoint.sh"] 
```

--------------------------------------------------------------------------------
/src/server/mcp_server.py:
--------------------------------------------------------------------------------

```python
from mcp.server.fastmcp import FastMCP
from src.config.settings import Config
from src.tools.metabase_tools import list_databases, get_database_metadata, db_overview, table_detail, visualize_database_relationships, run_database_query
from src.tools.metabase_action_tools import list_actions, get_action_details, execute_action

def create_mcp_server():
    """Create and configure an MCP server instance."""
    mcp = FastMCP(Config.MCP_NAME)
    
    # Register database tools
    mcp.tool(
        description="List all databases configured in Metabase"
    )(list_databases)
    
    mcp.tool(
        description="Get detailed metadata for a specific database"
    )(get_database_metadata)
    
    mcp.tool(
        description="Get a high-level overview of all tables in a database"
    )(db_overview)

    mcp.tool(
        description="Get detailed information about a specific table"
    )(table_detail)

    mcp.tool(
        description="Generate a visual representation of database relationships"
    )(visualize_database_relationships)

    mcp.tool(
        description="Run a read-only SQL query against a database"
    )(run_database_query)

    # Register action tools
    mcp.tool(
        description="List all actions configured in Metabase"
    )(list_actions)

    mcp.tool(
        description="Get detailed information about a specific action"
    )(get_action_details)

    mcp.tool(
        description="Execute a Metabase action with parameters"
    )(execute_action)
    
    return mcp

def run_mcp_server():
    """Run the MCP server"""
    mcp = create_mcp_server()
    mcp.run(transport='stdio')

if __name__ == "__main__":
    run_mcp_server() 
```

--------------------------------------------------------------------------------
/src/config/settings.py:
--------------------------------------------------------------------------------

```python
import os
import base64
from cryptography.fernet import Fernet
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

# Configuration class
class Config:
    # Secret key for encryption (generate once and store securely)
    # In production, this should be set as an environment variable
    SECRET_KEY = os.environ.get("SECRET_KEY") or Fernet.generate_key().decode()
    
    # Metabase settings
    METABASE_URL = os.environ.get("METABASE_URL", "http://localhost:3000")
    _METABASE_API_KEY = os.environ.get("METABASE_API_KEY", "")
    
    # Flask settings
    FLASK_DEBUG = os.environ.get("FLASK_DEBUG", "False").lower() == "true"
    FLASK_HOST = os.environ.get("FLASK_HOST", "0.0.0.0")
    FLASK_PORT = int(os.environ.get("FLASK_PORT", "5000"))
    
    # MCP settings
    MCP_NAME = os.environ.get("MCP_NAME", "metabase")
    
    # File paths
    CONFIG_FILE = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), '.env')
    TEMPLATE_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), 'templates')
    
    @classmethod
    def encrypt_api_key(cls, api_key):
        """Encrypt the API key"""
        if not api_key:
            return ""
        
        cipher_suite = Fernet(cls.SECRET_KEY.encode())
        encrypted_key = cipher_suite.encrypt(api_key.encode())
        return base64.urlsafe_b64encode(encrypted_key).decode()
    
    @classmethod
    def decrypt_api_key(cls, encrypted_key):
        """Decrypt the API key"""
        if not encrypted_key:
            return ""
        
        try:
            cipher_suite = Fernet(cls.SECRET_KEY.encode())
            decoded = base64.urlsafe_b64decode(encrypted_key.encode())
            decrypted_key = cipher_suite.decrypt(decoded)
            return decrypted_key.decode()
        except Exception:
            # If decryption fails, return empty string
            return ""
    
    @classmethod
    def save_metabase_config(cls, metabase_url, api_key):
        """Save Metabase configuration to .env file"""
        # Encrypt the API key before saving
        encrypted_key = cls.encrypt_api_key(api_key)
        
        with open(cls.CONFIG_FILE, 'w') as f:
            f.write(f"METABASE_URL={metabase_url}\n")
            f.write(f"METABASE_API_KEY={encrypted_key}\n")
            f.write(f"SECRET_KEY={cls.SECRET_KEY}\n")
        
        # Update current environment
        os.environ['METABASE_URL'] = metabase_url
        os.environ['METABASE_API_KEY'] = encrypted_key
        
        # Update class attributes
        cls.METABASE_URL = metabase_url
        cls._METABASE_API_KEY = encrypted_key

    @classmethod
    def get_metabase_url(cls):
        """Get the current Metabase URL, refreshing from environment if needed"""
        cls.METABASE_URL = os.environ.get("METABASE_URL", cls.METABASE_URL)
        return cls.METABASE_URL
    
    @classmethod
    def get_metabase_api_key(cls):
        """Get the current Metabase API key, decrypting it first"""
        encrypted_key = os.environ.get("METABASE_API_KEY", cls._METABASE_API_KEY)
        return cls.decrypt_api_key(encrypted_key) 
```
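
`save_metabase_config` above opens the `.env` file in write mode, so any other variables already stored there (for example `FLASK_PORT`) are discarded on each save. Below is a minimal sketch of a merge-style update, assuming plain `KEY=value` lines without quoting or multi-line values. `update_env_file` is a hypothetical helper, not part of the repository.

```python
import os
import tempfile
from typing import Dict


def update_env_file(path: str, updates: Dict[str, str]) -> None:
    """Merge `updates` into a KEY=value file, keeping unrelated lines."""
    lines = []
    if os.path.exists(path):
        with open(path, "r", encoding="utf-8") as f:
            lines = f.read().splitlines()

    remaining = dict(updates)  # keys that have not been written yet
    merged = []
    for line in lines:
        key = line.split("=", 1)[0].strip()
        is_assignment = "=" in line and not line.lstrip().startswith("#")
        if is_assignment and key in remaining:
            # Replace the existing assignment in place
            merged.append(f"{key}={remaining.pop(key)}")
        else:
            merged.append(line)

    # Append keys that were not present before
    merged.extend(f"{key}={value}" for key, value in remaining.items())

    # Write to a temporary file in the same directory, then swap it in
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    with os.fdopen(fd, "w", encoding="utf-8") as f:
        f.write("\n".join(merged) + "\n")
    os.replace(tmp_path, path)
```

With this shape, `save_metabase_config` could pass only the three keys it owns (`METABASE_URL`, `METABASE_API_KEY`, `SECRET_KEY`) and leave the rest of the file untouched.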

--------------------------------------------------------------------------------
/src/tools/metabase_action_tools.py:
--------------------------------------------------------------------------------

```python
from src.api.metabase import MetabaseAPI
from typing import Dict, Any

async def list_actions() -> str:
    """
    List all actions configured in Metabase.
    
    Returns:
        A formatted string with information about all configured actions.
    """
    # Check Metabase version
    version_info = await MetabaseAPI.get_request("version")
    version = "unknown"
    if version_info and "error" not in version_info:
        version = version_info.get("version", "unknown")
    
    response = await MetabaseAPI.get_actions()
    
    if response is None or "error" in response:
        error_message = response.get('message', 'Unknown error') if response else 'No response'
        return f"Error fetching actions: {error_message}"
    
    if not response:
        return "No actions found in Metabase. You may need to create some actions first."
    
    result = "## Actions in Metabase\n\n"
    for action in response:
        result += f"- **ID**: {action.get('id')}\n"
        result += f"  **Name**: {action.get('name')}\n"
        result += f"  **Type**: {action.get('type')}\n"
        result += f"  **Model ID**: {action.get('model_id')}\n"
        result += f"  **Created At**: {action.get('created_at')}\n\n"
    
    return result

async def get_action_details(action_id: int) -> str:
    """
    Get detailed information about a specific action.
    
    Args:
        action_id: The ID of the action to fetch
        
    Returns:
        A formatted string with the action's details.
    """
    response = await MetabaseAPI.get_request(f"action/{action_id}")
    
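    if response is None or "error" in response:
        # Guard against a None response before calling .get()
        error_message = response.get('message', 'Unknown error') if response else 'No response'
        return f"Error fetching action details: {error_message}"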
    
    result = f"## Action: {response.get('name')}\n\n"
    result += f"**ID**: {response.get('id')}\n"
    result += f"**Type**: {response.get('type')}\n"
    result += f"**Model ID**: {response.get('model_id')}\n"
    result += f"**Database ID**: {response.get('database_id')}\n"
    result += f"**Created At**: {response.get('created_at')}\n\n"
    
    # Add parameters if available
    parameters = response.get('parameters', [])
    if parameters:
        result += f"### Parameters ({len(parameters)})\n\n"
        for param in parameters:
            result += f"- **{param.get('id')}**: {param.get('name')}\n"
            result += f"  - Type: {param.get('type')}\n"
            if param.get('required'):
                result += f"  - Required: {param.get('required')}\n"
            if param.get('default'):
                result += f"  - Default: {param.get('default')}\n"
        result += "\n"
    
    return result

async def execute_action(action_id: int, parameters: Dict[str, Any] = None) -> str:
    """
    Execute a Metabase action with the provided parameters.
    
    Args:
        action_id: The ID of the action to execute
        parameters: Dictionary of parameter values to use when executing the action
        
    Returns:
        A formatted string with the execution results.
    """
    if parameters is None:
        parameters = {}
    
    # First, verify the action exists
    action_details = await MetabaseAPI.get_request(f"action/{action_id}")
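    if action_details is None or "error" in action_details:
        # A None response would otherwise raise TypeError on the membership test
        error_message = action_details.get('message', '') if action_details else 'No response'
        return f"Error: Action with ID {action_id} not found. {error_message}"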
    
    # Execute the action
    response = await MetabaseAPI.make_request(
        f"action/{action_id}/execute", 
        method="POST",
        data={"parameters": parameters}
    )
    
    if response is None or "error" in response:
        error_msg = response.get('message', 'Unknown error') if response else 'No response'
        return f"Error executing action: {error_msg}"
    
    # Format the successful response
    result = f"## Action Execution Results for '{action_details.get('name')}'\n\n"
    
    # Format the response based on what was returned
    if isinstance(response, dict):
        for key, value in response.items():
            result += f"**{key}**: {value}\n"
    elif isinstance(response, list):
        result += f"Returned {len(response)} rows\n\n"
        if response:
            # Get keys from first item
            keys = response[0].keys()
            # Create table header
            result += "| " + " | ".join(keys) + " |\n"
            result += "| " + " | ".join(["---" for _ in keys]) + " |\n"
            # Add rows
            for row in response[:10]:  # Limit to first 10 rows
                result += "| " + " | ".join([str(row.get(k, "")) for k in keys]) + " |\n"
            
            if len(response) > 10:
                result += "\n_Showing first 10 rows of results_\n"
    else:
        result += f"Result: {response}\n"
    
    return result

async def check_actions_enabled() -> bool:
    """Check if actions are enabled in this Metabase instance"""
    # Check settings endpoint to see if actions are enabled
    settings = await MetabaseAPI.get_request("setting")
    
    if settings and "error" not in settings:
        for setting in settings:
            if setting.get("key") in ("enable-actions", "actions-enabled"):
                return setting.get("value") in ("true", True)
    
    # If we can't determine from settings, try to fetch actions as a test
    actions = await MetabaseAPI.get_actions()
    return not (actions is None or "error" in actions) 
```
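
The list branch of `execute_action` above renders rows as a Markdown table inline. Pulled out as a standalone function, the same logic becomes easier to test in isolation. The following is a sketch only: `rows_to_markdown` and its `max_rows` parameter are hypothetical names, and it assumes every row is a dict with string keys.

```python
from typing import Any, Dict, List


def rows_to_markdown(rows: List[Dict[str, Any]], max_rows: int = 10) -> str:
    """Render a list of dict rows as a Markdown table, truncated to max_rows."""
    if not rows:
        return "Returned 0 rows\n"

    keys = list(rows[0].keys())  # column order comes from the first row
    lines = [
        f"Returned {len(rows)} rows",
        "",
        "| " + " | ".join(keys) + " |",
        "| " + " | ".join("---" for _ in keys) + " |",
    ]
    for row in rows[:max_rows]:
        # Missing keys render as empty cells, as in execute_action
        lines.append("| " + " | ".join(str(row.get(k, "")) for k in keys) + " |")
    if len(rows) > max_rows:
        lines.append("")
        lines.append(f"_Showing first {max_rows} rows of results_")
    return "\n".join(lines) + "\n"
```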

--------------------------------------------------------------------------------
/src/api/metabase.py:
--------------------------------------------------------------------------------

```python
import httpx
from typing import Dict, Any, Optional
from src.config.settings import Config

class MetabaseAPI:
    """Class for interacting with the Metabase API"""
    
    @staticmethod
    async def make_request(endpoint: str, method: str = "GET", data: Optional[Dict] = None) -> Any:
        """Make a request to the Metabase API with proper error handling.
        
        Args:
            endpoint: API endpoint to call (without the base URL)
            method: HTTP method to use (GET, POST, etc.)
            data: Optional JSON data to send with the request
            
        Returns:
            JSON response from the API or error dict
        """
        # Get fresh values from config using the getter methods
        metabase_url = Config.get_metabase_url()
        api_key = Config.get_metabase_api_key()
        
        headers = {
            "x-api-key": api_key,
            "Content-Type": "application/json"
        }
        
        url = f"{metabase_url}/api/{endpoint.lstrip('/')}"
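        # Debug output goes to stderr: the MCP server runs over the stdio transport,
        # so anything printed to stdout would be mixed into the protocol stream
        import sys
        print(f"Making request to: {url}", file=sys.stderr)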
        
        async with httpx.AsyncClient() as client:
            try:
                if method == "GET":
                    response = await client.get(url, headers=headers, timeout=30.0)
                elif method == "POST":
                    response = await client.post(url, headers=headers, json=data, timeout=30.0)
                elif method == "PUT":
                    response = await client.put(url, headers=headers, json=data, timeout=30.0)
                elif method == "DELETE":
                    response = await client.delete(url, headers=headers, timeout=30.0)
                else:
                    return {"error": f"Unsupported HTTP method: {method}"}
                
                response.raise_for_status()
                
                # Try to parse as JSON, but handle non-JSON responses
                try:
                    return response.json()
                except ValueError:
                    # If response is not JSON, return as error with the text content
                    return {"error": "Non-JSON response", "message": response.text}
                
            except httpx.HTTPStatusError as e:
                # Try to get JSON error response
                try:
                    error_json = e.response.json()
                    return {"error": f"HTTP error: {e.response.status_code}", "message": str(error_json)}
                except ValueError:
                    # If error is not JSON, return the text
                    return {"error": f"HTTP error: {e.response.status_code}", "message": e.response.text}
            except Exception as e:
                return {"error": "Failed to make request", "message": str(e)}
    
    @classmethod
    async def get_request(cls, endpoint: str) -> Any:
        """Shorthand for GET requests"""
        return await cls.make_request(endpoint, method="GET")
    
    @classmethod
    async def post_request(cls, endpoint: str, data: Dict) -> Any:
        """Shorthand for POST requests"""
        return await cls.make_request(endpoint, method="POST", data=data)
    
    @classmethod
    async def test_connection(cls) -> tuple:
        """Test connection to Metabase API"""
        try:
            response = await cls.get_request("database")
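            if response is None or "error" in response:
                # Guard against a None response before calling .get()
                error_message = response.get('message', 'Unknown error') if response else 'No response'
                return False, f"Connection failed: {error_message}"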
            return True, "Connection successful!"
        except Exception as e:
            return False, f"Connection failed: {str(e)}"
    
    @classmethod
    async def get_databases(cls):
        """Get list of all databases"""
        response = await cls.get_request("database")
        
        # Handle case where response might be a string
        if isinstance(response, str):
            return {"error": "Unexpected string response", "message": response}
        
        # Check if response is a dictionary with a 'data' key (new Metabase API format)
        if isinstance(response, dict) and 'data' in response:
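            # Debug output goes to stderr so stdout stays clean for the MCP stdio transport
            import sys
            print(f"Found 'data' key in response with {len(response['data'])} databases", file=sys.stderr)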
            return response['data']  # Return just the list of databases
        
        return response
    
    @classmethod
    async def get_database_metadata(cls, database_id: int):
        """Get metadata for a specific database"""
        return await cls.get_request(f"database/{database_id}/metadata")
    
    @classmethod
    async def get_actions(cls):
        """Get list of all actions with support for different Metabase versions"""
        # Try the standard endpoint first
        response = await cls.get_request("action")
        
        # If that fails, try alternative endpoints
        if response is None or "error" in response:
            # Try the legacy endpoint (some older Metabase versions)
            response_legacy = await cls.get_request("api/action")
            if response_legacy and "error" not in response_legacy:
                return response_legacy
            
            # If all attempts failed, return the original error
            return response
        
        return response
    
    @classmethod
    async def get_action(cls, action_id: int):
        """Get details for a specific action"""
        return await cls.get_request(f"action/{action_id}")
    
    @classmethod
    async def execute_action(cls, action_id: int, parameters: Optional[Dict] = None):
        """Execute an action with parameters"""
        if parameters is None:
            parameters = {}
        
        # Validate action_id
        if not isinstance(action_id, int) or action_id <= 0:
            return {"error": "Invalid action ID", "message": "Action ID must be a positive integer"}
        
        # Sanitize parameters
        sanitized_params = {}
        for key, value in parameters.items():
            sanitized_params[str(key)] = value
        
        return await cls.post_request(f"action/{action_id}/execute", {"parameters": sanitized_params})
    
    @classmethod
    async def get_table_metadata(cls, table_id: int):
        """Get detailed metadata for a specific table"""
        return await cls.get_request(f"table/{table_id}/query_metadata")
    
    @classmethod
    async def get_field_metadata(cls, field_id: int):
        """Get detailed metadata for a specific field"""
        return await cls.get_request(f"field/{field_id}")
    
    @classmethod
    async def get_database_schema(cls, database_id: int):
        """Get the database schema with relationships between tables"""
        # First get the basic metadata
        metadata = await cls.get_database_metadata(database_id)
        
        if metadata is None or "error" in metadata:
            return metadata
        
        # For each table, get detailed metadata including foreign keys
        tables = metadata.get('tables', [])
        enhanced_tables = []
        
        for table in tables:
            table_id = table.get('id')
            if table_id:
                table_details = await cls.get_table_metadata(table_id)
                if table_details and "error" not in table_details:
                    enhanced_tables.append(table_details)
                else:
                    enhanced_tables.append(table)
            else:
                enhanced_tables.append(table)
        
        # Replace tables with enhanced versions
        metadata['tables'] = enhanced_tables
        return metadata
    
    @classmethod
    async def run_query(cls, database_id: int, query_string: str, row_limit: int = 5):
        """Run a native query against a database with a row limit
        
        Args:
            database_id: The ID of the database to query
            query_string: The SQL query to execute
            row_limit: Maximum number of rows to return (default: 5)
            
        Returns:
            Query results or error message
        """
        # Remove trailing semicolons that can cause issues with Metabase API
        query_string = query_string.strip()
        if query_string.endswith(';'):
            query_string = query_string[:-1]
        
        # Ensure the query has a LIMIT clause for safety
        query_string = cls._ensure_query_limit(query_string, row_limit)
        
        # Prepare the query payload
        payload = {
            "database": database_id,
            "type": "native",
            "native": {
                "query": query_string,
                "template-tags": {}
            }
        }
        
        # Execute the query
        response = await cls.post_request("dataset", payload)
        
        # Improved error handling for Metabase error responses
        if response and isinstance(response, dict) and "error" in response:
            error_data = response.get("error")
            
            # Check for common error patterns in Metabase responses
            if isinstance(error_data, str) and "does not exist" in error_data:
                # This is likely a SQL syntax error from the database
                return {"error": "SQL Error", "message": error_data}
            
            # If the error message contains additional info
            message = response.get("message", "Unknown error")
            if isinstance(message, dict) and "data" in message:
                if "errors" in message["data"]:
                    return {"error": "SQL Error", "message": message["data"]["errors"]}
        
        return response
    
    @staticmethod
    def _ensure_query_limit(query: str, limit: int) -> str:
        """Ensure the query has a LIMIT clause
        
        This is a simple implementation and may not work for all SQL dialects
        or complex queries. It's a basic safety measure.
        """
        # Convert to uppercase for case-insensitive matching
        query_upper = query.upper()
        
        # Check if query already has a LIMIT clause
        if "LIMIT" in query_upper:
            return query
        
        # Add LIMIT clause
        return f"{query} LIMIT {limit}" 
```
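
The `_ensure_query_limit` helper above relies on a plain substring check, so a query that only mentions a column such as `credit_limit` is treated as already limited. Below is a minimal sketch of a stricter check, assuming a word-boundary regular expression is good enough for the SQL dialects in use. The name `ensure_query_limit` is hypothetical and not part of this repository.

```python
import re

# Matches LIMIT as a whole word followed by a number, e.g. "LIMIT 10".
# Identifiers such as credit_limit do not match because "_" is a word character.
_LIMIT_RE = re.compile(r"\bLIMIT\s+\d+", re.IGNORECASE)


def ensure_query_limit(query: str, limit: int) -> str:
    """Append a LIMIT clause unless the query already appears to contain one."""
    # Drop surrounding whitespace and any trailing semicolons first.
    query = query.strip().rstrip(";").rstrip()
    if _LIMIT_RE.search(query):
        return query
    return f"{query} LIMIT {limit}"


if __name__ == "__main__":
    print(ensure_query_limit("SELECT credit_limit FROM accounts;", 5))
    print(ensure_query_limit("select * from orders limit 10", 5))
```

This is still a heuristic: a `LIMIT` inside a string literal or subquery counts as a match, and dialects that use `TOP` or `FETCH FIRST` are not handled.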

--------------------------------------------------------------------------------
/src/server/web_interface.py:
--------------------------------------------------------------------------------

```python
import os
from flask import Flask, request, render_template, redirect, url_for, flash, jsonify
from src.config.settings import Config
from src.api.metabase import MetabaseAPI

def create_app():
    """Create and configure the Flask application"""
    app = Flask(__name__, template_folder=Config.TEMPLATE_DIR)
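    # Prefer the persisted secret so sessions and flash messages survive restarts;
    # fall back to a random key if none is configured
    app.secret_key = getattr(Config, 'SECRET_KEY', None) or os.urandom(24)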
    
    @app.route('/')
    def home():
        """Home page with configuration form"""
        # Don't pass the actual API key to the template
        # Just indicate if one is set
        api_key_set = bool(Config._METABASE_API_KEY)
        
        return render_template(
            'config.html', 
            metabase_url=Config.METABASE_URL,
            api_key="" if not api_key_set else "••••••••••••••••••••••",
            api_key_set=api_key_set
        )
    
    @app.route('/save_config', methods=['POST'])
    def update_config():
        """Save configuration from form"""
        metabase_url = request.form.get('metabase_url', '').strip()
        api_key = request.form.get('api_key', '').strip()
        
        if not metabase_url:
            flash('Metabase URL is required!')
            return redirect(url_for('home'))
        
        # Only update API key if it's changed (not the masked version)
        if "•" not in api_key:
            Config.save_metabase_config(metabase_url, api_key)
            flash('Configuration saved successfully!')
        else:
            # Only update URL if API key wasn't changed
            with open(Config.CONFIG_FILE, 'w') as f:
                f.write(f"METABASE_URL={metabase_url}\n")
                f.write(f"METABASE_API_KEY={Config._METABASE_API_KEY}\n")
                f.write(f"SECRET_KEY={Config.SECRET_KEY}\n")
            
            # Update current environment and class attribute
            os.environ['METABASE_URL'] = metabase_url
            Config.METABASE_URL = metabase_url
            flash('URL updated successfully!')
        
        return redirect(url_for('home'))
    
    @app.route('/test_connection', methods=['POST'])
    async def test_connection():
        """Test connection with current or provided credentials"""
        metabase_url = request.form.get('metabase_url', Config.METABASE_URL).strip()
        api_key = request.form.get('api_key', '').strip()
        
        # Don't use the masked version
        if "•" in api_key:
            api_key = Config.get_metabase_api_key()
        
        # Temporarily update config for testing
        old_url = Config.METABASE_URL
        old_key = Config._METABASE_API_KEY
        Config.METABASE_URL = metabase_url
        if api_key:
            # Assumption: MetabaseAPI reads the key from Config._METABASE_API_KEY
            Config._METABASE_API_KEY = api_key
        
        try:
            success, message = await MetabaseAPI.test_connection()
            
            # Restore original config if not saving
            if 'save' not in request.form:
                Config.METABASE_URL = old_url
                Config._METABASE_API_KEY = old_key
            
            return jsonify({'success': success, 'message': message})
        except Exception as e:
            # Restore original config
            Config.METABASE_URL = old_url
            Config._METABASE_API_KEY = old_key
            
            # Return error as JSON
            return jsonify({'success': False, 'message': f"Error: {str(e)}"})
    
    @app.route('/test_list_databases')
    async def test_list_databases():
        """Test the list_databases tool"""
        from src.tools.metabase_tools import list_databases
        
        try:
            # Log the current configuration
            print(f"Testing list_databases with URL: {Config.get_metabase_url()}")
            
            result = await list_databases()
            return jsonify({'success': True, 'result': result})
        except Exception as e:
            import traceback
            error_traceback = traceback.format_exc()
            print(f"Error in test_list_databases: {str(e)}\n{error_traceback}")
            return jsonify({'success': False, 'error': str(e)})
    
    @app.route('/test_get_metadata', methods=['POST'])
    async def test_get_metadata():
        """Test the get_database_metadata tool"""
        from src.tools.metabase_tools import get_database_metadata
        
        database_id = request.form.get('database_id')
        if not database_id or not database_id.isdigit():
            return jsonify({'success': False, 'error': 'Valid database ID is required'})
        
        try:
            result = await get_database_metadata(int(database_id))
            return jsonify({'success': True, 'result': result})
        except Exception as e:
            return jsonify({'success': False, 'error': str(e)})
    
    @app.route('/test_list_actions')
    async def test_list_actions():
        """Test the list_actions tool"""
        from src.tools.metabase_action_tools import list_actions
        
        try:
            # Get version info
            version_info = await MetabaseAPI.get_request("version")
            version = "unknown"
            if version_info and "error" not in version_info:
                version = version_info.get("version", "unknown")
            
            result = await list_actions()
            result_with_version = f"Metabase Version: {version}\n\n{result}"
            
            return jsonify({'success': True, 'result': result_with_version})
        except Exception as e:
            return jsonify({'success': False, 'error': str(e)})
    
    @app.route('/test_get_action_details', methods=['POST'])
    async def test_get_action_details():
        """Test the get_action_details tool"""
        from src.tools.metabase_action_tools import get_action_details
        
        action_id = request.form.get('action_id')
        if not action_id or not action_id.isdigit():
            return jsonify({'success': False, 'error': 'Valid action ID is required'})
        
        try:
            result = await get_action_details(int(action_id))
            return jsonify({'success': True, 'result': result})
        except Exception as e:
            return jsonify({'success': False, 'error': str(e)})
    
    @app.route('/test_execute_action', methods=['POST'])
    async def test_execute_action():
        """Test the execute_action tool"""
        from src.tools.metabase_action_tools import execute_action
        
        action_id = request.form.get('action_id')
        if not action_id or not action_id.isdigit():
            return jsonify({'success': False, 'error': 'Valid action ID is required'})
        
        # Parse parameters from form
        parameters = {}
        for key, value in request.form.items():
            if key.startswith('param_'):
                param_name = key[6:]  # Remove 'param_' prefix
                parameters[param_name] = value
        
        try:
            result = await execute_action(int(action_id), parameters)
            return jsonify({'success': True, 'result': result})
        except Exception as e:
            return jsonify({'success': False, 'error': str(e)})
    
    @app.route('/test_visualize_relationships', methods=['POST'])
    async def test_visualize_relationships():
        """Test the visualize_database_relationships tool"""
        from src.tools.metabase_tools import visualize_database_relationships
        
        database_id = request.form.get('database_id')
        if not database_id or not database_id.isdigit():
            return jsonify({'success': False, 'error': 'Valid database ID is required'})
        
        try:
            result = await visualize_database_relationships(int(database_id))
            return jsonify({'success': True, 'result': result})
        except Exception as e:
            return jsonify({'success': False, 'error': str(e)})
    
    @app.route('/test_run_query', methods=['POST'])
    async def test_run_query():
        """Test the run_database_query tool"""
        from src.tools.metabase_tools import run_database_query
        
        database_id = request.form.get('database_id')
        query = request.form.get('query')
        
        if not database_id or not database_id.isdigit():
            return jsonify({'success': False, 'error': 'Valid database ID is required'})
        
        if not query or not query.strip():
            return jsonify({'success': False, 'error': 'SQL query is required'})
        
        try:
            result = await run_database_query(int(database_id), query)
            
            # Check if the result contains an error message
            if result and isinstance(result, str) and result.startswith("Error executing query:"):
                return jsonify({'success': False, 'error': result})
            
            return jsonify({'success': True, 'result': result})
        except Exception as e:
            import traceback
            error_traceback = traceback.format_exc()
            print(f"Error in test_run_query: {str(e)}\n{error_traceback}")
            return jsonify({'success': False, 'error': str(e)})
    
    @app.route('/test_db_overview', methods=['POST'])
    async def test_db_overview():
        """Test the db_overview tool"""
        from src.tools.metabase_tools import db_overview
        
        database_id = request.form.get('database_id')
        if not database_id or not database_id.isdigit():
            return jsonify({'success': False, 'error': 'Valid database ID is required'})
        
        try:
            result = await db_overview(int(database_id))
            return jsonify({'success': True, 'result': result})
        except Exception as e:
            import traceback
            error_traceback = traceback.format_exc()
            print(f"Error in test_db_overview: {str(e)}\n{error_traceback}")
            return jsonify({'success': False, 'error': str(e)})
    
    @app.route('/test_table_detail', methods=['POST'])
    async def test_table_detail():
        """Test the table_detail tool"""
        from src.tools.metabase_tools import table_detail
        
        database_id = request.form.get('database_id')
        table_id = request.form.get('table_id')
        
        if not database_id or not database_id.isdigit():
            return jsonify({'success': False, 'error': 'Valid database ID is required'})
        
        if not table_id or not table_id.isdigit():
            return jsonify({'success': False, 'error': 'Valid table ID is required'})
        
        try:
            result = await table_detail(int(database_id), int(table_id))
            return jsonify({'success': True, 'result': result})
        except Exception as e:
            import traceback
            error_traceback = traceback.format_exc()
            print(f"Error in test_table_detail: {str(e)}\n{error_traceback}")
            return jsonify({'success': False, 'error': str(e)})
    
    return app

if __name__ == '__main__':
    app = create_app()
    port = int(os.environ.get('FLASK_PORT', 5000))
    # Use a Config attribute for debug for consistency, e.g., Config.FLASK_DEBUG
    # Ensure FLASK_DEBUG is set appropriately in your .env or config settings
    app.run(host='0.0.0.0', port=port, debug=getattr(Config, 'FLASK_DEBUG', False)) 
```
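
Most of the test routes above repeat the same ID validation, and `test_execute_action` parses `param_`-prefixed form fields inline. The sketch below shows how those two steps could be factored into helpers that work on any mapping, so they can be tried without Flask. `parse_id` and `extract_action_parameters` are hypothetical names, not functions from this repository.

```python
from typing import Mapping, Optional


def parse_id(raw: Optional[str]) -> Optional[int]:
    """Return the ID as an int, or None if it is missing or not a plain integer."""
    if raw is None:
        return None
    raw = raw.strip()
    # isascii() rejects Unicode digits such as "²", which isdigit() accepts
    # but int() cannot convert.
    if raw.isascii() and raw.isdigit():
        return int(raw)
    return None


def extract_action_parameters(form: Mapping[str, str], prefix: str = "param_") -> dict:
    """Collect form fields named '<prefix><name>' into a {name: value} dict."""
    return {
        key[len(prefix):]: value
        for key, value in form.items()
        if key.startswith(prefix)
    }


if __name__ == "__main__":
    form = {"action_id": "12", "param_email": "a@example.com", "param_name": "Ada"}
    print(parse_id(form.get("action_id")))
    print(extract_action_parameters(form))
```

Unlike the routes above, this sketch strips surrounding whitespace before validating, so `" 12 "` is accepted. A route would return the same `'Valid ... ID is required'` JSON error whenever `parse_id` yields `None`.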

--------------------------------------------------------------------------------
/src/tools/metabase_tools.py:
--------------------------------------------------------------------------------

```python
from src.api.metabase import MetabaseAPI
from src.config.settings import Config

async def list_databases() -> str:
    """
    List all databases configured in Metabase.
    
    Returns:
        A formatted string with information about all configured databases.
    """
    response = await MetabaseAPI.get_databases()
    
    # Handle different response types
    if isinstance(response, str):
        return f"Error: Received unexpected string response: {response}"
    
    if response is None:
        return "Error: No response received from Metabase API"
    
    if isinstance(response, dict) and "error" in response:
        return f"Error fetching databases: {response.get('message', 'Unknown error')}"
    
    # If we got a dictionary without an error, try to extract databases
    if isinstance(response, dict):
        # Try to find databases in common locations
        if 'data' in response:
            response = response['data']
        elif 'databases' in response:
            response = response['databases']
        elif 'results' in response:
            response = response['results']
        else:
            return f"Error: Unexpected response format: {response}"
    
    if not response:
        return "No databases found in Metabase."
    
    if not isinstance(response, list):
        return f"Error: Expected a list of databases, but got {type(response).__name__}: {response}"
    
    # Now we should have a list of databases
    result = "## Databases in Metabase\n\n"
    
    for db in response:
        # Check if each item is a dictionary
        if not isinstance(db, dict):
            result += f"- Warning: Found non-dictionary item: {db}\n\n"
            continue
            
        # Safely extract values with fallbacks
        db_id = db.get('id', 'Unknown')
        db_name = db.get('name', 'Unnamed')
        db_engine = db.get('engine', 'Unknown')
        db_created = db.get('created_at', 'Unknown')
        
        result += f"- **ID**: {db_id}\n"
        result += f"  **Name**: {db_name}\n"
        result += f"  **Engine**: {db_engine}\n"
        result += f"  **Created At**: {db_created}\n\n"
    
    return result

async def get_database_metadata(database_id: int) -> str:
    """
    Get metadata for a specific database in Metabase, including table relationships.
    
    Args:
        database_id: The ID of the database to fetch metadata for
        
    Returns:
        A formatted string with the database's metadata including tables, fields, and relationships.
    """
    response = await MetabaseAPI.get_database_schema(database_id)
    
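    # Check for None separately: calling .get() on None would raise AttributeError
    if response is None:
        return "Error fetching database metadata: No response received from Metabase API"
    if "error" in response:
        return f"Error fetching database metadata: {response.get('message', 'Unknown error')}"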
    
    result = f"## Metadata for Database: {response.get('name')}\n\n"
    
    # Add database details
    result += f"**ID**: {response.get('id')}\n"
    result += f"**Engine**: {response.get('engine')}\n"
    result += f"**Is Sample**: {response.get('is_sample', False)}\n\n"
    
    # Add tables information
    tables = response.get('tables', [])
    result += f"### Tables ({len(tables)})\n\n"
    
    # Create a map of table IDs to names for reference
    table_map = {table.get('id'): table.get('name') for table in tables}
    
    for table in tables:
        result += f"#### {table.get('name')}\n"
        result += f"**ID**: {table.get('id')}\n"
        result += f"**Schema**: {table.get('schema', 'N/A')}\n"
        result += f"**Description**: {table.get('description') or 'No description'}\n\n"
        
        # Add fields for this table
        fields = table.get('fields', [])
        result += f"##### Fields ({len(fields)})\n\n"
        
        # Track foreign keys for relationship section
        foreign_keys = []
        
        for field in fields:
            result += f"- **{field.get('name')}**\n"
            result += f"  - Type: {field.get('base_type')}\n"
            result += f"  - Description: {field.get('description') or 'No description'}\n"
            
            # Check if this is a foreign key
            fk_target_field_id = field.get('fk_target_field_id')
            if fk_target_field_id:
                foreign_keys.append({
                    'source_field': field.get('name'),
                    'source_field_id': field.get('id'),
                    'target_field_id': fk_target_field_id
                })
                result += "  - **Foreign Key** to another table\n"
            
            if field.get('special_type'):
                result += f"  - Special Type: {field.get('special_type')}\n"
        
        # Add relationships section if there are foreign keys
        if foreign_keys:
            result += "\n##### Relationships\n\n"
            
            for fk in foreign_keys:
                # Find target field information
                target_field_info = "Unknown field"
                target_table_name = "Unknown table"
                
                # Search all tables for the target field
                for t in tables:
                    for f in t.get('fields', []):
                        if f.get('id') == fk['target_field_id']:
                            target_field_info = f.get('name')
                            target_table_name = t.get('name')
                            break
                
                result += f"- **{fk['source_field']}** → **{target_table_name}.{target_field_info}**\n"
        
        result += "\n"
    
    # Add a visual representation of relationships
    result += "### Database Relationships\n\n"
    result += "```\n"
    
    # Create a simple text-based diagram of relationships
    for table in tables:
        table_name = table.get('name')
        result += f"{table_name}\n"
        
        for field in table.get('fields', []):
            fk_target_field_id = field.get('fk_target_field_id')
            if fk_target_field_id:
                # Find target field information
                for t in tables:
                    for f in t.get('fields', []):
                        if f.get('id') == fk_target_field_id:
                            target_field = f.get('name')
                            target_table = t.get('name')
                            result += f"  └── {field.get('name')} → {target_table}.{target_field}\n"
        
        result += "\n"
    
    result += "```\n"
    
    return result

async def visualize_database_relationships(database_id: int) -> str:
    """
    Generate a visual representation of database relationships.
    
    Args:
        database_id: The ID of the database to visualize
        
    Returns:
        A formatted string with a visualization of table relationships.
    """
    response = await MetabaseAPI.get_database_schema(database_id)
    
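    # Check for None separately: calling .get() on None would raise AttributeError
    if response is None:
        return "Error fetching database schema: No response received from Metabase API"
    if "error" in response:
        return f"Error fetching database schema: {response.get('message', 'Unknown error')}"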
    
    tables = response.get('tables', [])
    if not tables:
        return "No tables found in this database."
    
    result = f"## Database Relationship Diagram for: {response.get('name')}\n\n"
    
    # Generate a text-based ER diagram
    result += "```\n"
    
    # First list all tables
    result += "Tables:\n"
    for table in tables:
        result += f"  {table.get('name')}\n"
    
    result += "\nRelationships:\n"
    
    # Then show all relationships
    for table in tables:
        table_name = table.get('name')
        
        for field in table.get('fields', []):
            fk_target_field_id = field.get('fk_target_field_id')
            if fk_target_field_id:
                # Find target field information
                for t in tables:
                    for f in t.get('fields', []):
                        if f.get('id') == fk_target_field_id:
                            target_field = f.get('name')
                            target_table = t.get('name')
                            result += f"  {table_name}.{field.get('name')} → {target_table}.{target_field}\n"
    
    result += "```\n\n"
    
    # Add a more detailed description of each relationship
    result += "### Detailed Relationships\n\n"
    
    for table in tables:
        table_name = table.get('name')
        has_relationships = False
        
        for field in table.get('fields', []):
            fk_target_field_id = field.get('fk_target_field_id')
            if fk_target_field_id:
                if not has_relationships:
                    result += f"**{table_name}** has the following relationships:\n\n"
                    has_relationships = True
                
                # Find target field information
                for t in tables:
                    for f in t.get('fields', []):
                        if f.get('id') == fk_target_field_id:
                            target_field = f.get('name')
                            target_table = t.get('name')
                            result += f"- Field **{field.get('name')}** references **{target_table}.{target_field}**\n"
        
        if has_relationships:
            result += "\n"
    
    return result

async def run_database_query(database_id: int, query: str) -> str:
    """
    Run a read-only SQL query against a database and return the first 5 rows.
    
    Args:
        database_id: The ID of the database to query
        query: The SQL query to execute (will be limited to 5 rows)
        
    Returns:
        A formatted string with the query results or error message
    """
    # Execute the query with a limit of 5 rows
    response = await MetabaseAPI.run_query(database_id, query, row_limit=5)
    
    if response is None:
        return "Error: No response received from Metabase API"
    
    if isinstance(response, dict) and "error" in response:
        # Extract more detailed error information if available
        error_message = response.get('message', 'Unknown error')
        
        # Try to extract structured error info
        if isinstance(error_message, dict) and 'data' in error_message:
            data = error_message.get('data', {})
            if 'errors' in data:
                return f"Error executing query: {data['errors']}"
        
        # Handle different error formats from Metabase
        if isinstance(error_message, str) and "does not exist" in error_message:
            return f"Error executing query: {error_message}"
            
        # If it's a raw JSON string representation, try to parse it
        if isinstance(error_message, str) and error_message.startswith('{'):
            try:
                import json
                error_json = json.loads(error_message)
                if 'data' in error_json and 'errors' in error_json['data']:
                    return f"Error executing query: {error_json['data']['errors']}"
            except (ValueError, TypeError):
                # Not valid JSON or not the expected shape; fall through to the generic message
                pass
        
        return f"Error executing query: {error_message}"
    
    # Format the results
    result = "## Query Results\n\n"
    result += f"```sql\n{query}\n```\n\n"
    
    # Extract and format the data
    try:
        # Get column names
        if "data" in response and "cols" in response["data"]:
            columns = [col.get("name", f"Column {i}") for i, col in enumerate(response["data"]["cols"])]
            
            # Get rows (limited to 5)
            rows = []
            if "data" in response and "rows" in response["data"]:
                rows = response["data"]["rows"][:5]
            
            # Format as a table
            if columns and rows:
                # Add header
                result += "| " + " | ".join(columns) + " |\n"
                result += "| " + " | ".join(["---"] * len(columns)) + " |\n"
                
                # Add rows
                for row in rows:
                    # Escape pipes so cell values cannot break the markdown table
                    result += "| " + " | ".join(str(cell).replace("|", "\\|") for cell in row) + " |\n"
                
                # Add row count info
                total_row_count = response.get("row_count", len(rows))
                if total_row_count > 5:
                    result += f"\n*Showing 5 of {total_row_count} rows*\n"
            else:
                result += "No data returned by the query.\n"
        else:
            result += "No data structure found in the response.\n"
            result += f"Raw response: {response}\n"
    except Exception as e:
        result += f"Error formatting results: {str(e)}\n"
        result += f"Raw response: {response}\n"
    
    return result

async def db_overview(database_id: int) -> str:
    """
    Get an overview of all tables in a database without detailed field information.
    
    Args:
        database_id: The ID of the database to get the overview for
        
    Returns:
        A formatted string with basic information about all tables in the database.
    """
    response = await MetabaseAPI.get_database_schema(database_id)
    
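    # Check for None separately: calling .get() on None would raise AttributeError
    if response is None:
        return "Error fetching database schema: No response received from Metabase API"
    if "error" in response:
        return f"Error fetching database schema: {response.get('message', 'Unknown error')}"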
    
    tables = response.get('tables', [])
    if not tables:
        return "No tables found in this database."
    
    result = f"## Database Overview: {response.get('name')}\n\n"
    
    # Add database details
    result += f"**ID**: {response.get('id')}\n"
    result += f"**Engine**: {response.get('engine')}\n"
    result += f"**Is Sample**: {response.get('is_sample', False)}\n\n"
    
    # Add tables information in a tabular format
    result += "### Tables\n\n"
    
    # Create markdown table header with Table ID
    result += "| Table ID | Table Name | Schema | Description | # of Fields |\n"
    result += "| -------- | ---------- | ------ | ----------- | ----------- |\n"
    
    # Add each table as a row
    for table in tables:
        table_id = table.get('id', 'Unknown')
        name = table.get('name', 'Unknown')
        schema = table.get('schema', 'N/A')
        description = table.get('description', 'No description')
        field_count = len(table.get('fields', []))
        
        # Add null check before using string methods
        if description is None:
            description = 'No description'
        else:
            # Clean up description for table display (remove newlines but keep full text)
            description = description.replace('\n', ' ').strip()
        
        result += f"| {table_id} | {name} | {schema} | {description} | {field_count} |\n"
    
    return result

async def table_detail(database_id: int, table_id: int) -> str:
    """
    Get detailed information about a specific table.
    
    Args:
        database_id: The ID of the database containing the table
        table_id: The ID of the table to get details for
        
    Returns:
        A formatted string with detailed information about the table.
    """
    # Directly fetch metadata for the specific table
    response = await MetabaseAPI.get_table_metadata(table_id)
    
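    # Check for None separately: calling .get() on None would raise AttributeError
    if response is None:
        return "Error fetching table metadata: No response received from Metabase API"
    if "error" in response:
        return f"Error fetching table metadata: {response.get('message', 'Unknown error')}"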
    
    # Extract table information
    table_name = response.get('name', 'Unknown')
    result = f"## Table Details: {table_name}\n\n"
    
    # Add table details
    result += f"**ID**: {response.get('id')}\n"
    result += f"**Schema**: {response.get('schema', 'N/A')}\n"
    description = response.get('description', 'No description')
    if description is None:
        description = 'No description'
    result += f"**Description**: {description}\n\n"
    
    # Add fields section
    fields = response.get('fields', [])
    result += f"### Fields ({len(fields)})\n\n"
    
    # Create markdown table for fields
    result += "| Field ID | Field Name | Type | Description | Special Type |\n"
    result += "| -------- | ---------- | ---- | ----------- | ------------ |\n"
    
    # Track foreign keys for relationship section
    foreign_keys = []
    
    for field in fields:
        field_id = field.get('id', 'Unknown')
        name = field.get('name', 'Unknown')
        field_type = field.get('base_type', 'Unknown')
        description = field.get('description', 'No description')
        special_type = field.get('special_type', 'None')
        
        # Add null check before using string methods
        if description is None:
            description = 'No description'
        else:
            # Clean up description for table display (remove newlines but keep full text)
            description = description.replace('\n', ' ').strip()
        
        result += f"| {field_id} | {name} | {field_type} | {description} | {special_type} |\n"
        
        # Check if this is a foreign key
        fk_target_field_id = field.get('fk_target_field_id')
        if fk_target_field_id:
            foreign_keys.append({
                'source_field': field.get('name'),
                'source_field_id': field.get('id'),
                'target_field_id': fk_target_field_id
            })
    
    # Add relationships section if there are foreign keys
    if foreign_keys:
        result += "\n### Relationships\n\n"
        
        for fk in foreign_keys:
            # We'll need to fetch target field information
            target_field = await MetabaseAPI.get_field_metadata(fk['target_field_id'])
            
            if target_field and "error" not in target_field:
                target_field_name = target_field.get('name', 'Unknown field')
                target_table_id = target_field.get('table_id')
                
                # Get target table information
                target_table = await MetabaseAPI.get_table_metadata(target_table_id)
                target_table_name = target_table.get('name', 'Unknown table') if target_table else 'Unknown table'
                
                result += f"- **{fk['source_field']}** → **{target_table_name}.{target_field_name}** (Table ID: {target_table_id})\n"
            else:
                result += f"- **{fk['source_field']}** → **Unknown reference** (Target Field ID: {fk['target_field_id']})\n"
    
    # For "Referenced By" section, we would need to search through other tables
    # For now, let's note that this would require additional API calls
    result += "\n### Referenced By\n\n"
    result += "*Note: To see all references to this table, use the database visualization tool.*\n"
    
    return result 
```
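
`get_database_metadata` and `visualize_database_relationships` resolve each foreign key by rescanning every field of every table. A possible alternative, sketched below, builds a field-ID index once and looks targets up from it. It assumes the schema has the shape used above: a list of tables, each with `name` and `fields`, and fields carrying `id`, `name` and an optional `fk_target_field_id`. `build_field_index` and `list_relationships` are hypothetical names, not part of this repository.

```python
from typing import Dict, List, Tuple


def build_field_index(tables: List[dict]) -> Dict[int, Tuple[str, str]]:
    """Map each field ID to (table name, field name)."""
    index = {}
    for table in tables:
        for field in table.get("fields", []):
            index[field.get("id")] = (table.get("name"), field.get("name"))
    return index


def list_relationships(tables: List[dict]) -> List[str]:
    """Return one 'table.field → target_table.target_field' line per foreign key."""
    index = build_field_index(tables)
    lines = []
    for table in tables:
        for field in table.get("fields", []):
            target_id = field.get("fk_target_field_id")
            if not target_id:
                continue
            # Targets missing from the schema are reported rather than dropped.
            target_table, target_field = index.get(
                target_id, ("Unknown table", "Unknown field")
            )
            lines.append(
                f"{table.get('name')}.{field.get('name')} → {target_table}.{target_field}"
            )
    return lines


if __name__ == "__main__":
    sample_tables = [
        {"name": "orders", "fields": [
            {"id": 1, "name": "id"},
            {"id": 2, "name": "user_id", "fk_target_field_id": 10},
        ]},
        {"name": "users", "fields": [{"id": 10, "name": "id"}]},
    ]
    for line in list_relationships(sample_tables):
        print(line)
```

One behavioral difference: `visualize_database_relationships` silently omits a foreign key whose target field is not in the schema, whereas this sketch lists it with an "Unknown" target.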

--------------------------------------------------------------------------------
/templates/config.html:
--------------------------------------------------------------------------------

```html
<!DOCTYPE html>
<html>
<head>
    <title>Metabase MCP Configuration</title>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/[email protected]/github-markdown.min.css">
    <script src="https://cdn.jsdelivr.net/npm/[email protected]/marked.min.js"></script>
    <style>
        body {
            font-family: Arial, sans-serif;
            max-width: 800px;
            margin: 0 auto;
            padding: 20px;
        }
        .form-group {
            margin-bottom: 15px;
        }
        label {
            display: block;
            margin-bottom: 5px;
            font-weight: bold;
        }
        input[type="text"] {
            width: 100%;
            padding: 8px;
            box-sizing: border-box;
        }
        button {
            background-color: #4CAF50;
            color: white;
            padding: 10px 15px;
            border: none;
            cursor: pointer;
            margin-right: 10px;
        }
        button:hover {
            background-color: #45a049;
        }
        .test-button {
            background-color: #2196F3;
        }
        .test-button:hover {
            background-color: #0b7dda;
        }
        .message {
            margin: 15px 0;
            padding: 10px;
            border-radius: 5px;
        }
        .success {
            background-color: #d4edda;
            color: #155724;
        }
        .error {
            background-color: #f8d7da;
            color: #721c24;
        }
        .result-area {
            margin: 20px 0;
            padding: 0.875rem;
            background: #f5f5f5;
            border-radius: 4px;
            border: 1px solid #ddd;
            min-height: 100px;
            max-height: 400px;
            overflow-y: auto;
            white-space: pre-wrap;
            display: none;
        }
        .section {
            margin-top: 30px;
            border-top: 1px solid #ddd;
            padding-top: 20px;
        }
        textarea.form-control {
            width: 100%;
            padding: 8px;
            box-sizing: border-box;
            font-family: monospace;
            border: 1px solid #ccc;
            border-radius: 4px;
        }
        .markdown-body {
            box-sizing: border-box;
            min-width: 200px;
            max-width: 100%;
            padding: 15px;
            background-color: #fff;
            border-radius: 4px;
            border: 1px solid #ddd;
            color: #24292e;
            font-weight: normal;
            font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Helvetica, Arial, sans-serif;
        }
        .markdown-body h1,
        .markdown-body h2,
        .markdown-body h3,
        .markdown-body h4 {
            color: #24292e;
            font-weight: 600;
            margin-top: 24px;
            margin-bottom: 16px;
        }
        .markdown-body table {
            display: table;
            width: 100%;
            border-collapse: collapse;
            margin-bottom: 16px;
            border: 1px solid #ddd;
        }
        .markdown-body table th {
            font-weight: 600;
            padding: 8px 13px;
            border: 1px solid #ddd;
            background-color: #f6f8fa;
            color: #24292e;
        }
        .markdown-body table td {
            padding: 8px 13px;
            border: 1px solid #ddd;
            color: #24292e;
        }
        .markdown-body table tr {
            background-color: #fff;
            border-top: 1px solid #c6cbd1;
        }
        .markdown-body table tr:nth-child(2n) {
            background-color: #f6f8fa;
        }
        .markdown-body p,
        .markdown-body ul,
        .markdown-body ol,
        .markdown-body blockquote {
            color: #24292e;
            margin-bottom: 16px;
        }
        .markdown-body pre,
        .markdown-body code {
            background-color: #f6f8fa;
            border-radius: 3px;
            padding: 0.2em 0.4em;
            color: #24292e;
            font-family: SFMono-Regular, Consolas, "Liberation Mono", Menlo, monospace;
        }
        .result-container {
            margin-top: 15px;
        }
    </style>
</head>
<body>
    <h1>Metabase MCP Configuration</h1>
    
    {% with messages = get_flashed_messages() %}
        {% if messages %}
            <div class="message {% if 'error' in messages[0].lower() %}error{% else %}success{% endif %}">
                {{ messages[0] }}
            </div>
        {% endif %}
    {% endwith %}
    
    <form method="post" action="/save_config" id="config-form">
        <div class="form-group">
            <label for="metabase_url">Metabase URL:</label>
            <input type="text" id="metabase_url" name="metabase_url" value="{{ metabase_url }}" placeholder="http://localhost:3000">
        </div>
        
        <div class="form-group">
            <label for="api_key">API Key:</label>
            <input type="password" id="api_key" name="api_key" class="form-control" 
                   value="{{ api_key }}" 
                   placeholder="{{ 'API key is set' if api_key_set else 'Enter your Metabase API key' }}"
                   required>
            <small class="form-text text-muted">
                {% if api_key_set %}
                Your API key is stored encrypted. Enter a new key to change it.
                {% else %}
                Your API key will be stored encrypted and never displayed in plain text.
                {% endif %}
            </small>
        </div>
        
        <div class="form-group">
            <button type="submit">Save Configuration</button>
            <button type="button" class="test-button" id="test-connection">Test Connection</button>
        </div>
    </form>
    
    <div id="connection-result" class="result-area"></div>
    
    <div class="section">
        <h2>Test Tools</h2>
        
        <h3>List Databases</h3>
        <button type="button" id="test-list-databases">Test List Databases</button>
        <div id="list-databases-result" class="result-area"></div>
        
        <h3>Get Database Metadata</h3>
        <div class="form-group">
            <label for="database_id">Database ID:</label>
            <input type="text" id="database_id" placeholder="Enter database ID">
        </div>
        <button type="button" id="test-get-metadata">Test Get Metadata</button>
        <div id="get-metadata-result" class="result-area"></div>
        
        <h3>DB Overview</h3>
        <div class="form-group">
            <label for="db_overview_database_id">Database ID:</label>
            <input type="text" id="db_overview_database_id" placeholder="Enter database ID">
        </div>
        <button type="button" id="test-db-overview">Get Database Overview</button>
        <div id="db-overview-result" class="result-area"></div>
        
        <h3>Table Detail</h3>
        <div class="form-group">
            <label for="table_detail_database_id">Database ID:</label>
            <input type="text" id="table_detail_database_id" placeholder="Enter database ID">
        </div>
        <div class="form-group">
            <label for="table_detail_table_id">Table ID:</label>
            <input type="text" id="table_detail_table_id" placeholder="Enter table ID (from DB Overview)">
        </div>
        <button type="button" id="test-table-detail">Get Table Detail</button>
        <div id="table-detail-result" class="result-area"></div>
        
        <h3>Visualize Database Relationships</h3>
        <div class="form-group">
            <label for="relationship_database_id">Database ID:</label>
            <input type="text" id="relationship_database_id" placeholder="Enter database ID">
        </div>
        <button type="button" id="test-visualize-relationships">Visualize Relationships</button>
        <div id="visualize-relationships-result" class="result-area"></div>
        
        <h3>Run Database Query</h3>
        <div class="form-group">
            <label for="query_database_id">Database ID:</label>
            <input type="text" id="query_database_id" placeholder="Enter database ID">
        </div>
        <div class="form-group">
            <label for="sql_query">SQL Query:</label>
            <textarea id="sql_query" placeholder="Enter SQL query" rows="4" class="form-control"></textarea>
            <small class="form-text text-muted">Query will be limited to returning 5 rows for safety.</small>
        </div>
        <button type="button" id="test-run-query">Run Query</button>
        <div id="run-query-result" class="result-area"></div>
    </div>
    
    <div class="section">
        <h3>List Actions</h3>
        <button type="button" id="test-list-actions">Test List Actions</button>
        <div id="list-actions-result" class="result-area"></div>
        
        <h3>Get Action Details</h3>
        <div class="form-group">
            <label for="action_id">Action ID:</label>
            <input type="text" id="action_id" placeholder="Enter action ID">
        </div>
        <button type="button" id="test-get-action">Test Get Action Details</button>
        <div id="get-action-result" class="result-area"></div>
        
        <h3>Execute Action</h3>
        <div class="form-group">
            <label for="exec_action_id">Action ID:</label>
            <input type="text" id="exec_action_id" placeholder="Enter action ID">
        </div>
        <button type="button" id="load-action-params">Load Parameters</button>
        <div id="action-parameters" class="form-group"></div>
        <button type="button" id="test-execute-action">Execute Action</button>
        <div id="execute-action-result" class="result-area"></div>
    </div>
    
    <script>
        // Configure marked.js options
        marked.setOptions({
            gfm: true,
            breaks: true,
            tables: true,
            sanitize: false
        });

        // Function to format response data with proper Markdown rendering
        function formatResponse(container, data) {
            if (data.success) {
                // Check if response has result or message property
                const content = data.result || data.message || '';
                
                // Only try to parse as markdown if it's not empty
                if (content) {
                    container.innerHTML = '<div class="markdown-body">' + marked.parse(content) + '</div>';
                } else {
                    container.innerHTML = '<div class="markdown-body">Operation completed successfully.</div>';
                }
                
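                // Swap the state classes so a stale error style does not linger
                container.classList.remove('error');
                container.classList.add('success');
            } else {
                const errorMsg = data.error || data.message || 'Unknown error';
                // Insert the message as text so server-supplied content is not parsed as HTML
                const errorDiv = document.createElement('div');
                errorDiv.className = 'error';
                errorDiv.textContent = errorMsg;
                container.innerHTML = '';
                container.appendChild(errorDiv);
                
                // Swap the state classes so a stale success style does not linger
                container.classList.remove('success');
                container.classList.add('error');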
            }
        }

        // Test connection
        document.getElementById('test-connection').addEventListener('click', function() {
            const url = document.getElementById('metabase_url').value;
            const apiKey = document.getElementById('api_key').value;
            const resultArea = document.getElementById('connection-result');
            
            resultArea.style.display = 'block';
            resultArea.innerHTML = 'Testing connection...';
            
            fetch('/test_connection', {
                method: 'POST',
                headers: {'Content-Type': 'application/x-www-form-urlencoded'},
                body: `metabase_url=${encodeURIComponent(url)}&api_key=${encodeURIComponent(apiKey)}`
            })
            .then(response => response.json())
            .then(data => {
                formatResponse(resultArea, data);
            })
            .catch(error => {
                resultArea.className = 'result-area error';
                resultArea.innerHTML = `Error: ${error}`;
            });
        });
        
        // Test list databases
        document.getElementById('test-list-databases').addEventListener('click', function() {
            const resultArea = document.getElementById('list-databases-result');
            
            resultArea.style.display = 'block';
            resultArea.innerHTML = 'Fetching databases...';
            
            fetch('/test_list_databases')
            .then(response => response.json())
            .then(data => {
                formatResponse(resultArea, data);
            })
            .catch(error => {
                resultArea.className = 'result-area error';
                resultArea.innerHTML = `Error: ${error}`;
            });
        });
        
        // Test get metadata
        document.getElementById('test-get-metadata').addEventListener('click', function() {
            const databaseId = document.getElementById('database_id').value;
            const resultArea = document.getElementById('get-metadata-result');
            
            if (!databaseId) {
                resultArea.style.display = 'block';
                resultArea.className = 'result-area error';
                resultArea.innerHTML = 'Please enter a database ID';
                return;
            }
            
            resultArea.style.display = 'block';
            resultArea.innerHTML = 'Fetching metadata...';
            
            fetch('/test_get_metadata', {
                method: 'POST',
                headers: {'Content-Type': 'application/x-www-form-urlencoded'},
                body: `database_id=${encodeURIComponent(databaseId)}`
            })
            .then(response => response.json())
            .then(data => {
                formatResponse(resultArea, data);
            })
            .catch(error => {
                resultArea.className = 'result-area error';
                resultArea.innerHTML = `Error: ${error}`;
            });
        });

        // Test list actions
        document.getElementById('test-list-actions').addEventListener('click', function() {
            const resultArea = document.getElementById('list-actions-result');
            
            resultArea.style.display = 'block';
            resultArea.innerHTML = 'Fetching actions...';
            
            fetch('/test_list_actions')
            .then(response => response.json())
            .then(data => {
                formatResponse(resultArea, data);
            })
            .catch(error => {
                resultArea.className = 'result-area error';
                resultArea.innerHTML = `Error: ${error}`;
            });
        });

        // Test get action details
        document.getElementById('test-get-action').addEventListener('click', function() {
            const actionId = document.getElementById('action_id').value;
            const resultArea = document.getElementById('get-action-result');
            
            if (!actionId) {
                resultArea.style.display = 'block';
                resultArea.className = 'result-area error';
                resultArea.innerHTML = 'Please enter an action ID';
                return;
            }
            
            resultArea.style.display = 'block';
            resultArea.innerHTML = 'Fetching action details...';
            
            fetch('/test_get_action_details', {
                method: 'POST',
                headers: {'Content-Type': 'application/x-www-form-urlencoded'},
                body: `action_id=${encodeURIComponent(actionId)}`
            })
            .then(response => response.json())
            .then(data => {
                formatResponse(resultArea, data);
            })
            .catch(error => {
                resultArea.className = 'result-area error';
                resultArea.innerHTML = `Error: ${error}`;
            });
        });

        // Load action parameters
        document.getElementById('load-action-params').addEventListener('click', function() {
            const actionId = document.getElementById('exec_action_id').value;
            const paramsContainer = document.getElementById('action-parameters');
            
            if (!actionId) {
                alert('Please enter an action ID');
                return;
            }
            
            paramsContainer.innerHTML = 'Loading parameters...';
            
            fetch('/test_get_action_details', {
                method: 'POST',
                headers: {'Content-Type': 'application/x-www-form-urlencoded'},
                body: `action_id=${encodeURIComponent(actionId)}`
            })
            .then(response => response.json())
            .then(data => {
                if (data.success) {
                    // Parse the markdown to extract parameters
                    const paramSection = (data.result || '').split('### Parameters')[1];
                    if (!paramSection) {
                        paramsContainer.innerHTML = 'No parameters found for this action.';
                        return;
                    }
                    
                    // Clear container
                    paramsContainer.innerHTML = '';
                    
                    // Extract parameter names from the markdown
                    const paramLines = paramSection.split('\n');
                    let paramCount = 0;
                    
                    for (const line of paramLines) {
                        if (line.startsWith('- **')) {
                            const paramName = line.split('**:')[0].replace('- **', '').trim();
                            
                            const formGroup = document.createElement('div');
                            formGroup.className = 'form-group';
                            
                            const label = document.createElement('label');
                            label.setAttribute('for', `param_${paramName}`);
                            label.textContent = `Parameter: ${paramName}`;
                            
                            const input = document.createElement('input');
                            input.type = 'text';
                            input.id = `param_${paramName}`;
                            input.name = `param_${paramName}`;
                            input.placeholder = `Enter value for ${paramName}`;
                            
                            formGroup.appendChild(label);
                            formGroup.appendChild(input);
                            paramsContainer.appendChild(formGroup);
                            
                            paramCount++;
                        }
                    }
                    
                    if (paramCount === 0) {
                        paramsContainer.innerHTML = 'No parameters found for this action.';
                    }
                } else {
                    paramsContainer.textContent = `Error: ${data.error || data.message || 'Unknown error'}`;
                }
            })
            .catch(error => {
                paramsContainer.innerHTML = `Error: ${error}`;
            });
        });

        // Execute action
        document.getElementById('test-execute-action').addEventListener('click', function() {
            const actionId = document.getElementById('exec_action_id').value;
            const resultArea = document.getElementById('execute-action-result');
            const paramsContainer = document.getElementById('action-parameters');
            
            if (!actionId) {
                resultArea.style.display = 'block';
                resultArea.className = 'result-area error';
                resultArea.innerHTML = 'Please enter an action ID';
                return;
            }
            
            // Collect parameters
            const formData = new FormData();
            formData.append('action_id', actionId);
            
            // Add all input fields from the parameters container
            const inputs = paramsContainer.querySelectorAll('input');
            inputs.forEach(input => {
                if (input.name && input.value) {
                    formData.append(input.name, input.value);
                }
            });
            
            resultArea.style.display = 'block';
            resultArea.innerHTML = 'Executing action...';
            
            fetch('/test_execute_action', {
                method: 'POST',
                body: formData
            })
            .then(response => response.json())
            .then(data => {
                formatResponse(resultArea, data);
            })
            .catch(error => {
                resultArea.className = 'result-area error';
                resultArea.innerHTML = `Error: ${error}`;
            });
        });

        // Test visualize relationships
        document.getElementById('test-visualize-relationships').addEventListener('click', function() {
            const databaseId = document.getElementById('relationship_database_id').value;
            if (!databaseId) {
                alert('Please enter a database ID');
                return;
            }
            
            const resultArea = document.getElementById('visualize-relationships-result');
            resultArea.style.display = 'block';
            resultArea.innerHTML = 'Visualizing relationships...';
            
            fetch('/test_visualize_relationships', {
                method: 'POST',
                headers: {
                    'Content-Type': 'application/x-www-form-urlencoded',
                },
                body: `database_id=${encodeURIComponent(databaseId)}`
            })
            .then(response => response.json())
            .then(data => {
                formatResponse(resultArea, data);
            })
            .catch(error => {
                resultArea.innerHTML = `<div class="error">Error: ${error.message}</div>`;
            });
        });

        // Test run query
        document.getElementById('test-run-query').addEventListener('click', function() {
            const databaseId = document.getElementById('query_database_id').value;
            const sqlQuery = document.getElementById('sql_query').value;
            
            if (!databaseId || !sqlQuery) {
                alert('Please enter both database ID and SQL query');
                return;
            }
            
            const resultArea = document.getElementById('run-query-result');
            resultArea.style.display = 'block';
            resultArea.innerHTML = 'Executing query...';
            
            fetch('/test_run_query', {
                method: 'POST',
                headers: {
                    'Content-Type': 'application/x-www-form-urlencoded',
                },
                body: `database_id=${encodeURIComponent(databaseId)}&query=${encodeURIComponent(sqlQuery)}`
            })
            .then(response => response.json())
            .then(data => {
                formatResponse(resultArea, data);
            })
            .catch(error => {
                resultArea.innerHTML = `<div class="error">Error: ${error.message}</div>`;
            });
        });

        // Test DB Overview
        document.getElementById('test-db-overview').addEventListener('click', function() {
            const databaseId = document.getElementById('db_overview_database_id').value;
            if (!databaseId) {
                alert('Please enter a database ID');
                return;
            }
            
            const resultArea = document.getElementById('db-overview-result');
            resultArea.style.display = 'block';
            resultArea.innerHTML = 'Loading database overview...';
            
            fetch('/test_db_overview', {
                method: 'POST',
                headers: {
                    'Content-Type': 'application/x-www-form-urlencoded',
                },
                body: `database_id=${encodeURIComponent(databaseId)}`
            })
            .then(response => response.json())
            .then(data => {
                formatResponse(resultArea, data);
            })
            .catch(error => {
                resultArea.innerHTML = `<div class="error">Error: ${error.message}</div>`;
            });
        });

        // Test Table Detail
        document.getElementById('test-table-detail').addEventListener('click', function() {
            const databaseId = document.getElementById('table_detail_database_id').value;
            const tableId = document.getElementById('table_detail_table_id').value;
            
            if (!databaseId || !tableId) {
                alert('Please enter both database ID and table ID');
                return;
            }
            
            const resultArea = document.getElementById('table-detail-result');
            resultArea.style.display = 'block';
            resultArea.innerHTML = 'Loading table details...';
            
            fetch('/test_table_detail', {
                method: 'POST',
                headers: {
                    'Content-Type': 'application/x-www-form-urlencoded',
                },
                body: `database_id=${encodeURIComponent(databaseId)}&table_id=${encodeURIComponent(tableId)}`
            })
            .then(response => response.json())
            .then(data => {
                formatResponse(resultArea, data);
            })
            .catch(error => {
                resultArea.innerHTML = `<div class="error">Error: ${error.message}</div>`;
            });
        });
    </script>
</body>
</html>

```
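
The "Load Parameters" handler in this template recovers parameter names by scanning the action-details Markdown for lines that start with `- **` after a `### Parameters` heading. That makes the Markdown layout an implicit contract between the server and the page. The sketch below mirrors the same parsing steps in Python so the contract can be checked in a unit test. The function name `extract_param_names` and the sample text are illustrative assumptions, not part of the project.

```python
def extract_param_names(markdown: str) -> list[str]:
    """Mirror the template's JavaScript parsing of the '### Parameters' section."""
    parts = markdown.split("### Parameters")
    if len(parts) < 2:
        # No parameters heading: the page shows "No parameters found".
        return []

    names = []
    for line in parts[1].split("\n"):
        # Parameter lines are expected to look like "- **name**: description".
        if line.startswith("- **"):
            name = line.split("**:")[0].replace("- **", "", 1).strip()
            names.append(name)
    return names


# Hypothetical action-details text, shaped the way the template expects.
sample = "## Action\n\n### Parameters\n\n- **user_id**: number\n- **note**: text\n"
print(extract_param_names(sample))
```

If the server-side formatter ever changes the bullet or heading style, a test built on this helper would fail before the form silently stops rendering inputs.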