# Directory Structure

```
├── .gitignore
├── commands
├── docker-compose.yml
├── docker-entrypoint.sh
├── Dockerfile
├── docs
│   └── developer-guide.md
├── README.md
├── requirements.txt
├── setup.py
├── src
│   ├── __init__.py
│   ├── api
│   │   └── metabase.py
│   ├── config
│   │   └── settings.py
│   ├── server
│   │   ├── mcp_server.py
│   │   └── web_interface.py
│   └── tools
│       ├── metabase_action_tools.py
│       └── metabase_tools.py
└── templates
    └── config.html
```

# Files

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
# Environment and configuration
.env
.env.*
!.env.example
venv/
env/
ENV/
.venv
.python-version

# Python cache files
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg

# Docker
.docker/
docker-compose.override.yml

# IDE specific files
.idea/
.vscode/
*.swp
*.swo
.DS_Store
.project
.classpath
.settings/
*.sublime-workspace
*.sublime-project

# Logs
logs/
*.log
npm-debug.log*
yarn-debug.log*
yarn-error.log*

# Testing
.coverage
htmlcov/
.pytest_cache/
.tox/
.nox/
coverage.xml
*.cover
.hypothesis/

# Metabase specific
metabase-data/

# Temporary files
tmp/
temp/

# Jupyter Notebooks
.ipynb_checkpoints
*.ipynb

# Local development
local_settings.py
db.sqlite3
db.sqlite3-journal
```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# Metabase MCP Server

A Model Context Protocol (MCP) server that enables AI assistants to interact with Metabase databases and actions.

## Overview

The Metabase MCP Server provides a bridge between AI assistants and Metabase, allowing AI models to:

- List and explore databases configured in Metabase
- Retrieve detailed metadata about database schemas, tables, and fields
- Visualize relationships between tables in a database
- List and execute Metabase actions
- Perform operations on Metabase data through a secure API

This server implements the Model Context Protocol (MCP) specification, making it compatible with AI assistants that support MCP tools.

## Features

- **Database Exploration**: List all databases and explore their schemas
- **Metadata Retrieval**: Get detailed information about tables, fields, and relationships
- **Relationship Visualization**: Generate visual representations of database relationships
- **Action Management**: List, view details, and execute Metabase actions
- **Secure API Key Handling**: Store API keys encrypted and prevent exposure
- **Web Interface**: Test and debug functionality through a user-friendly web interface
- **Docker Support**: Easy deployment with Docker and Docker Compose

## Prerequisites

- Metabase instance (v0.46.0 or higher recommended)
- Metabase API key with appropriate permissions
- Docker (for containerized deployment)
- Python 3.10+ (for local development)

## Installation

### Using Docker (Recommended)

1. Clone this repository:
   ```bash
   git clone https://github.com/yourusername/metabase-mcp.git
   cd metabase-mcp
   ```

2. Build and run the Docker container:
   ```bash
   docker-compose up -d
   ```

3. Access the configuration interface at http://localhost:5001

### Manual Installation

1. Clone this repository:
   ```bash
   git clone https://github.com/yourusername/metabase-mcp.git
   cd metabase-mcp
   ```

2. Install dependencies:
   ```bash
   pip install -r requirements.txt
   ```

3. Run the configuration interface:
   ```bash
   python -m src.server.web_interface
   ```

4. Access the configuration interface at http://localhost:5000

## Configuration

1. Open the web interface in your browser
2. Enter your Metabase URL (e.g., http://localhost:3000)
3. Enter your Metabase API key
4. Click "Save Configuration" and test the connection

### Obtaining a Metabase API Key

1. Log in to your Metabase instance as an administrator
2. Go to Settings > Admin settings > API Keys
3. Create a new API key with appropriate permissions
4. Copy the generated key for use in the MCP server

## Usage

### Running the MCP Server

After configuration, you can run the MCP server:

```bash
# Using Docker
docker run -p 5001:5000 metabase-mcp

# Manually
python -m src.server.mcp_server
```

### Available Tools

The MCP server provides the following tools to AI assistants:

1. **list_databases**: List all databases configured in Metabase
2. **get_database_metadata**: Get detailed metadata for a specific database
3. **db_overview**: Get a high-level overview of all tables in a database
4. **table_detail**: Get detailed information about a specific table
5. **visualize_database_relationships**: Generate a visual representation of database relationships
6. **run_database_query**: Execute a SQL query against a database
7. **list_actions**: List all actions configured in Metabase
8. **get_action_details**: Get detailed information about a specific action
9. **execute_action**: Execute a Metabase action with parameters

### Testing Tools via Web Interface

The web interface provides a testing area for each tool:

1. **List Databases**: View all databases configured in Metabase
2. **Get Database Metadata**: View detailed schema information for a database
3. **DB Overview**: View a concise list of all tables in a database
4. **Table Detail**: View detailed information about a specific table
5. **Visualize Database Relationships**: Generate a visual representation of table relationships
6. **Run Query**: Execute SQL queries against databases
7. **List Actions**: View all actions configured in Metabase
8. **Get Action Details**: View detailed information about a specific action
9. **Execute Action**: Test executing an action with parameters

## Security Considerations

- API keys are stored encrypted at rest
- The web interface never displays API keys in plain text
- All API requests use HTTPS when configured with a secure Metabase URL
- The server should be deployed behind a secure proxy in production environments

## Development

### Project Structure

```
metabase-mcp/
├── src/
│   ├── api/              # Metabase API client
│   ├── config/           # Configuration management
│   ├── server/           # MCP and web servers
│   └── tools/            # Tool implementations
├── templates/            # Web interface templates
├── docker-compose.yml    # Docker Compose configuration
├── Dockerfile            # Docker build configuration
├── requirements.txt      # Python dependencies
└── README.md             # Documentation
```

### Adding New Tools

To add a new tool:

1. Implement the tool function in `src/tools/`
2. Register the tool in `src/server/mcp_server.py`
3. Add a testing interface in `templates/config.html` (optional)
4. Add a route in `src/server/web_interface.py` (if adding a testing interface)

## Troubleshooting

### Common Issues

- **Connection Failed**: Ensure your Metabase URL is correct and accessible
- **Authentication Error**: Verify your API key has the necessary permissions
- **Docker Network Issues**: When using Docker, ensure proper network configuration

### Logs

Check the logs for detailed error information:

```bash
# Docker logs
docker logs metabase-mcp

# Manual execution logs
# Logs are printed to the console
```

## Contributing

Contributions are welcome! Please feel free to submit a Pull Request.
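
### Example: A Minimal New Tool (Sketch)

The "Adding New Tools" steps above can be sketched as follows. This is an illustrative sketch, not code from the repository: `count_tables` and `fetch_metadata_stub` are hypothetical names, and the stub stands in for `MetabaseAPI.get_database_metadata` so the block runs without a Metabase instance. The error handling mirrors the `{"error": ..., "message": ...}` dictionaries that the API client returns.

```python
import asyncio
from typing import Any, Dict, Optional


async def fetch_metadata_stub(database_id: int) -> Optional[Dict[str, Any]]:
    """Hypothetical stand-in for MetabaseAPI.get_database_metadata()."""
    return {"name": "sample", "tables": [{"name": "orders"}, {"name": "users"}]}


async def count_tables(database_id: int) -> str:
    """Hypothetical tool: report how many tables a database contains."""
    metadata = await fetch_metadata_stub(database_id)

    # The API client signals failures with an "error" key (or no response at all)
    if metadata is None or "error" in metadata:
        message = metadata.get("message", "Unknown error") if metadata else "No response"
        return f"Error fetching metadata: {message}"

    tables = metadata.get("tables", [])
    names = ", ".join(table.get("name", "?") for table in tables)
    return f"Database '{metadata.get('name')}' has {len(tables)} tables: {names}"


# Step 2 (registration) would follow the pattern already used in
# src/server/mcp_server.py, for example:
#   mcp.tool(description="Count the tables in a database")(count_tables)
```

Calling `asyncio.run(count_tables(1))` exercises the tool locally. In a real tool the stub would be replaced by a call to the API client.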
```

--------------------------------------------------------------------------------
/src/__init__.py:
--------------------------------------------------------------------------------

```python
# This file can be empty or contain package-level imports

```

--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------

```
mcp>=1.2.0
httpx>=0.24.0
flask[async]>=3.1.0
python-dotenv>=0.19.0
cryptography>=41.0.0
```

--------------------------------------------------------------------------------
/docker-compose.yml:
--------------------------------------------------------------------------------

```yaml
version: '3'

services:
  metabase-mcp:
    build: .
    ports:
      - "5001:5000"
    volumes:
      - ./data:/app/data
    environment:
      - METABASE_URL=http://host.docker.internal:3000
      - FLASK_DEBUG=False
    command: config
```

--------------------------------------------------------------------------------
/setup.py:
--------------------------------------------------------------------------------

```python
from setuptools import setup, find_packages

setup(
    name="metabase-mcp",
    version="0.1.0",
    packages=find_packages(),
    install_requires=[
        "mcp>=1.2.0",
        "httpx>=0.24.0",
        "flask[async]>=3.1.0",
        "python-dotenv>=0.19.0",
        # Imported by src/config/settings.py and listed in requirements.txt
        "cryptography>=41.0.0",
    ],
    # Matches the README prerequisite (Python 3.10+) and the Docker base image
    python_requires=">=3.10",
)
```

--------------------------------------------------------------------------------
/docker-entrypoint.sh:
--------------------------------------------------------------------------------

```bash
#!/bin/bash

# Default behavior: Start both MCP server and Web Interface
echo "Starting MCP server in background..."
python -m src.server.mcp_server &
MCP_PID=$! # Get the Process ID of the backgrounded MCP server

echo "Starting Web Interface in foreground..."
# The FLASK_PORT environment variable should be set (e.g., in your .env file or Dockerfile)
# to the port exposed in the Dockerfile (5000).
# The Flask app in web_interface.py must be configured to bind to 0.0.0.0.
exec python -m src.server.web_interface

# Optional: A more robust script might wait for the MCP_PID to exit
# and handle cleanup, but for now, this keeps the web interface as the main process.
```

--------------------------------------------------------------------------------
/Dockerfile:
--------------------------------------------------------------------------------

```dockerfile
FROM python:3.10-slim

WORKDIR /app

# Copy requirements and install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Install cryptography package
RUN pip install --no-cache-dir cryptography

# Copy application code
COPY src/ ./src/
COPY templates/ ./templates/
COPY setup.py .

# Install the package in development mode
RUN pip install -e .

# Expose port for web interface
EXPOSE 5000

# Set environment variables
ENV METABASE_URL=http://localhost:3000
ENV METABASE_API_KEY=""
ENV PYTHONUNBUFFERED=1

# Use entrypoint script to allow different commands
COPY docker-entrypoint.sh .
RUN chmod +x docker-entrypoint.sh

ENTRYPOINT ["/app/docker-entrypoint.sh"]
```

--------------------------------------------------------------------------------
/src/server/mcp_server.py:
--------------------------------------------------------------------------------

```python
from mcp.server.fastmcp import FastMCP
from src.config.settings import Config
from src.tools.metabase_tools import (
    list_databases,
    get_database_metadata,
    db_overview,
    table_detail,
    visualize_database_relationships,
    run_database_query,
)
from src.tools.metabase_action_tools import list_actions, get_action_details, execute_action


def create_mcp_server():
    """Create and configure an MCP server instance."""
    mcp = FastMCP(Config.MCP_NAME)

    # Register database tools
    mcp.tool(
        description="List all databases configured in Metabase"
    )(list_databases)

    mcp.tool(
        description="Get detailed metadata for a specific database"
    )(get_database_metadata)

    mcp.tool(
        description="Get a high-level overview of all tables in a database"
    )(db_overview)

    mcp.tool(
        description="Get detailed information about a specific table"
    )(table_detail)

    mcp.tool(
        description="Generate a visual representation of database relationships"
    )(visualize_database_relationships)

    mcp.tool(
        description="Run a read-only SQL query against a database"
    )(run_database_query)

    # Register action tools
    mcp.tool(
        description="List all actions configured in Metabase"
    )(list_actions)

    mcp.tool(
        description="Get detailed information about a specific action"
    )(get_action_details)

    mcp.tool(
        description="Execute a Metabase action with parameters"
    )(execute_action)

    return mcp


def run_mcp_server():
    """Run the MCP server"""
    mcp = create_mcp_server()
    mcp.run(transport='stdio')


if __name__ == "__main__":
    run_mcp_server()
```

--------------------------------------------------------------------------------
/src/config/settings.py:
--------------------------------------------------------------------------------

```python
import os
import base64
from cryptography.fernet import Fernet
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()


# Configuration class
class Config:
    # Secret key for encryption (generate once and store securely)
    # In production, this should be set as an environment variable
    SECRET_KEY = os.environ.get("SECRET_KEY") or Fernet.generate_key().decode()

    # Metabase settings
    METABASE_URL = os.environ.get("METABASE_URL", "http://localhost:3000")
    _METABASE_API_KEY = os.environ.get("METABASE_API_KEY", "")

    # Flask settings
    FLASK_DEBUG = os.environ.get("FLASK_DEBUG", "False").lower() == "true"
    FLASK_HOST = os.environ.get("FLASK_HOST", "0.0.0.0")
    FLASK_PORT = int(os.environ.get("FLASK_PORT", "5000"))

    # MCP settings
    MCP_NAME = os.environ.get("MCP_NAME", "metabase")

    # File paths
    CONFIG_FILE = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), '.env')
    TEMPLATE_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), 'templates')

    @classmethod
    def encrypt_api_key(cls, api_key):
        """Encrypt the API key"""
        if not api_key:
            return ""

        cipher_suite = Fernet(cls.SECRET_KEY.encode())
        encrypted_key = cipher_suite.encrypt(api_key.encode())
        return base64.urlsafe_b64encode(encrypted_key).decode()

    @classmethod
    def decrypt_api_key(cls, encrypted_key):
        """Decrypt the API key"""
        if not encrypted_key:
            return ""

        try:
            cipher_suite = Fernet(cls.SECRET_KEY.encode())
            decoded = base64.urlsafe_b64decode(encrypted_key.encode())
            decrypted_key = cipher_suite.decrypt(decoded)
            return decrypted_key.decode()
        except Exception:
            # If decryption fails, return empty string
            return ""

    @classmethod
    def save_metabase_config(cls, metabase_url, api_key):
        """Save Metabase configuration to .env file"""
        # Encrypt the API key before saving
        encrypted_key = cls.encrypt_api_key(api_key)

        with open(cls.CONFIG_FILE, 'w') as f:
            f.write(f"METABASE_URL={metabase_url}\n")
            f.write(f"METABASE_API_KEY={encrypted_key}\n")
            f.write(f"SECRET_KEY={cls.SECRET_KEY}\n")

        # Update current environment
        os.environ['METABASE_URL'] = metabase_url
        os.environ['METABASE_API_KEY'] = encrypted_key

        # Update class attributes
        cls.METABASE_URL = metabase_url
        cls._METABASE_API_KEY = encrypted_key

    @classmethod
    def get_metabase_url(cls):
        """Get the current Metabase URL, refreshing from environment if needed"""
        cls.METABASE_URL = os.environ.get("METABASE_URL", cls.METABASE_URL)
        return cls.METABASE_URL

    @classmethod
    def get_metabase_api_key(cls):
        """Get the current Metabase API key, decrypting it first"""
        encrypted_key = os.environ.get("METABASE_API_KEY", cls._METABASE_API_KEY)
        return cls.decrypt_api_key(encrypted_key)
```

--------------------------------------------------------------------------------
/src/tools/metabase_action_tools.py:
--------------------------------------------------------------------------------

```python
from src.api.metabase import MetabaseAPI
from typing import Dict, Any


async def list_actions() -> str:
    """
    List all actions configured in Metabase.

    Returns:
        A formatted string with information about all configured actions.
    """
    # Check Metabase version
    version_info = await MetabaseAPI.get_request("version")
    version = "unknown"
    if version_info and "error" not in version_info:
        version = version_info.get("version", "unknown")

    response = await MetabaseAPI.get_actions()

    if response is None or "error" in response:
        error_message = response.get('message', 'Unknown error') if response else 'No response'
        return f"Error fetching actions: {error_message}"

    if not response:
        return "No actions found in Metabase. You may need to create some actions first."
    result = "## Actions in Metabase\n\n"
    for action in response:
        result += f"- **ID**: {action.get('id')}\n"
        result += f"  **Name**: {action.get('name')}\n"
        result += f"  **Type**: {action.get('type')}\n"
        result += f"  **Model ID**: {action.get('model_id')}\n"
        result += f"  **Created At**: {action.get('created_at')}\n\n"

    return result


async def get_action_details(action_id: int) -> str:
    """
    Get detailed information about a specific action.

    Args:
        action_id: The ID of the action to fetch

    Returns:
        A formatted string with the action's details.
    """
    response = await MetabaseAPI.get_request(f"action/{action_id}")

    if response is None or "error" in response:
        # Guard against a None response before calling .get()
        error_message = response.get('message', 'Unknown error') if response else 'No response'
        return f"Error fetching action details: {error_message}"

    result = f"## Action: {response.get('name')}\n\n"
    result += f"**ID**: {response.get('id')}\n"
    result += f"**Type**: {response.get('type')}\n"
    result += f"**Model ID**: {response.get('model_id')}\n"
    result += f"**Database ID**: {response.get('database_id')}\n"
    result += f"**Created At**: {response.get('created_at')}\n\n"

    # Add parameters if available
    parameters = response.get('parameters', [])
    if parameters:
        result += f"### Parameters ({len(parameters)})\n\n"
        for param in parameters:
            result += f"- **{param.get('id')}**: {param.get('name')}\n"
            result += f"  - Type: {param.get('type')}\n"
            if param.get('required'):
                result += f"  - Required: {param.get('required')}\n"
            if param.get('default'):
                result += f"  - Default: {param.get('default')}\n"
            result += "\n"

    return result


async def execute_action(action_id: int, parameters: Dict[str, Any] = None) -> str:
    """
    Execute a Metabase action with the provided parameters.

    Args:
        action_id: The ID of the action to execute
        parameters: Dictionary of parameter values to use when executing the action

    Returns:
        A formatted string with the execution results.
    """
    if parameters is None:
        parameters = {}

    # First, verify the action exists
    action_details = await MetabaseAPI.get_request(f"action/{action_id}")

    if action_details is None or "error" in action_details:
        # Guard against a None response before calling .get()
        detail = action_details.get('message', '') if action_details else 'No response'
        return f"Error: Action with ID {action_id} not found. {detail}"

    # Execute the action
    response = await MetabaseAPI.make_request(
        f"action/{action_id}/execute",
        method="POST",
        data={"parameters": parameters}
    )

    if response is None or "error" in response:
        error_msg = response.get('message', 'Unknown error') if response else 'No response'
        return f"Error executing action: {error_msg}"

    # Format the successful response
    result = f"## Action Execution Results for '{action_details.get('name')}'\n\n"

    # Format the response based on what was returned
    if isinstance(response, dict):
        for key, value in response.items():
            result += f"**{key}**: {value}\n"
    elif isinstance(response, list):
        result += f"Returned {len(response)} rows\n\n"
        if response:
            # Get keys from first item
            keys = response[0].keys()

            # Create table header
            result += "| " + " | ".join(keys) + " |\n"
            result += "| " + " | ".join(["---" for _ in keys]) + " |\n"

            # Add rows
            for row in response[:10]:  # Limit to first 10 rows
                result += "| " + " | ".join([str(row.get(k, "")) for k in keys]) + " |\n"

            if len(response) > 10:
                result += "\n_Showing first 10 rows of results_\n"
    else:
        result += f"Result: {response}\n"

    return result


async def check_actions_enabled() -> bool:
    """Check if actions are enabled in this Metabase instance"""
    # Check settings endpoint to see if actions are enabled
    settings = await MetabaseAPI.get_request("setting")

    if settings and "error" not in settings:
        for setting in settings:
            if setting.get("key") in ("enable-actions", "actions-enabled"):
                return setting.get("value") == "true" or setting.get("value") is True

    # If we can't determine from settings, try to fetch actions as a test
    actions = await MetabaseAPI.get_actions()
    return not (actions is None or "error" in actions)
```

--------------------------------------------------------------------------------
/src/api/metabase.py:
--------------------------------------------------------------------------------

```python
import httpx
from typing import Dict, Any, Optional
from src.config.settings import Config


class MetabaseAPI:
    """Class for interacting with the Metabase API"""

    @staticmethod
    async def make_request(endpoint: str, method: str = "GET", data: Optional[Dict] = None) -> Any:
        """Make a request to the Metabase API with proper error handling.

        Args:
            endpoint: API endpoint to call (without the base URL)
            method: HTTP method to use (GET, POST, etc.)
            data: Optional JSON data to send with the request

        Returns:
            JSON response from the API or error dict
        """
        # Get fresh values from config using the getter methods
        metabase_url = Config.get_metabase_url()
        api_key = Config.get_metabase_api_key()

        headers = {
            "x-api-key": api_key,
            "Content-Type": "application/json"
        }

        url = f"{metabase_url}/api/{endpoint.lstrip('/')}"
        print(f"Making request to: {url}")  # Debugging

        async with httpx.AsyncClient() as client:
            try:
                if method == "GET":
                    response = await client.get(url, headers=headers, timeout=30.0)
                elif method == "POST":
                    response = await client.post(url, headers=headers, json=data, timeout=30.0)
                elif method == "PUT":
                    response = await client.put(url, headers=headers, json=data, timeout=30.0)
                elif method == "DELETE":
                    response = await client.delete(url, headers=headers, timeout=30.0)
                else:
                    return {"error": f"Unsupported HTTP method: {method}"}

                response.raise_for_status()

                # Try to parse as JSON, but handle non-JSON responses
                try:
                    return response.json()
                except ValueError:
                    # If response is not JSON, return as error with the text content
                    return {"error": "Non-JSON response", "message": response.text}

            except httpx.HTTPStatusError as e:
                # Try to get JSON error response
                try:
                    error_json = e.response.json()
                    return {"error": f"HTTP error: {e.response.status_code}", "message": str(error_json)}
                except ValueError:
                    # If error is not JSON, return the text
                    return {"error": f"HTTP error: {e.response.status_code}", "message": e.response.text}
            except Exception as e:
                return {"error": "Failed to make request", "message": str(e)}

    @classmethod
    async def get_request(cls, endpoint: str) -> Any:
        """Shorthand for GET requests"""
        return await cls.make_request(endpoint, method="GET")

    @classmethod
    async def post_request(cls, endpoint: str, data: Dict) -> Any:
        """Shorthand for POST requests"""
        return await cls.make_request(endpoint, method="POST", data=data)

    @classmethod
    async def test_connection(cls) -> tuple:
        """Test connection to Metabase API"""
        try:
            response = await cls.get_request("database")
            if response is None or "error" in response:
                # Guard against a None response before calling .get()
                message = response.get('message', 'Unknown error') if response else 'No response'
                return False, f"Connection failed: {message}"
            return True, "Connection successful!"
        except Exception as e:
            return False, f"Connection failed: {str(e)}"

    @classmethod
    async def get_databases(cls):
        """Get list of all databases"""
        response = await cls.get_request("database")

        # Handle case where response might be a string
        if isinstance(response, str):
            return {"error": "Unexpected string response", "message": response}

        # Check if response is a dictionary with a 'data' key (new Metabase API format)
        if isinstance(response, dict) and 'data' in response:
            print(f"Found 'data' key in response with {len(response['data'])} databases")
            return response['data']  # Return just the list of databases

        return response

    @classmethod
    async def get_database_metadata(cls, database_id: int):
        """Get metadata for a specific database"""
        return await cls.get_request(f"database/{database_id}/metadata")

    @classmethod
    async def get_actions(cls):
        """Get list of all actions with support for different Metabase versions"""
        # Try the standard endpoint first
        response = await cls.get_request("action")

        # If that fails, try alternative endpoints
        if response is None or "error" in response:
            # Try the legacy endpoint (some older Metabase versions)
            # Note: make_request already prepends "/api/", so this path resolves to /api/api/action
            response_legacy = await cls.get_request("api/action")
            if response_legacy and "error" not in response_legacy:
                return response_legacy

            # If all attempts failed, return the original error
            return response

        return response

    @classmethod
    async def get_action(cls, action_id: int):
        """Get details for a specific action"""
        return await cls.get_request(f"action/{action_id}")

    @classmethod
    async def execute_action(cls, action_id: int, parameters: Dict = None):
        """Execute an action with parameters"""
        if parameters is None:
            parameters = {}

        # Validate action_id
        if not isinstance(action_id, int) or action_id <= 0:
            return {"error": "Invalid action ID", "message": "Action ID must be a positive integer"}

        # Sanitize parameters (coerce every key to a string)
        sanitized_params = {}
        for key, value in parameters.items():
            sanitized_params[str(key)] = value

        return await cls.post_request(f"action/{action_id}/execute", {"parameters": sanitized_params})

    @classmethod
    async def get_table_metadata(cls, table_id: int):
        """Get detailed metadata for a specific table"""
        return await cls.get_request(f"table/{table_id}/query_metadata")

    @classmethod
    async def get_field_metadata(cls, field_id: int):
        """Get detailed metadata for a specific field"""
        return await cls.get_request(f"field/{field_id}")

    @classmethod
    async def get_database_schema(cls, database_id: int):
        """Get the database schema with relationships between tables"""
        # First get the basic metadata
        metadata = await cls.get_database_metadata(database_id)

        if metadata is None or "error" in metadata:
            return metadata

        # For each table, get detailed metadata including foreign keys
        tables = metadata.get('tables', [])
        enhanced_tables = []

        for table in tables:
            table_id = table.get('id')
            if table_id:
                table_details = await cls.get_table_metadata(table_id)
                if table_details and "error" not in table_details:
                    enhanced_tables.append(table_details)
                else:
                    enhanced_tables.append(table)
            else:
                enhanced_tables.append(table)

        # Replace tables with enhanced versions
        metadata['tables'] = enhanced_tables
        return metadata

    @classmethod
    async def run_query(cls, database_id: int, query_string: str, row_limit: int = 5):
        """Run a native query against a database with a row limit

        Args:
            database_id: The ID of the database to query
            query_string: The SQL query to execute
            row_limit: Maximum number of rows to return (default: 5)

        Returns:
            Query results or error message
        """
        # Remove trailing semicolons that can cause issues with Metabase API
        query_string = query_string.strip()
        if query_string.endswith(';'):
            query_string = query_string[:-1]

        # Ensure the query has a LIMIT clause for safety
        query_string = cls._ensure_query_limit(query_string, row_limit)

        # Prepare the query payload
        payload = {
            "database": database_id,
            "type": "native",
            "native": {
                "query": query_string,
                "template-tags": {}
            }
        }

        # Execute the query
        response = await cls.post_request("dataset", payload)

        # Improved error handling for Metabase error responses
        if response and isinstance(response, dict) and "error" in response:
            error_data = response.get("error")

            # Check for common error patterns in Metabase responses
            if isinstance(error_data, str) and "does not exist" in error_data:
                # This is likely a SQL syntax error from the database
                return {"error": "SQL Error", "message": error_data}

            # If the error message contains additional info
            message = response.get("message", "Unknown error")
            if isinstance(message, dict) and "data" in message:
                if "errors" in message["data"]:
                    return {"error": "SQL Error", "message": message["data"]["errors"]}

        return response

    @staticmethod
    def _ensure_query_limit(query: str, limit: int) -> str:
        """Ensure the query has a LIMIT clause

        This is a simple implementation and may not work for all SQL dialects
        or complex queries. It's a basic safety measure.
        """
        # Convert to uppercase for case-insensitive matching
        query_upper = query.upper()

        # Check if query already has a LIMIT clause
        if "LIMIT" in query_upper:
            return query

        # Add LIMIT clause
        return f"{query} LIMIT {limit}"
```

--------------------------------------------------------------------------------
/src/server/web_interface.py:
--------------------------------------------------------------------------------

```python
import os
from flask import Flask, request, render_template, redirect, url_for, flash, jsonify
from src.config.settings import Config
from src.api.metabase import MetabaseAPI


def create_app():
    """Create and configure the Flask application"""
    app = Flask(__name__, template_folder=Config.TEMPLATE_DIR)
    app.secret_key = os.urandom(24)

    @app.route('/')
    def home():
        """Home page with configuration form"""
        # Don't pass the actual API key to the template
        # Just indicate if one is set
        api_key_set = bool(Config._METABASE_API_KEY)
        return render_template(
            'config.html',
            metabase_url=Config.METABASE_URL,
            api_key="" if not api_key_set else "••••••••••••••••••••••",
            api_key_set=api_key_set
        )

    @app.route('/save_config', methods=['POST'])
    def update_config():
        """Save configuration from form"""
        metabase_url = request.form.get('metabase_url', '').strip()
        api_key = request.form.get('api_key', '').strip()

        if not metabase_url:
            flash('Metabase URL is required!')
            return redirect(url_for('home'))

        # Only update API key if it's changed (not the masked version)
        if "•" not in api_key:
            Config.save_metabase_config(metabase_url, api_key)
            flash('Configuration saved successfully!')
        else:
            # Only update URL if API key wasn't changed
            with open(Config.CONFIG_FILE, 'w') as f:
                f.write(f"METABASE_URL={metabase_url}\n")
                f.write(f"METABASE_API_KEY={Config._METABASE_API_KEY}\n")
                f.write(f"SECRET_KEY={Config.SECRET_KEY}\n")

            # Update current environment and class attribute
            os.environ['METABASE_URL'] = metabase_url
            Config.METABASE_URL = metabase_url
            flash('URL updated successfully!')

        return redirect(url_for('home'))

    @app.route('/test_connection', methods=['POST'])
    async def test_connection():
        """Test connection with current or provided credentials"""
        metabase_url = request.form.get('metabase_url', Config.METABASE_URL).strip()
        api_key = request.form.get('api_key', '').strip()

        # Don't use the masked version
        if "•" in api_key:
            api_key = Config.get_metabase_api_key()

        # Temporarily update config for testing
        old_url = Config.METABASE_URL
        old_key = Config.get_metabase_api_key()

        Config.METABASE_URL = metabase_url

        try:
            success, message = await MetabaseAPI.test_connection()

            # Restore original config if not saving
            if 'save' not in request.form:
                Config.METABASE_URL = old_url

            return jsonify({'success': success, 'message': message})
        except Exception as e:
            # Restore original config
            Config.METABASE_URL = old_url

            # Return error as JSON
            return jsonify({'success': False, 'message': f"Error: {str(e)}"})

    @app.route('/test_list_databases')
    async def test_list_databases():
        """Test the list_databases tool"""
        from src.tools.metabase_tools import list_databases
        try:
            # Log the current configuration
            print(f"Testing list_databases with URL: {Config.get_metabase_url()}")

            result = await list_databases()
            return jsonify({'success': True, 'result': result})
        except Exception as e:
            import traceback
            error_traceback = traceback.format_exc()
            print(f"Error in test_list_databases: {str(e)}\n{error_traceback}")
            return jsonify({'success': False, 'error': str(e)})

    @app.route('/test_get_metadata', methods=['POST'])
    async def test_get_metadata():
        """Test the get_database_metadata tool"""
        from src.tools.metabase_tools import get_database_metadata

        database_id = request.form.get('database_id')
        if not database_id or not database_id.isdigit():
            return jsonify({'success': False, 'error': 'Valid database ID is required'})

        try:
            result = await get_database_metadata(int(database_id))
            return jsonify({'success': True, 'result': result})
        except Exception as e:
            return jsonify({'success': False, 'error': str(e)})

    @app.route('/test_list_actions')
    async def test_list_actions():
        """Test the list_actions tool"""
        from src.tools.metabase_action_tools import list_actions
        try:
            # Get version info
            version_info = await MetabaseAPI.get_request("version")
            version = "unknown"
            if version_info and "error" not in version_info:
                version = version_info.get("version", "unknown")

            result = await list_actions()
            result_with_version = f"Metabase Version: {version}\n\n{result}"
            return jsonify({'success': True, 'result': result_with_version})
        except Exception as e:
            return jsonify({'success': False, 'error': str(e)})

    @app.route('/test_get_action_details', methods=['POST'])
    async def test_get_action_details():
        """Test the get_action_details tool"""
        from src.tools.metabase_action_tools import get_action_details

        action_id = request.form.get('action_id')
        if not action_id or not action_id.isdigit():
            return jsonify({'success': False, 'error': 'Valid action ID is required'})

        try:
            result = await get_action_details(int(action_id))
            return jsonify({'success': True, 'result': result})
        except Exception as e:
            return jsonify({'success': False, 'error': str(e)})

    @app.route('/test_execute_action', methods=['POST'])
    async def test_execute_action():
        """Test the execute_action tool"""
        from src.tools.metabase_action_tools import execute_action

        action_id = request.form.get('action_id')
        if not action_id or not action_id.isdigit():
            return jsonify({'success': False, 'error': 'Valid action ID is required'})

        # Parse parameters from form
        parameters = {}
        for key, value in request.form.items():
            if key.startswith('param_'):
                param_name = key[6:]  # Remove 'param_' prefix
                parameters[param_name] = value

        try:
            result = await execute_action(int(action_id), parameters)
            return jsonify({'success': True, 'result': result})
        except Exception as e:
            return jsonify({'success': False, 'error': str(e)})

    @app.route('/test_visualize_relationships', methods=['POST'])
    async def test_visualize_relationships():
        """Test the visualize_database_relationships tool"""
        from src.tools.metabase_tools import visualize_database_relationships

        database_id = request.form.get('database_id')
        if not database_id or not database_id.isdigit():
            return jsonify({'success': False, 'error': 'Valid database ID is required'})

        try:
            result = await visualize_database_relationships(int(database_id))
            return jsonify({'success': True, 'result': result})
        except Exception as e:
            return jsonify({'success': False, 'error': str(e)})

    @app.route('/test_run_query', methods=['POST'])
    async def test_run_query():
        """Test the run_database_query tool"""
        from src.tools.metabase_tools import run_database_query

        database_id = request.form.get('database_id')
        query = request.form.get('query')

        if not database_id or not database_id.isdigit():
            return jsonify({'success': False, 'error': 'Valid database ID is required'})

        if not query or not query.strip():
            return jsonify({'success': False, 'error': 'SQL query is required'})

        try:
            result = await run_database_query(int(database_id), query)

            # Check if the result contains an error message
            if result and isinstance(result, str) and result.startswith("Error executing query:"):
                return jsonify({'success': False, 'error': result})

            return jsonify({'success': True, 'result': result})
        except Exception as e:
            import traceback
            error_traceback = traceback.format_exc()
            print(f"Error in test_run_query: {str(e)}\n{error_traceback}")
            return jsonify({'success': False, 'error': str(e)})

    @app.route('/test_db_overview', methods=['POST'])
    async def test_db_overview():
        """Test the db_overview tool"""
        from src.tools.metabase_tools import db_overview

        database_id = request.form.get('database_id')

        if not database_id or not database_id.isdigit():
            return jsonify({'success': False, 'error': 'Valid database ID is required'})

        try:
            result = await db_overview(int(database_id))
            return jsonify({'success': True, 'result': result})
        except Exception as e:
            import traceback
            error_traceback = traceback.format_exc()
            print(f"Error in test_db_overview: {str(e)}\n{error_traceback}")
            return jsonify({'success': False, 'error': str(e)})

    @app.route('/test_table_detail', methods=['POST'])
    async def test_table_detail():
        """Test the table_detail tool"""
        from src.tools.metabase_tools import table_detail

        database_id = request.form.get('database_id')
        table_id = request.form.get('table_id')

        if not database_id or not database_id.isdigit():
            return jsonify({'success': False, 'error': 'Valid database ID is required'})

        if not table_id or not table_id.isdigit():
            return jsonify({'success': False, 'error': 'Valid table ID is required'})

        try:
            result = await table_detail(int(database_id), int(table_id))
            return jsonify({'success': True, 'result': result})
        except Exception as e:
            import traceback
            error_traceback = traceback.format_exc()
            print(f"Error in test_table_detail: {str(e)}\n{error_traceback}")
            return jsonify({'success': False, 'error': str(e)})

    return app


if __name__ == '__main__':
    app = create_app()
    port = int(os.environ.get('FLASK_PORT', 5000))
    # Use a Config attribute for debug for consistency, e.g., Config.FLASK_DEBUG
    # Ensure FLASK_DEBUG is set appropriately in your .env or config settings
    app.run(host='0.0.0.0', port=port, debug=getattr(Config, 'FLASK_DEBUG', False))
```

--------------------------------------------------------------------------------
/src/tools/metabase_tools.py:
--------------------------------------------------------------------------------

```python
from src.api.metabase import MetabaseAPI
from src.config.settings import Config


async def list_databases() -> str:
    """
    List all databases configured in Metabase.

    Returns:
        A formatted string with information about all configured databases.
""" response = await MetabaseAPI.get_databases() # Handle different response types if isinstance(response, str): return f"Error: Received unexpected string response: {response}" if response is None: return "Error: No response received from Metabase API" if isinstance(response, dict) and "error" in response: return f"Error fetching databases: {response.get('message', 'Unknown error')}" # If we got a dictionary without an error, try to extract databases if isinstance(response, dict) and not isinstance(response, list): # Try to find databases in common locations if 'data' in response: response = response['data'] elif 'databases' in response: response = response['databases'] elif 'results' in response: response = response['results'] else: return f"Error: Unexpected response format: {response}" if not response: return "No databases found in Metabase." if not isinstance(response, list): return f"Error: Expected a list of databases, but got {type(response).__name__}: {response}" # Now we should have a list of databases result = "## Databases in Metabase\n\n" for db in response: # Check if each item is a dictionary if not isinstance(db, dict): result += f"- Warning: Found non-dictionary item: {db}\n\n" continue # Safely extract values with fallbacks db_id = db.get('id', 'Unknown') db_name = db.get('name', 'Unnamed') db_engine = db.get('engine', 'Unknown') db_created = db.get('created_at', 'Unknown') result += f"- **ID**: {db_id}\n" result += f" **Name**: {db_name}\n" result += f" **Engine**: {db_engine}\n" result += f" **Created At**: {db_created}\n\n" return result async def get_database_metadata(database_id: int) -> str: """ Get metadata for a specific database in Metabase, including table relationships. Args: database_id: The ID of the database to fetch metadata for Returns: A formatted string with the database's metadata including tables, fields, and relationships. 
""" response = await MetabaseAPI.get_database_schema(database_id) if response is None or "error" in response: return f"Error fetching database metadata: {response.get('message', 'Unknown error')}" result = f"## Metadata for Database: {response.get('name')}\n\n" # Add database details result += f"**ID**: {response.get('id')}\n" result += f"**Engine**: {response.get('engine')}\n" result += f"**Is Sample**: {response.get('is_sample', False)}\n\n" # Add tables information tables = response.get('tables', []) result += f"### Tables ({len(tables)})\n\n" # Create a map of table IDs to names for reference table_map = {table.get('id'): table.get('name') for table in tables} for table in tables: result += f"#### {table.get('name')}\n" result += f"**ID**: {table.get('id')}\n" result += f"**Schema**: {table.get('schema', 'N/A')}\n" result += f"**Description**: {table.get('description', 'No description')}\n\n" # Add fields for this table fields = table.get('fields', []) result += f"##### Fields ({len(fields)})\n\n" # Track foreign keys for relationship section foreign_keys = [] for field in fields: result += f"- **{field.get('name')}**\n" result += f" - Type: {field.get('base_type')}\n" result += f" - Description: {field.get('description', 'No description')}\n" # Check if this is a foreign key fk_target_field_id = field.get('fk_target_field_id') if fk_target_field_id: foreign_keys.append({ 'source_field': field.get('name'), 'source_field_id': field.get('id'), 'target_field_id': fk_target_field_id }) result += f" - **Foreign Key** to another table\n" if field.get('special_type'): result += f" - Special Type: {field.get('special_type')}\n" # Add relationships section if there are foreign keys if foreign_keys: result += "\n##### Relationships\n\n" for fk in foreign_keys: # Find target field information target_field_info = "Unknown field" target_table_name = "Unknown table" # Search all tables for the target field for t in tables: for f in t.get('fields', []): if f.get('id') == 
fk['target_field_id']: target_field_info = f.get('name') target_table_name = t.get('name') break result += f"- **{fk['source_field']}** → **{target_table_name}.{target_field_info}**\n" result += "\n" # Add a visual representation of relationships result += "### Database Relationships\n\n" result += "```\n" # Create a simple text-based diagram of relationships for table in tables: table_name = table.get('name') result += f"{table_name}\n" for field in table.get('fields', []): fk_target_field_id = field.get('fk_target_field_id') if fk_target_field_id: # Find target field information for t in tables: for f in t.get('fields', []): if f.get('id') == fk_target_field_id: target_field = f.get('name') target_table = t.get('name') result += f" └── {field.get('name')} → {target_table}.{target_field}\n" result += "\n" result += "```\n" return result async def visualize_database_relationships(database_id: int) -> str: """ Generate a visual representation of database relationships. Args: database_id: The ID of the database to visualize Returns: A formatted string with a visualization of table relationships. """ response = await MetabaseAPI.get_database_schema(database_id) if response is None or "error" in response: return f"Error fetching database schema: {(response or {}).get('message', 'Unknown error')}" tables = response.get('tables', []) if not tables: return "No tables found in this database." 
result = f"## Database Relationship Diagram for: {response.get('name')}\n\n" # Generate a text-based ER diagram result += "```\n" # First list all tables result += "Tables:\n" for table in tables: result += f" {table.get('name')}\n" result += "\nRelationships:\n" # Then show all relationships for table in tables: table_name = table.get('name') for field in table.get('fields', []): fk_target_field_id = field.get('fk_target_field_id') if fk_target_field_id: # Find target field information for t in tables: for f in t.get('fields', []): if f.get('id') == fk_target_field_id: target_field = f.get('name') target_table = t.get('name') result += f" {table_name}.{field.get('name')} → {target_table}.{target_field}\n" result += "```\n\n" # Add a more detailed description of each relationship result += "### Detailed Relationships\n\n" for table in tables: table_name = table.get('name') has_relationships = False for field in table.get('fields', []): fk_target_field_id = field.get('fk_target_field_id') if fk_target_field_id: if not has_relationships: result += f"**{table_name}** has the following relationships:\n\n" has_relationships = True # Find target field information for t in tables: for f in t.get('fields', []): if f.get('id') == fk_target_field_id: target_field = f.get('name') target_table = t.get('name') result += f"- Field **{field.get('name')}** references **{target_table}.{target_field}**\n" if has_relationships: result += "\n" return result async def run_database_query(database_id: int, query: str) -> str: """ Run a read-only SQL query against a database and return the first 5 rows. 
Args: database_id: The ID of the database to query query: The SQL query to execute (will be limited to 5 rows) Returns: A formatted string with the query results or error message """ # Execute the query with a limit of 5 rows response = await MetabaseAPI.run_query(database_id, query, row_limit=5) if response is None: return "Error: No response received from Metabase API" if isinstance(response, dict) and "error" in response: # Extract more detailed error information if available error_message = response.get('message', 'Unknown error') # Try to extract structured error info if isinstance(error_message, dict) and 'data' in error_message: data = error_message.get('data', {}) if 'errors' in data: return f"Error executing query: {data['errors']}" # Handle different error formats from Metabase if isinstance(error_message, str) and "does not exist" in error_message: return f"Error executing query: {error_message}" # If it's a raw JSON string representation, try to parse it if isinstance(error_message, str) and error_message.startswith('{'): try: import json error_json = json.loads(error_message) if 'data' in error_json and 'errors' in error_json['data']: return f"Error executing query: {error_json['data']['errors']}" except (ValueError, TypeError): pass return f"Error executing query: {error_message}" # Format the results result = f"## Query Results\n\n" result += f"```sql\n{query}\n```\n\n" # Extract and format the data try: # Get column names if "data" in response and "cols" in response["data"]: columns = [col.get("name", f"Column {i}") for i, col in enumerate(response["data"]["cols"])] # Get rows (limited to 5) rows = [] if "data" in response and "rows" in response["data"]: rows = response["data"]["rows"][:5] # Format as a table if columns and rows: # Add header result += "| " + " | ".join(columns) + " |\n" result += "| " + " | ".join(["---"] * len(columns)) + " |\n" # Add rows for row in rows: result += "| " + " | ".join([str(cell) for cell in row]) + " |\n" # Add row count info 
total_row_count = response.get("row_count", len(rows)) if total_row_count > 5: result += f"\n*Showing 5 of {total_row_count} rows*\n" else: result += "No data returned by the query.\n" else: result += "No data structure found in the response.\n" result += f"Raw response: {response}\n" except Exception as e: result += f"Error formatting results: {str(e)}\n" result += f"Raw response: {response}\n" return result async def db_overview(database_id: int) -> str: """ Get an overview of all tables in a database without detailed field information. Args: database_id: The ID of the database to get the overview for Returns: A formatted string with basic information about all tables in the database. """ response = await MetabaseAPI.get_database_schema(database_id) if response is None or "error" in response: return f"Error fetching database schema: {(response or {}).get('message', 'Unknown error')}" tables = response.get('tables', []) if not tables: return "No tables found in this database." result = f"## Database Overview: {response.get('name')}\n\n" # Add database details result += f"**ID**: {response.get('id')}\n" result += f"**Engine**: {response.get('engine')}\n" result += f"**Is Sample**: {response.get('is_sample', False)}\n\n" # Add tables information in a tabular format result += "### Tables\n\n" # Create markdown table header with Table ID result += "| Table ID | Table Name | Schema | Description | # of Fields |\n" result += "| -------- | ---------- | ------ | ----------- | ----------- |\n" # Add each table as a row for table in tables: table_id = table.get('id', 'Unknown') name = table.get('name', 'Unknown') schema = table.get('schema', 'N/A') description = table.get('description', 'No description') field_count = len(table.get('fields', [])) # Add null check before using string methods if description is None: description = 'No description' else: # Clean up description for table display (remove newlines but keep full text) description = description.replace('\n', ' ').strip() 
result += f"| {table_id} | {name} | {schema} | {description} | {field_count} |\n" return result async def table_detail(database_id: int, table_id: int) -> str: """ Get detailed information about a specific table. Args: database_id: The ID of the database containing the table table_id: The ID of the table to get details for Returns: A formatted string with detailed information about the table. """ # Directly fetch metadata for the specific table response = await MetabaseAPI.get_table_metadata(table_id) if response is None or "error" in response: return f"Error fetching table metadata: {(response or {}).get('message', 'Unknown error')}" # Extract table information table_name = response.get('name', 'Unknown') result = f"## Table Details: {table_name}\n\n" # Add table details result += f"**ID**: {response.get('id')}\n" result += f"**Schema**: {response.get('schema', 'N/A')}\n" description = response.get('description', 'No description') if description is None: description = 'No description' result += f"**Description**: {description}\n\n" # Add fields section fields = response.get('fields', []) result += f"### Fields ({len(fields)})\n\n" # Create markdown table for fields result += "| Field ID | Field Name | Type | Description | Special Type |\n" result += "| -------- | ---------- | ---- | ----------- | ------------ |\n" # Track foreign keys for relationship section foreign_keys = [] for field in fields: field_id = field.get('id', 'Unknown') name = field.get('name', 'Unknown') field_type = field.get('base_type', 'Unknown') description = field.get('description', 'No description') special_type = field.get('special_type', 'None') # Add null check before using string methods if description is None: description = 'No description' else: # Clean up description for table display (remove newlines but keep full text) description = description.replace('\n', ' ').strip() result += f"| {field_id} | {name} | {field_type} | {description} | {special_type} |\n" # Check if this is a foreign key 
fk_target_field_id = field.get('fk_target_field_id') if fk_target_field_id: foreign_keys.append({ 'source_field': field.get('name'), 'source_field_id': field.get('id'), 'target_field_id': fk_target_field_id }) # Add relationships section if there are foreign keys if foreign_keys: result += "\n### Relationships\n\n" for fk in foreign_keys: # We'll need to fetch target field information target_field = await MetabaseAPI.get_field_metadata(fk['target_field_id']) if target_field and not "error" in target_field: target_field_name = target_field.get('name', 'Unknown field') target_table_id = target_field.get('table_id') # Get target table information target_table = await MetabaseAPI.get_table_metadata(target_table_id) target_table_name = target_table.get('name', 'Unknown table') if target_table else 'Unknown table' result += f"- **{fk['source_field']}** → **{target_table_name}.{target_field_name}** (Table ID: {target_table_id})\n" else: result += f"- **{fk['source_field']}** → **Unknown reference** (Target Field ID: {fk['target_field_id']})\n" # For "Referenced By" section, we would need to search through other tables # For now, let's note that this would require additional API calls result += "\n### Referenced By\n\n" result += "*Note: To see all references to this table, use the database visualization tool.*\n" return result ``` -------------------------------------------------------------------------------- /templates/config.html: -------------------------------------------------------------------------------- ```html <!DOCTYPE html> <html> <head> <title>Metabase MCP Configuration</title> <meta charset="UTF-8"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/[email protected]/github-markdown.min.css"> <script src="https://cdn.jsdelivr.net/npm/[email protected]/marked.min.js"></script> <style> body { font-family: Arial, sans-serif; max-width: 800px; margin: 0 auto; padding: 20px; } 
.form-group { margin-bottom: 15px; } label { display: block; margin-bottom: 5px; font-weight: bold; } input[type="text"] { width: 100%; padding: 8px; box-sizing: border-box; } button { background-color: #4CAF50; color: white; padding: 10px 15px; border: none; cursor: pointer; margin-right: 10px; } button:hover { background-color: #45a049; } .test-button { background-color: #2196F3; } .test-button:hover { background-color: #0b7dda; } .message { margin: 15px 0; padding: 10px; border-radius: 5px; } .success { background-color: #d4edda; color: #155724; } .error { background-color: #f8d7da; color: #721c24; } .result-area { margin: 20px 0; padding:.875rem; background: #f5f5f5; border-radius: 4px; border: 1px solid #ddd; min-height: 100px; max-height: 400px; overflow-y: auto; white-space: pre-wrap; display: none; } .section { margin-top: 30px; border-top: 1px solid #ddd; padding-top: 20px; } textarea.form-control { width: 100%; padding: 8px; box-sizing: border-box; font-family: monospace; border: 1px solid #ccc; border-radius: 4px; } .markdown-body { box-sizing: border-box; min-width: 200px; max-width: 100%; padding: 15px; background-color: #fff; border-radius: 4px; border: 1px solid #ddd; color: #24292e; font-weight: normal; font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Helvetica, Arial, sans-serif; } .markdown-body h1, .markdown-body h2, .markdown-body h3, .markdown-body h4 { color: #24292e; font-weight: 600; margin-top: 24px; margin-bottom: 16px; } .markdown-body table { display: table; width: 100%; border-collapse: collapse; margin-bottom: 16px; border: 1px solid #ddd; } .markdown-body table th { font-weight: 600; padding: 8px 13px; border: 1px solid #ddd; background-color: #f6f8fa; color: #24292e; } .markdown-body table td { padding: 8px 13px; border: 1px solid #ddd; color: #24292e; } .markdown-body table tr { background-color: #fff; border-top: 1px solid #c6cbd1; } .markdown-body table tr:nth-child(2n) { background-color: #f6f8fa; } .markdown-body p, 
.markdown-body ul, .markdown-body ol, .markdown-body blockquote { color: #24292e; margin-bottom: 16px; } .markdown-body pre, .markdown-body code { background-color: #f6f8fa; border-radius: 3px; padding: 0.2em 0.4em; color: #24292e; font-family: SFMono-Regular, Consolas, "Liberation Mono", Menlo, monospace; } .result-container { margin-top: 15px; } </style> </head> <body> <h1>Metabase MCP Configuration</h1> {% with messages = get_flashed_messages() %} {% if messages %} <div class="message {% if 'error' in messages[0].lower() %}error{% else %}success{% endif %}"> {{ messages[0] }} </div> {% endif %} {% endwith %} <form method="post" action="/save_config" id="config-form"> <div class="form-group"> <label for="metabase_url">Metabase URL:</label> <input type="text" id="metabase_url" name="metabase_url" value="{{ metabase_url }}" placeholder="http://localhost:3000"> </div> <div class="form-group"> <label for="api_key">API Key:</label> <input type="password" id="api_key" name="api_key" class="form-control" value="{{ api_key }}" placeholder="{{ 'API key is set' if api_key_set else 'Enter your Metabase API key' }}" required> <small class="form-text text-muted"> {% if api_key_set %} Your API key is stored encrypted. Enter a new key to change it. {% else %} Your API key will be stored encrypted and never displayed in plain text. 
{% endif %} </small> </div> <div class="form-group"> <button type="submit">Save Configuration</button> <button type="button" class="test-button" id="test-connection">Test Connection</button> </div> </form> <div id="connection-result" class="result-area"></div> <div class="section"> <h2>Test Tools</h2> <h3>List Databases</h3> <button type="button" id="test-list-databases">Test List Databases</button> <div id="list-databases-result" class="result-area"></div> <h3>Get Database Metadata</h3> <div class="form-group"> <label for="database_id">Database ID:</label> <input type="text" id="database_id" placeholder="Enter database ID"> </div> <button type="button" id="test-get-metadata">Test Get Metadata</button> <div id="get-metadata-result" class="result-area"></div> <h3>DB Overview</h3> <div class="form-group"> <label for="db_overview_database_id">Database ID:</label> <input type="text" id="db_overview_database_id" placeholder="Enter database ID"> </div> <button type="button" id="test-db-overview">Get Database Overview</button> <div id="db-overview-result" class="result-area"></div> <h3>Table Detail</h3> <div class="form-group"> <label for="table_detail_database_id">Database ID:</label> <input type="text" id="table_detail_database_id" placeholder="Enter database ID"> </div> <div class="form-group"> <label for="table_detail_table_id">Table ID:</label> <input type="text" id="table_detail_table_id" placeholder="Enter table ID (from DB Overview)"> </div> <button type="button" id="test-table-detail">Get Table Detail</button> <div id="table-detail-result" class="result-area"></div> <h3>Visualize Database Relationships</h3> <div class="form-group"> <label for="relationship_database_id">Database ID:</label> <input type="text" id="relationship_database_id" placeholder="Enter database ID"> </div> <button type="button" id="test-visualize-relationships">Visualize Relationships</button> <div id="visualize-relationships-result" class="result-area"></div> <h3>Run Database Query</h3> <div 
class="form-group"> <label for="query_database_id">Database ID:</label> <input type="text" id="query_database_id" placeholder="Enter database ID"> </div> <div class="form-group"> <label for="sql_query">SQL Query:</label> <textarea id="sql_query" placeholder="Enter SQL query" rows="4" class="form-control"></textarea> <small class="form-text text-muted">Query will be limited to returning 5 rows for safety.</small> </div> <button type="button" id="test-run-query">Run Query</button> <div id="run-query-result" class="result-area"></div> </div> <div class="section"> <h3>List Actions</h3> <button type="button" id="test-list-actions">Test List Actions</button> <div id="list-actions-result" class="result-area"></div> <h3>Get Action Details</h3> <div class="form-group"> <label for="action_id">Action ID:</label> <input type="text" id="action_id" placeholder="Enter action ID"> </div> <button type="button" id="test-get-action">Test Get Action Details</button> <div id="get-action-result" class="result-area"></div> <h3>Execute Action</h3> <div class="form-group"> <label for="exec_action_id">Action ID:</label> <input type="text" id="exec_action_id" placeholder="Enter action ID"> </div> <button type="button" id="load-action-params">Load Parameters</button> <div id="action-parameters" class="form-group"></div> <button type="button" id="test-execute-action">Execute Action</button> <div id="execute-action-result" class="result-area"></div> </div> <script> // Configure marked.js options marked.setOptions({ gfm: true, breaks: true, tables: true, sanitize: false }); // Function to format response data with proper Markdown rendering function formatResponse(container, data) { if (data.success) { // Check if response has result or message property const content = data.result || data.message || ''; // Only try to parse as markdown if it's not empty if (content) { container.innerHTML = '<div class="markdown-body">' + marked.parse(content) + '</div>'; } else { container.innerHTML = '<div 
class="markdown-body">Operation completed successfully.</div>'; } // Add success class if not already present if (!container.className.includes('success')) { container.className += ' success'; } } else { const errorMsg = data.error || data.message || 'Unknown error'; container.innerHTML = '<div class="error">' + errorMsg + '</div>'; // Add error class if not already present if (!container.className.includes('error')) { container.className += ' error'; } } } // Test connection document.getElementById('test-connection').addEventListener('click', function() { const url = document.getElementById('metabase_url').value; const apiKey = document.getElementById('api_key').value; const resultArea = document.getElementById('connection-result'); resultArea.style.display = 'block'; resultArea.innerHTML = 'Testing connection...'; fetch('/test_connection', { method: 'POST', headers: {'Content-Type': 'application/x-www-form-urlencoded'}, body: `metabase_url=${encodeURIComponent(url)}&api_key=${encodeURIComponent(apiKey)}` }) .then(response => response.json()) .then(data => { formatResponse(resultArea, data); }) .catch(error => { resultArea.className = 'result-area error'; resultArea.innerHTML = `Error: ${error}`; }); }); // Test list databases document.getElementById('test-list-databases').addEventListener('click', function() { const resultArea = document.getElementById('list-databases-result'); resultArea.style.display = 'block'; resultArea.innerHTML = 'Fetching databases...'; fetch('/test_list_databases') .then(response => response.json()) .then(data => { formatResponse(resultArea, data); }) .catch(error => { resultArea.className = 'result-area error'; resultArea.innerHTML = `Error: ${error}`; }); }); // Test get metadata document.getElementById('test-get-metadata').addEventListener('click', function() { const databaseId = document.getElementById('database_id').value; const resultArea = document.getElementById('get-metadata-result'); if (!databaseId) { resultArea.style.display = 
'block'; resultArea.className = 'result-area error'; resultArea.innerHTML = 'Please enter a database ID'; return; } resultArea.style.display = 'block'; resultArea.innerHTML = 'Fetching metadata...'; fetch('/test_get_metadata', { method: 'POST', headers: {'Content-Type': 'application/x-www-form-urlencoded'}, body: `database_id=${encodeURIComponent(databaseId)}` }) .then(response => response.json()) .then(data => { formatResponse(resultArea, data); }) .catch(error => { resultArea.className = 'result-area error'; resultArea.innerHTML = `Error: ${error}`; }); }); // Test list actions document.getElementById('test-list-actions').addEventListener('click', function() { const resultArea = document.getElementById('list-actions-result'); resultArea.style.display = 'block'; resultArea.innerHTML = 'Fetching actions...'; fetch('/test_list_actions') .then(response => response.json()) .then(data => { formatResponse(resultArea, data); }) .catch(error => { resultArea.className = 'result-area error'; resultArea.innerHTML = `Error: ${error}`; }); }); // Test get action details document.getElementById('test-get-action').addEventListener('click', function() { const actionId = document.getElementById('action_id').value; const resultArea = document.getElementById('get-action-result'); if (!actionId) { resultArea.style.display = 'block'; resultArea.className = 'result-area error'; resultArea.innerHTML = 'Please enter an action ID'; return; } resultArea.style.display = 'block'; resultArea.innerHTML = 'Fetching action details...'; fetch('/test_get_action_details', { method: 'POST', headers: {'Content-Type': 'application/x-www-form-urlencoded'}, body: `action_id=${encodeURIComponent(actionId)}` }) .then(response => response.json()) .then(data => { formatResponse(resultArea, data); }) .catch(error => { resultArea.className = 'result-area error'; resultArea.innerHTML = `Error: ${error}`; }); }); // Load action parameters document.getElementById('load-action-params').addEventListener('click', 
function() { const actionId = document.getElementById('exec_action_id').value; const paramsContainer = document.getElementById('action-parameters'); if (!actionId) { alert('Please enter an action ID'); return; } paramsContainer.innerHTML = 'Loading parameters...'; fetch('/test_get_action_details', { method: 'POST', headers: {'Content-Type': 'application/x-www-form-urlencoded'}, body: `action_id=${encodeURIComponent(actionId)}` }) .then(response => response.json()) .then(data => { if (data.success) { // Parse the markdown to extract parameters const paramSection = data.result.split('### Parameters')[1]; if (!paramSection) { paramsContainer.innerHTML = 'No parameters found for this action.'; return; } // Clear container paramsContainer.innerHTML = ''; // Extract parameter names from the markdown const paramLines = paramSection.split('\n'); let paramCount = 0; for (const line of paramLines) { if (line.startsWith('- **')) { const paramName = line.split('**:')[0].replace('- **', '').trim(); const formGroup = document.createElement('div'); formGroup.className = 'form-group'; const label = document.createElement('label'); label.setAttribute('for', `param_${paramName}`); label.textContent = `Parameter: ${paramName}`; const input = document.createElement('input'); input.type = 'text'; input.id = `param_${paramName}`; input.name = `param_${paramName}`; input.placeholder = `Enter value for ${paramName}`; formGroup.appendChild(label); formGroup.appendChild(input); paramsContainer.appendChild(formGroup); paramCount++; } } if (paramCount === 0) { paramsContainer.innerHTML = 'No parameters found for this action.'; } } else { paramsContainer.innerHTML = `Error: ${data.error}`; } }) .catch(error => { paramsContainer.innerHTML = `Error: ${error}`; }); }); // Execute action document.getElementById('test-execute-action').addEventListener('click', function() { const actionId = document.getElementById('exec_action_id').value; const resultArea = 
document.getElementById('execute-action-result'); const paramsContainer = document.getElementById('action-parameters'); if (!actionId) { resultArea.style.display = 'block'; resultArea.className = 'result-area error'; resultArea.innerHTML = 'Please enter an action ID'; return; } // Collect parameters const formData = new FormData(); formData.append('action_id', actionId); // Add all input fields from the parameters container const inputs = paramsContainer.querySelectorAll('input'); inputs.forEach(input => { if (input.name && input.value) { formData.append(input.name, input.value); } }); resultArea.style.display = 'block'; resultArea.innerHTML = 'Executing action...'; fetch('/test_execute_action', { method: 'POST', body: formData }) .then(response => response.json()) .then(data => { formatResponse(resultArea, data); }) .catch(error => { resultArea.className = 'result-area error'; resultArea.innerHTML = `Error: ${error}`; }); }); // Test visualize relationships document.getElementById('test-visualize-relationships').addEventListener('click', function() { const databaseId = document.getElementById('relationship_database_id').value; if (!databaseId) { alert('Please enter a database ID'); return; } const resultArea = document.getElementById('visualize-relationships-result'); resultArea.style.display = 'block'; resultArea.innerHTML = 'Visualizing relationships...'; fetch('/test_visualize_relationships', { method: 'POST', headers: { 'Content-Type': 'application/x-www-form-urlencoded', }, body: `database_id=${databaseId}` }) .then(response => response.json()) .then(data => { formatResponse(resultArea, data); }) .catch(error => { resultArea.innerHTML = `<div class="error">Error: ${error.message}</div>`; }); }); // Test run query document.getElementById('test-run-query').addEventListener('click', function() { const databaseId = document.getElementById('query_database_id').value; const sqlQuery = document.getElementById('sql_query').value; if (!databaseId || !sqlQuery) { 
        alert('Please enter both database ID and SQL query');
        return;
    }

    const resultArea = document.getElementById('run-query-result');
    resultArea.style.display = 'block';
    resultArea.innerHTML = 'Executing query...';

    fetch('/test_run_query', {
        method: 'POST',
        headers: {
            'Content-Type': 'application/x-www-form-urlencoded',
        },
        body: `database_id=${encodeURIComponent(databaseId)}&query=${encodeURIComponent(sqlQuery)}`
    })
    .then(response => response.json())
    .then(data => {
        formatResponse(resultArea, data);
    })
    .catch(error => {
        resultArea.innerHTML = `<div class="error">Error: ${error.message}</div>`;
    });
});

// Test DB Overview
document.getElementById('test-db-overview').addEventListener('click', function() {
    const databaseId = document.getElementById('db_overview_database_id').value;

    if (!databaseId) {
        alert('Please enter a database ID');
        return;
    }

    const resultArea = document.getElementById('db-overview-result');
    resultArea.style.display = 'block';
    resultArea.innerHTML = 'Loading database overview...';

    fetch('/test_db_overview', {
        method: 'POST',
        headers: {
            'Content-Type': 'application/x-www-form-urlencoded',
        },
        // Encode the value so characters such as & or = cannot corrupt the form body
        body: `database_id=${encodeURIComponent(databaseId)}`
    })
    .then(response => response.json())
    .then(data => {
        formatResponse(resultArea, data);
    })
    .catch(error => {
        resultArea.innerHTML = `<div class="error">Error: ${error.message}</div>`;
    });
});

// Test Table Detail
document.getElementById('test-table-detail').addEventListener('click', function() {
    const databaseId = document.getElementById('table_detail_database_id').value;
    const tableId = document.getElementById('table_detail_table_id').value;

    if (!databaseId || !tableId) {
        alert('Please enter both database ID and table ID');
        return;
    }

    const resultArea = document.getElementById('table-detail-result');
    resultArea.style.display = 'block';
    resultArea.innerHTML = 'Loading table details...';

    fetch('/test_table_detail', {
        method: 'POST',
        headers: {
            'Content-Type': 'application/x-www-form-urlencoded',
        },
        body:
            `database_id=${encodeURIComponent(databaseId)}&table_id=${encodeURIComponent(tableId)}`
    })
    .then(response => response.json())
    .then(data => {
        formatResponse(resultArea, data);
    })
    .catch(error => {
        resultArea.innerHTML = `<div class="error">Error: ${error.message}</div>`;
    });
});
</script>
</body>
</html>
```
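
The handlers in this template repeat two small pieces of logic: pulling parameter names out of the markdown returned by `/test_get_action_details`, and building URL-encoded form bodies. The sketch below isolates both as pure functions so they can be checked outside the browser. The names `extractParamNames` and `encodeForm` are hypothetical and do not appear in the template. The markdown layout (a `### Parameters` heading followed by `- **name**: ...` bullets) is inferred from the parsing code above and is an assumption, not a confirmed server format. Unlike the inline handler, this version stops at the next heading.

```javascript
// Hypothetical helper (not part of the original template): collect the
// parameter names listed under a "### Parameters" heading.
// Assumes each parameter is rendered as a bullet like "- **name**: details".
function extractParamNames(markdown) {
    const paramSection = markdown.split('### Parameters')[1];
    if (!paramSection) {
        return [];
    }

    const names = [];
    for (const rawLine of paramSection.split('\n')) {
        const line = rawLine.trim();
        // Stop at the next markdown heading so later sections are not scanned.
        if (line.startsWith('#')) {
            break;
        }
        if (line.startsWith('- **')) {
            names.push(line.split('**:')[0].replace('- **', '').trim());
        }
    }
    return names;
}

// Hypothetical helper: serialize a plain object of string fields as an
// application/x-www-form-urlencoded body. URLSearchParams percent-encodes
// reserved characters, so callers do not need encodeURIComponent per field.
function encodeForm(fields) {
    return new URLSearchParams(fields).toString();
}
```

With these helpers, a handler could pass `body: encodeForm({ database_id: databaseId })` to `fetch`, which removes the risk of forgetting to encode an individual field.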