This is page 2 of 3. Use http://codebase.md/mammothgrowth/dbt-cli-mcp?page={x} to view the full context.

# Directory Structure

```
├── .gitignore
├── .gitmodules
├── .python-version
├── docs
│   ├── dbt_cheat_sheet.md
│   ├── dbt_mcp_guide.md
│   ├── llm_guide_to_mcp.md
│   └── python_fastMCP.md
├── integration_tests
│   ├── __init__.py
│   ├── common.py
│   ├── run_all.py
│   ├── test_dbt_build.py
│   ├── test_dbt_compile.py
│   ├── test_dbt_debug.py
│   ├── test_dbt_deps.py
│   ├── test_dbt_ls.py
│   ├── test_dbt_run.py
│   ├── test_dbt_seed.py
│   ├── test_dbt_show.py
│   └── test_dbt_test.py
├── LICENSE
├── mcp_architect_instructions
│   ├── examples
│   │   ├── planning_example.md
│   │   ├── task_example.md
│   │   └── weather_mcp_example.md
│   ├── GETTING_STARTED.md
│   ├── guides
│   │   ├── environment_setup_guide.md
│   │   ├── implementation_guide.md
│   │   ├── logging_guide.md
│   │   ├── project_structure_guide.md
│   │   ├── reference_guide.md
│   │   ├── registration_guide.md
│   │   └── testing_guide.md
│   ├── mcp_architecture_instructions.md
│   ├── planning
│   │   └── work_progress_log.md
│   ├── README.md
│   └── templates
│       ├── implementation_plan_template.md
│       ├── requirements_questionnaire.md
│       ├── task_template.md
│       └── work_progress_log_template.md
├── pyproject.toml
├── README.md
├── src
│   ├── __init__.py
│   ├── cli.py
│   ├── command.py
│   ├── config.py
│   ├── formatters.py
│   ├── server.py
│   └── tools.py
└── tests
    ├── __init__.py
    ├── mock_responses
    │   ├── debug.json
    │   ├── ls.json
    │   ├── run.json
    │   └── test.json
    ├── test_command.py
    ├── test_config.py
    ├── test_formatters.py
    ├── test_sql_security.py
    └── test_tools.py
```

# Files

--------------------------------------------------------------------------------
/docs/python_fastMCP.md:
--------------------------------------------------------------------------------

```markdown
# MCP Python SDK

<div align="center">

<strong>Python implementation of the Model Context Protocol (MCP)</strong>

[![PyPI][pypi-badge]][pypi-url]
[![MIT licensed][mit-badge]][mit-url]
[![Python Version][python-badge]][python-url]
[![Documentation][docs-badge]][docs-url]
[![Specification][spec-badge]][spec-url]
[![GitHub Discussions][discussions-badge]][discussions-url]

</div>

<!-- omit in toc -->
## Table of Contents

- [Overview](#overview)
- [Installation](#installation)
- [Quickstart](#quickstart)
- [What is MCP?](#what-is-mcp)
- [Core Concepts](#core-concepts)
  - [Server](#server)
  - [Resources](#resources)
  - [Tools](#tools)
  - [Prompts](#prompts)
  - [Images](#images)
  - [Context](#context)
- [Running Your Server](#running-your-server)
  - [Development Mode](#development-mode)
  - [Claude Desktop Integration](#claude-desktop-integration)
  - [Direct Execution](#direct-execution)
- [Examples](#examples)
  - [Echo Server](#echo-server)
  - [SQLite Explorer](#sqlite-explorer)
- [Advanced Usage](#advanced-usage)
  - [Low-Level Server](#low-level-server)
  - [Writing MCP Clients](#writing-mcp-clients)
  - [MCP Primitives](#mcp-primitives)
  - [Server Capabilities](#server-capabilities)
- [Documentation](#documentation)
- [Contributing](#contributing)
- [License](#license)

[pypi-badge]: https://img.shields.io/pypi/v/mcp.svg
[pypi-url]: https://pypi.org/project/mcp/
[mit-badge]: https://img.shields.io/pypi/l/mcp.svg
[mit-url]: https://github.com/modelcontextprotocol/python-sdk/blob/main/LICENSE
[python-badge]: https://img.shields.io/pypi/pyversions/mcp.svg
[python-url]: https://www.python.org/downloads/
[docs-badge]: https://img.shields.io/badge/docs-modelcontextprotocol.io-blue.svg
[docs-url]: https://modelcontextprotocol.io
[spec-badge]: https://img.shields.io/badge/spec-spec.modelcontextprotocol.io-blue.svg
[spec-url]: https://spec.modelcontextprotocol.io
[discussions-badge]: https://img.shields.io/github/discussions/modelcontextprotocol/python-sdk
[discussions-url]: https://github.com/modelcontextprotocol/python-sdk/discussions

## Overview

The Model Context Protocol allows applications to provide context for LLMs in a standardized way, separating the concerns of providing context from the actual LLM interaction. This Python SDK implements the full MCP specification, making it easy to:

- Build MCP clients that can connect to any MCP server
- Create MCP servers that expose resources, prompts and tools
- Use standard transports like stdio and SSE
- Handle all MCP protocol messages and lifecycle events

## Installation

We recommend using [uv](https://docs.astral.sh/uv/) to manage your Python projects:

```bash
uv add "mcp[cli]"
```

Alternatively:
```bash
pip install mcp
```

## Quickstart

Let's create a simple MCP server that exposes a calculator tool and some data:

```python
# server.py
from mcp.server.fastmcp import FastMCP

# Create an MCP server
mcp = FastMCP("Demo")

# Add an addition tool
@mcp.tool()
def add(a: int, b: int) -> int:
    """Add two numbers"""
    return a + b

# Add a dynamic greeting resource
@mcp.resource("greeting://{name}")
def get_greeting(name: str) -> str:
    """Get a personalized greeting"""
    return f"Hello, {name}!"
```

You can install this server in [Claude Desktop](https://claude.ai/download) and interact with it right away by running:
```bash
mcp install server.py
```

Alternatively, you can test it with the MCP Inspector:
```bash
mcp dev server.py
```

## What is MCP?

The [Model Context Protocol (MCP)](https://modelcontextprotocol.io) lets you build servers that expose data and functionality to LLM applications in a secure, standardized way. Think of it like a web API, but specifically designed for LLM interactions. MCP servers can:

- Expose data through **Resources** (think of these sort of like GET endpoints; they are used to load information into the LLM's context)
- Provide functionality through **Tools** (sort of like POST endpoints; they are used to execute code or otherwise produce a side effect)
- Define interaction patterns through **Prompts** (reusable templates for LLM interactions)
- And more!
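
To make the three primitives concrete, here is a minimal sketch that puts all of them on one server using the same `FastMCP` decorators as the Quickstart (the `Notes` server name, note IDs, and helper functions are purely illustrative):

```python
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("Notes")

# Resource: read-only context, like a GET endpoint
@mcp.resource("notes://{note_id}")
def read_note(note_id: str) -> str:
    """Load a note into the LLM's context"""
    return f"Contents of note {note_id}"

# Tool: an action that may have side effects, like a POST endpoint
@mcp.tool()
def append_note(note_id: str, text: str) -> str:
    """Append text to a note"""
    return f"Appended {len(text)} characters to note {note_id}"

# Prompt: a reusable interaction template
@mcp.prompt()
def summarize_note(note_id: str) -> str:
    """Ask the model to summarize a note"""
    return f"Please summarize the note at notes://{note_id}"
```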

## Core Concepts

### Server

The FastMCP server is your core interface to the MCP protocol. It handles connection management, protocol compliance, and message routing:

```python
# Add lifespan support for startup/shutdown with strong typing
from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import AsyncIterator

from fake_database import Database  # Replace with your actual DB type
from mcp.server.fastmcp import Context, FastMCP

# Create a named server
mcp = FastMCP("My App")

# Specify dependencies for deployment and development
mcp = FastMCP("My App", dependencies=["pandas", "numpy"])

@dataclass
class AppContext:
    db: Database  # Replace with your actual DB type

@asynccontextmanager
async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]:
    """Manage application lifecycle with type-safe context"""
    db = Database()
    try:
        # Initialize on startup
        await db.connect()
        yield AppContext(db=db)
    finally:
        # Cleanup on shutdown
        await db.disconnect()

# Pass lifespan to server
mcp = FastMCP("My App", lifespan=app_lifespan)

# Access type-safe lifespan context in tools
@mcp.tool()
def query_db(ctx: Context) -> str:
    """Tool that uses initialized resources"""
    db = ctx.request_context.lifespan_context.db
    return db.query()
```

### Resources

Resources are how you expose data to LLMs. They're similar to GET endpoints in a REST API - they provide data but shouldn't perform significant computation or have side effects:

```python
@mcp.resource("config://app")
def get_config() -> str:
    """Static configuration data"""
    return "App configuration here"

@mcp.resource("users://{user_id}/profile")
def get_user_profile(user_id: str) -> str:
    """Dynamic user data"""
    return f"Profile data for user {user_id}"
```

### Tools

Tools let LLMs take actions through your server. Unlike resources, tools are expected to perform computation and have side effects:

```python
import httpx

@mcp.tool()
def calculate_bmi(weight_kg: float, height_m: float) -> float:
    """Calculate BMI given weight in kg and height in meters"""
    return weight_kg / (height_m ** 2)

@mcp.tool()
async def fetch_weather(city: str) -> str:
    """Fetch current weather for a city"""
    async with httpx.AsyncClient() as client:
        response = await client.get(f"https://api.weather.com/{city}")
        return response.text
```

### Prompts

Prompts are reusable templates that help LLMs interact with your server effectively:

```python
from mcp.server.fastmcp.prompts.base import Message, UserMessage, AssistantMessage

@mcp.prompt()
def review_code(code: str) -> str:
    return f"Please review this code:\n\n{code}"

@mcp.prompt()
def debug_error(error: str) -> list[Message]:
    return [
        UserMessage("I'm seeing this error:"),
        UserMessage(error),
        AssistantMessage("I'll help debug that. What have you tried so far?")
    ]
```

### Images

FastMCP provides an `Image` class that automatically handles image data:

```python
import io

from mcp.server.fastmcp import FastMCP, Image
from PIL import Image as PILImage

@mcp.tool()
def create_thumbnail(image_path: str) -> Image:
    """Create a thumbnail from an image"""
    img = PILImage.open(image_path)
    img.thumbnail((100, 100))
    buffer = io.BytesIO()
    img.save(buffer, format="PNG")  # encode as PNG rather than returning raw pixel bytes
    return Image(data=buffer.getvalue(), format="png")
```

### Context

The Context object gives your tools and resources access to MCP capabilities:

```python
from mcp.server.fastmcp import FastMCP, Context

@mcp.tool()
async def long_task(files: list[str], ctx: Context) -> str:
    """Process multiple files with progress tracking"""
    for i, file in enumerate(files):
        ctx.info(f"Processing {file}")
        await ctx.report_progress(i, len(files))
        data, mime_type = await ctx.read_resource(f"file://{file}")
    return "Processing complete"
```

## Running Your Server

### Development Mode

The fastest way to test and debug your server is with the MCP Inspector:

```bash
mcp dev server.py

# Add dependencies
mcp dev server.py --with pandas --with numpy

# Mount local code
mcp dev server.py --with-editable .
```

### Claude Desktop Integration

Once your server is ready, install it in Claude Desktop:

```bash
mcp install server.py

# Custom name
mcp install server.py --name "My Analytics Server"

# Environment variables
mcp install server.py -v API_KEY=abc123 -v DB_URL=postgres://...
mcp install server.py -f .env
```

### Direct Execution

For advanced scenarios like custom deployments:

```python
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("My App")

if __name__ == "__main__":
    mcp.run()
```

Run it with:
```bash
python server.py
# or
mcp run server.py
```

## Examples

### Echo Server

A simple server demonstrating resources, tools, and prompts:

```python
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("Echo")

@mcp.resource("echo://{message}")
def echo_resource(message: str) -> str:
    """Echo a message as a resource"""
    return f"Resource echo: {message}"

@mcp.tool()
def echo_tool(message: str) -> str:
    """Echo a message as a tool"""
    return f"Tool echo: {message}"

@mcp.prompt()
def echo_prompt(message: str) -> str:
    """Create an echo prompt"""
    return f"Please process this message: {message}"
```

### SQLite Explorer

A more complex example showing database integration:

```python
from mcp.server.fastmcp import FastMCP
import sqlite3

mcp = FastMCP("SQLite Explorer")

@mcp.resource("schema://main")
def get_schema() -> str:
    """Provide the database schema as a resource"""
    conn = sqlite3.connect("database.db")
    schema = conn.execute(
        "SELECT sql FROM sqlite_master WHERE type='table'"
    ).fetchall()
    return "\n".join(sql[0] for sql in schema if sql[0])

@mcp.tool()
def query_data(sql: str) -> str:
    """Execute SQL queries safely"""
    conn = sqlite3.connect("database.db")
    try:
        result = conn.execute(sql).fetchall()
        return "\n".join(str(row) for row in result)
    except Exception as e:
        return f"Error: {str(e)}"
```

## Advanced Usage

### Low-Level Server

For more control, you can use the low-level server implementation directly. This gives you full access to the protocol and allows you to customize every aspect of your server, including lifecycle management through the lifespan API:

```python
from contextlib import asynccontextmanager
from typing import AsyncIterator

from fake_database import Database  # Replace with your actual DB type
from mcp.server.lowlevel import Server

@asynccontextmanager
async def server_lifespan(server: Server) -> AsyncIterator[dict]:
    """Manage server startup and shutdown lifecycle."""
    db = Database()
    try:
        # Initialize resources on startup
        await db.connect()
        yield {"db": db}
    finally:
        # Clean up on shutdown
        await db.disconnect()

# Pass lifespan to server
server = Server("example-server", lifespan=server_lifespan)

# Access lifespan context in handlers
@server.call_tool()
async def query_db(name: str, arguments: dict) -> list:
    ctx = server.request_context
    db = ctx.lifespan_context["db"]
    return await db.query(arguments["query"])
```

The lifespan API provides:
- A way to initialize resources when the server starts and clean them up when it stops
- Access to initialized resources through the request context in handlers
- Type-safe context passing between lifespan and request handlers

```python
from mcp.server.lowlevel import Server, NotificationOptions
from mcp.server.models import InitializationOptions
import mcp.server.stdio
import mcp.types as types

# Create a server instance
server = Server("example-server")

@server.list_prompts()
async def handle_list_prompts() -> list[types.Prompt]:
    return [
        types.Prompt(
            name="example-prompt",
            description="An example prompt template",
            arguments=[
                types.PromptArgument(
                    name="arg1",
                    description="Example argument",
                    required=True
                )
            ]
        )
    ]

@server.get_prompt()
async def handle_get_prompt(
    name: str,
    arguments: dict[str, str] | None
) -> types.GetPromptResult:
    if name != "example-prompt":
        raise ValueError(f"Unknown prompt: {name}")

    return types.GetPromptResult(
        description="Example prompt",
        messages=[
            types.PromptMessage(
                role="user",
                content=types.TextContent(
                    type="text",
                    text="Example prompt text"
                )
            )
        ]
    )

async def run():
    async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            InitializationOptions(
                server_name="example",
                server_version="0.1.0",
                capabilities=server.get_capabilities(
                    notification_options=NotificationOptions(),
                    experimental_capabilities={},
                )
            )
        )

if __name__ == "__main__":
    import asyncio
    asyncio.run(run())
```

### Writing MCP Clients

The SDK provides a high-level client interface for connecting to MCP servers:

```python
from mcp import ClientSession, StdioServerParameters, types
from mcp.client.stdio import stdio_client

# Create server parameters for stdio connection
server_params = StdioServerParameters(
    command="python", # Executable
    args=["example_server.py"], # Optional command line arguments
    env=None # Optional environment variables
)

# Optional: create a sampling callback
async def handle_sampling_message(message: types.CreateMessageRequestParams) -> types.CreateMessageResult:
    return types.CreateMessageResult(
        role="assistant",
        content=types.TextContent(
            type="text",
            text="Hello, world! from model",
        ),
        model="gpt-3.5-turbo",
        stopReason="endTurn",
    )

async def run():
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write, sampling_callback=handle_sampling_message) as session:
            # Initialize the connection
            await session.initialize()

            # List available prompts
            prompts = await session.list_prompts()

            # Get a prompt
            prompt = await session.get_prompt("example-prompt", arguments={"arg1": "value"})

            # List available resources
            resources = await session.list_resources()

            # List available tools
            tools = await session.list_tools()

            # Read a resource
            content, mime_type = await session.read_resource("file://some/path")

            # Call a tool
            result = await session.call_tool("tool-name", arguments={"arg1": "value"})

if __name__ == "__main__":
    import asyncio
    asyncio.run(run())
```

### MCP Primitives

The MCP protocol defines three core primitives that servers can implement:

| Primitive | Control               | Description                                         | Example Use                  |
|-----------|-----------------------|-----------------------------------------------------|------------------------------|
| Prompts   | User-controlled       | Interactive templates invoked by user choice        | Slash commands, menu options |
| Resources | Application-controlled| Contextual data managed by the client application   | File contents, API responses |
| Tools     | Model-controlled      | Functions exposed to the LLM to take actions        | API calls, data updates      |

### Server Capabilities

MCP servers declare capabilities during initialization:

| Capability  | Feature Flag                 | Description                        |
|-------------|------------------------------|------------------------------------|
| `prompts`   | `listChanged`                | Prompt template management         |
| `resources` | `subscribe`<br/>`listChanged`| Resource exposure and updates      |
| `tools`     | `listChanged`                | Tool discovery and execution       |
| `logging`   | -                            | Server logging configuration       |
| `completion`| -                            | Argument completion suggestions    |
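
A hedged sketch of how these flags surface in code, assuming the low-level `Server.get_capabilities` helper shown in the example above (capabilities are populated based on which handlers you have registered):

```python
from mcp.server.lowlevel import Server, NotificationOptions

server = Server("capability-demo")

@server.list_prompts()
async def handle_list_prompts():
    return []  # registering this handler is what enables the `prompts` capability

caps = server.get_capabilities(
    notification_options=NotificationOptions(),
    experimental_capabilities={},
)
print(caps)  # inspect which capabilities (prompts, resources, tools, logging) will be declared
```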

## Documentation

- [Model Context Protocol documentation](https://modelcontextprotocol.io)
- [Model Context Protocol specification](https://spec.modelcontextprotocol.io)
- [Officially supported servers](https://github.com/modelcontextprotocol/servers)

## Contributing

We are passionate about supporting contributors of all levels of experience and would love to see you get involved in the project. See the [contributing guide](CONTRIBUTING.md) to get started.

## License

This project is licensed under the MIT License - see the LICENSE file for details.
```

--------------------------------------------------------------------------------
/integration_tests/test_dbt_ls.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Integration test for the dbt_ls tool that lists dbt resources.
"""
import os
import sys
import json
from pathlib import Path

# Add parent directory to python path to import from common.py
sys.path.append(str(Path(__file__).parent))
from common import run_cli_command, verify_output

# Path to the jaffle_shop project
JAFFLE_SHOP_PATH = Path(__file__).parent.parent / "dbt_integration_tests/jaffle_shop_duckdb"

def test_dbt_ls():
    """Test the dbt_ls tool by listing models"""
    print("Testing dbt_ls tool...")

    try:
        # Call the dbt_ls tool to list all models
        print("Listing all models...")
        ls_result = run_cli_command("ls", {
            "project_dir": str(JAFFLE_SHOP_PATH),
            "profiles_dir": str(JAFFLE_SHOP_PATH),  # Explicitly set profiles_dir to the same as project_dir
            "resource_type": "model",
            "output_format": "json"
        })

        # Parse the JSON result
        try:
            result_data = json.loads(ls_result)

            # Extract the actual output from the JSON response
            if isinstance(result_data, dict) and "output" in result_data:
                output = result_data["output"]
                if isinstance(output, str) and (output.startswith("[") or output.startswith("{")):
                    # If output is a JSON string, parse it
                    output = json.loads(output)
            else:
                output = result_data

            # Print the raw output for debugging
            print(f"Raw output type: {type(output)}")
            if isinstance(output, str):
                print(f"Raw output: {output[:100]}...")
            elif isinstance(output, dict):
                print(f"Raw output keys: {list(output.keys())}")
            elif isinstance(output, list):
                print(f"Raw output length: {len(output)}")

                # Filter out log messages before displaying
                filtered_items = []
                for item in output:
                    if isinstance(item, dict) and "name" in item:
                        name_value = item["name"]
                        # Skip items with ANSI color codes or log messages
                        if '\x1b[' in name_value or any(log_msg in name_value for log_msg in [
                            "Running with dbt=", "Registered adapter:", "Found", "Starting"
                        ]):
                            continue
                        filtered_items.append(item)

                print(f"Filtered output length: {len(filtered_items)}")
                for i, item in enumerate(filtered_items[:3]):  # Print first 3 filtered items
                    print(f"Item {i} type: {type(item)}")
                    print(f"Item {i}: {str(item)[:100]}...")

            # Verify we have at least the expected models
            model_names = []

            # The output is a list of dictionaries or strings
            if isinstance(output, list):
                for item in output:
                    # If it's a dictionary with a name key
                    if isinstance(item, dict) and "name" in item:
                        name_value = item["name"]

                        # If it's a log message, skip it
                        if name_value.startswith('\x1b[0m'):
                            continue

                        # If it's a JSON string, try to parse it
                        if name_value.strip().startswith('{'):
                            try:
                                model_data = json.loads(name_value)
                                if "name" in model_data and "resource_type" in model_data and model_data["resource_type"] == "model":
                                    model_names.append(model_data["name"])
                            except json.JSONDecodeError:
                                pass
                        else:
                            # If it's a model name, add it
                            model_names.append(name_value)

                    # If it's a string containing JSON
                    elif isinstance(item, str) and item.strip().startswith('{'):
                        try:
                            model_data = json.loads(item)
                            if "name" in model_data and "resource_type" in model_data and model_data["resource_type"] == "model":
                                model_names.append(model_data["name"])
                        except json.JSONDecodeError:
                            pass

            expected_models = ["customers", "orders", "stg_customers", "stg_orders", "stg_payments"]

            missing_models = [model for model in expected_models if model not in model_names]
            if missing_models:
                print(f"❌ Missing expected models: {missing_models}")
                print(f"Found models: {model_names}")
                return False

            print(f"✅ Found all expected models: {expected_models}")
            print("✅ Test passed!")
            print("DEBUG: test_dbt_ls returning True")
            return True

        except json.JSONDecodeError as e:
            print(f"❌ Failed to parse JSON result: {ls_result}")
            print(f"Error: {e}")
            print("DEBUG: test_dbt_ls returning False due to JSONDecodeError")
            return False

    except Exception as e:
        print(f"❌ Test failed with exception: {e}")
        import traceback
        traceback.print_exc()
        print("DEBUG: test_dbt_ls raising exception")
        raise

def test_dbt_ls_with_profiles_dir():
    """Test the dbt_ls tool with explicit profiles_dir parameter"""
    print("Testing dbt_ls tool with explicit profiles_dir parameter...")

    try:
        # Call the dbt_ls tool with explicit profiles_dir
        print("Listing all models with explicit profiles_dir...")
        ls_result = run_cli_command("ls", {
            "project_dir": str(JAFFLE_SHOP_PATH),
            "profiles_dir": str(JAFFLE_SHOP_PATH),  # Explicitly set profiles_dir
            "resource_type": "model",
            "output_format": "json"
        })

        # Parse the JSON result (similar to test_dbt_ls)
        try:
            result_data = json.loads(ls_result)

            # Extract the actual output from the JSON response
            if isinstance(result_data, dict) and "output" in result_data:
                output = result_data["output"]
                if isinstance(output, str) and (output.startswith("[") or output.startswith("{")):
                    output = json.loads(output)
            else:
                output = result_data

            # Verify we have at least the expected models
            model_names = []

            # The output is a list of dictionaries or strings
            if isinstance(output, list):
                for item in output:
                    # If it's a dictionary with a name key
                    if isinstance(item, dict) and "name" in item:
                        name_value = item["name"]

                        # If it's a log message, skip it
                        if name_value.startswith('\x1b[0m'):
                            continue

                        # If it's a JSON string, try to parse it
                        if name_value.strip().startswith('{'):
                            try:
                                model_data = json.loads(name_value)
                                if "name" in model_data and "resource_type" in model_data and model_data["resource_type"] == "model":
                                    model_names.append(model_data["name"])
                            except json.JSONDecodeError:
                                pass
                        else:
                            # If it's a model name, add it
                            model_names.append(name_value)

                    # If it's a string containing JSON
                    elif isinstance(item, str) and item.strip().startswith('{'):
                        try:
                            model_data = json.loads(item)
                            if "name" in model_data and "resource_type" in model_data and model_data["resource_type"] == "model":
                                model_names.append(model_data["name"])
                        except json.JSONDecodeError:
                            pass

            expected_models = ["customers", "orders", "stg_customers", "stg_orders", "stg_payments"]

            missing_models = [model for model in expected_models if model not in model_names]
            if missing_models:
                print(f"❌ Missing expected models: {missing_models}")
                print(f"Found models: {model_names}")
                return False

            print(f"✅ Found all expected models: {expected_models}")
            print("✅ Test passed!")
            print("DEBUG: test_dbt_ls_with_profiles_dir returning True")
            return True

        except json.JSONDecodeError as e:
            print(f"❌ Failed to parse JSON result: {ls_result}")
            print(f"Error: {e}")
            print("DEBUG: test_dbt_ls_with_profiles_dir returning False due to JSONDecodeError")
            return False

    except Exception as e:
        print(f"❌ Test failed with exception: {e}")
        import traceback
        traceback.print_exc()
        print("DEBUG: test_dbt_ls_with_profiles_dir raising exception")
        raise

def test_dbt_ls_verbose():
    """Test the dbt_ls tool with verbose flag"""
    print("Testing dbt_ls tool with verbose flag...")

    try:
        # First test with default (simplified) output
        print("Listing models with simplified output (default)...")
        simplified_result = run_cli_command("ls", {
            "project_dir": str(JAFFLE_SHOP_PATH),
            "profiles_dir": str(JAFFLE_SHOP_PATH),
            "resource_type": "model",
            "output_format": "json"
        })

        # Then test with verbose output
        print("Listing models with verbose output...")
        verbose_result = run_cli_command("ls", {
            "project_dir": str(JAFFLE_SHOP_PATH),
            "profiles_dir": str(JAFFLE_SHOP_PATH),
            "resource_type": "model",
            "output_format": "json",
            "verbose": True
        })

        # Parse both results
        try:
            simplified_data = json.loads(simplified_result)
            verbose_data = json.loads(verbose_result)

            # Extract the actual output from the JSON responses
            if isinstance(simplified_data, dict) and "output" in simplified_data:
                simplified_output = simplified_data["output"]
                if isinstance(simplified_output, str) and (simplified_output.startswith("[") or simplified_output.startswith("{")):
                    simplified_output = json.loads(simplified_output)
            else:
                simplified_output = simplified_data

            if isinstance(verbose_data, dict) and "output" in verbose_data:
                verbose_output = verbose_data["output"]
                if isinstance(verbose_output, str) and (verbose_output.startswith("[") or verbose_output.startswith("{")):
                    verbose_output = json.loads(verbose_output)
            else:
                verbose_output = verbose_data

            # Verify both outputs contain the expected models
            simplified_models = []
            verbose_models = []

            # Debug output
            print(f"DEBUG: Simplified output type: {type(simplified_output)}")
            if isinstance(simplified_output, list):
                print(f"DEBUG: Simplified output length: {len(simplified_output)}")
                if simplified_output and len(simplified_output) > 0:
                    print(f"DEBUG: First simplified item type: {type(simplified_output[0])}")
                    print(f"DEBUG: First simplified item: {simplified_output[0]}")

            print(f"DEBUG: Verbose output type: {type(verbose_output)}")
            if isinstance(verbose_output, list):
                print(f"DEBUG: Verbose output length: {len(verbose_output)}")
                if verbose_output and len(verbose_output) > 0:
                    print(f"DEBUG: First verbose item type: {type(verbose_output[0])}")
                    print(f"DEBUG: First verbose item: {verbose_output[0]}")

            # Process simplified output
            if isinstance(simplified_output, list):
                for item in simplified_output:
                    # Handle dictionary items (properly formatted model data)
                    if isinstance(item, dict) and "name" in item and "resource_type" in item:
                        simplified_models.append(item["name"])

                        # Verify simplified output only has the required fields
                        if set(item.keys()) != {"name", "resource_type", "depends_on"}:
                            print(f"❌ Simplified output has unexpected fields: {set(item.keys())}")
                            return False

                        # Verify depends_on only has nodes
                        if "depends_on" in item and set(item["depends_on"].keys()) != {"nodes"}:
                            print(f"❌ Simplified output depends_on has unexpected fields: {set(item['depends_on'].keys())}")
                            return False
                    # Handle string items (could be model names or log messages)
                    elif isinstance(item, str):
                        # Skip log messages and only add actual model names
                        if not item.startswith('\x1b[') and not any(log_msg in item for log_msg in [
                            "Running with dbt=", "Registered adapter:", "Found", "Starting"
                        ]):
                            simplified_models.append(item)

            # Process verbose output
            if isinstance(verbose_output, list):
                for item in verbose_output:
                    # Handle dictionary items (properly formatted model data)
                    if isinstance(item, dict) and "name" in item and "resource_type" in item:
                        verbose_models.append(item["name"])

                        # Verify verbose output has more fields than simplified
                        if len(item.keys()) <= 3:
                            print(f"❌ Verbose output doesn't have enough fields: {set(item.keys())}")
                            return False

                        # Check for fields that should be in verbose but not simplified
                        for field in ["package_name", "original_file_path", "unique_id", "config"]:
                            if field not in item:
                                print(f"❌ Verbose output missing expected field: {field}")
                                return False
                    # Handle string items (could be model names or log messages)
                    elif isinstance(item, str):
                        # Skip log messages and only add actual model names
                        if not item.startswith('\x1b[') and not any(log_msg in item for log_msg in [
                            "Running with dbt=", "Registered adapter:", "Found", "Starting"
                        ]):
                            verbose_models.append(item)

            # Filter out any log messages from the model lists
            simplified_models = [model for model in simplified_models if model in ["customers", "orders", "stg_customers", "stg_orders", "stg_payments"]]
            verbose_models = [model for model in verbose_models if model in ["customers", "orders", "stg_customers", "stg_orders", "stg_payments"]]

            # Sort the model lists for consistent comparison
            simplified_models.sort()
            verbose_models.sort()

            # Verify both outputs have the same models
            expected_models = ["customers", "orders", "stg_customers", "stg_orders", "stg_payments"]
            expected_models.sort()

            missing_simplified = [model for model in expected_models if model not in simplified_models]
            missing_verbose = [model for model in expected_models if model not in verbose_models]

            if missing_simplified:
                print(f"❌ Simplified output missing expected models: {missing_simplified}")
                print(f"Found models: {simplified_models}")
                return False

            if missing_verbose:
                print(f"❌ Verbose output missing expected models: {missing_verbose}")
                print(f"Found models: {verbose_models}")
                return False

            # Debug output for final model lists
            print(f"DEBUG: Final simplified_models: {simplified_models}")
            print(f"DEBUG: Final verbose_models: {verbose_models}")
            print(f"DEBUG: Models equal? {simplified_models == verbose_models}")

            if simplified_models != verbose_models:
                print(f"❌ Simplified and verbose outputs have different models")
                print(f"Simplified: {simplified_models}")
                print(f"Verbose: {verbose_models}")
                return False

            print(f"✅ Found all expected models in both outputs: {expected_models}")
            print("✅ Test passed!")
            return True

        except json.JSONDecodeError as e:
            print(f"❌ Failed to parse JSON results")
            print(f"Error: {e}")
            return False

    except Exception as e:
        print(f"❌ Test failed with exception: {e}")
        import traceback
        traceback.print_exc()
        raise

if __name__ == "__main__":
    success = True
    try:
        print("DEBUG: Starting test_dbt_ls")
        test_ls_result = test_dbt_ls()
        print(f"DEBUG: test_dbt_ls result: {test_ls_result}")
        success = test_ls_result and success

        print("DEBUG: Starting test_dbt_ls_with_profiles_dir")
        profiles_result = test_dbt_ls_with_profiles_dir()
        print(f"DEBUG: test_dbt_ls_with_profiles_dir result: {profiles_result}")
        success = profiles_result and success

        print("DEBUG: Starting test_dbt_ls_verbose")
        verbose_result = test_dbt_ls_verbose()
        print(f"DEBUG: test_dbt_ls_verbose result: {verbose_result}")
        success = verbose_result and success

        print(f"DEBUG: Final success value: {success}")
        exit_code = 0 if success else 1
        print(f"DEBUG: Exiting with code: {exit_code}")
        sys.exit(exit_code)
    except Exception as e:
        print(f"DEBUG: Exception occurred: {e}")
        sys.exit(1)
```

--------------------------------------------------------------------------------
/src/cli.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Command-line interface for dbt tools.
"""

import os
import sys
import json
import asyncio
import argparse
from pathlib import Path
from typing import Dict, Any, Optional, List

# No need for these imports anymore
from src.config import initialize as initialize_config


def parse_args() -> argparse.Namespace:
    """Parse command line arguments."""
    parser = argparse.ArgumentParser(description="DBT CLI MCP Command Line Interface")

    # Global options
    parser.add_argument(
        "--dbt-path",
        help="Path to dbt executable",
        default=os.environ.get("DBT_PATH", "dbt")
    )
    parser.add_argument(
        "--env-file",
        help="Path to environment file",
        default=os.environ.get("ENV_FILE", ".env")
    )
    parser.add_argument(
        "--log-level",
        help="Logging level",
        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
        default=os.environ.get("LOG_LEVEL", "INFO")
    )
    parser.add_argument(
        "--format",
        help="Output format",
        choices=["text", "json"],
        default="text"
    )

    # Set up subparsers for each command
    subparsers = parser.add_subparsers(dest="command", help="Command to execute")

    # dbt_run command
    run_parser = subparsers.add_parser("run", help="Run dbt models")
    run_parser.add_argument("--models", help="Specific models to run")
    run_parser.add_argument("--selector", help="Named selector to use")
    run_parser.add_argument("--exclude", help="Models to exclude")
    run_parser.add_argument("--project-dir", help="Directory containing the dbt project", default=".")
    run_parser.add_argument("--profiles-dir", help="Directory containing the profiles.yml file (defaults to project-dir if not specified)")
    run_parser.add_argument("--full-refresh", help="Perform a full refresh", action="store_true")

    # dbt_test command
    test_parser = subparsers.add_parser("test", help="Run dbt tests")
    test_parser.add_argument("--models", help="Specific models to test")
    test_parser.add_argument("--selector", help="Named selector to use")
    test_parser.add_argument("--exclude", help="Models to exclude")
    test_parser.add_argument("--project-dir", help="Directory containing the dbt project", default=".")
    test_parser.add_argument("--profiles-dir", help="Directory containing the profiles.yml file (defaults to project-dir if not specified)")

    # dbt_ls command
    ls_parser = subparsers.add_parser("ls", help="List dbt resources")
    ls_parser.add_argument("--models", help="Specific models to list")
    ls_parser.add_argument("--selector", help="Named selector to use")
    ls_parser.add_argument("--exclude", help="Models to exclude")
    ls_parser.add_argument("--resource-type", help="Type of resource to list")
    ls_parser.add_argument("--project-dir", help="Directory containing the dbt project", default=".")
    ls_parser.add_argument("--profiles-dir", help="Directory containing the profiles.yml file (defaults to project-dir if not specified)")
    ls_parser.add_argument("--output-format", help="Output format", choices=["json", "name", "path", "selector"], default="json")
    ls_parser.add_argument("--verbose", help="Return full JSON output instead of simplified version", action="store_true")

    # dbt_compile command
    compile_parser = subparsers.add_parser("compile", help="Compile dbt models")
    compile_parser.add_argument("--models", help="Specific models to compile")
    compile_parser.add_argument("--selector", help="Named selector to use")
    compile_parser.add_argument("--exclude", help="Models to exclude")
    compile_parser.add_argument("--project-dir", help="Directory containing the dbt project", default=".")
    compile_parser.add_argument("--profiles-dir", help="Directory containing the profiles.yml file (defaults to project-dir if not specified)")

    # dbt_debug command
    debug_parser = subparsers.add_parser("debug", help="Debug dbt project")
    debug_parser.add_argument("--project-dir", help="Directory containing the dbt project", default=".")
    debug_parser.add_argument("--profiles-dir", help="Directory containing the profiles.yml file (defaults to project-dir if not specified)")

    # dbt_deps command
    deps_parser = subparsers.add_parser("deps", help="Install dbt package dependencies")
    deps_parser.add_argument("--project-dir", help="Directory containing the dbt project", default=".")
    deps_parser.add_argument("--profiles-dir", help="Directory containing the profiles.yml file (defaults to project-dir if not specified)")

    # dbt_seed command
    seed_parser = subparsers.add_parser("seed", help="Load CSV files as seed data")
    seed_parser.add_argument("--selector", help="Named selector to use")
    seed_parser.add_argument("--exclude", help="Seeds to exclude")
    seed_parser.add_argument("--project-dir", help="Directory containing the dbt project", default=".")
    seed_parser.add_argument("--profiles-dir", help="Directory containing the profiles.yml file (defaults to project-dir if not specified)")

    # dbt_show command
    show_parser = subparsers.add_parser("show", help="Preview model results")
    show_parser.add_argument("--models", help="Specific model to show", required=True)
    show_parser.add_argument("--project-dir", help="Directory containing the dbt project", default=".")
    show_parser.add_argument("--profiles-dir", help="Directory containing the profiles.yml file (defaults to project-dir if not specified)")
    show_parser.add_argument("--limit", help="Limit the number of rows returned", type=int)
    show_parser.add_argument("--output-format", help="Output format (json, table, etc.)", default="json")

    # dbt_build command
    build_parser = subparsers.add_parser("build", help="Run build command")
    build_parser.add_argument("--models", help="Specific models to build")
    build_parser.add_argument("--selector", help="Named selector to use")
    build_parser.add_argument("--exclude", help="Models to exclude")
    build_parser.add_argument("--project-dir", help="Directory containing the dbt project", default=".")
    build_parser.add_argument("--profiles-dir", help="Directory containing the profiles.yml file (defaults to project-dir if not specified)")
    build_parser.add_argument("--full-refresh", help="Perform a full refresh", action="store_true")

    # configure command
    configure_parser = subparsers.add_parser("configure", help="Configure dbt path")
    configure_parser.add_argument("path", help="Path to dbt executable")

    return parser.parse_args()


# Define tool functions directly
async def run_dbt_run(models=None, selector=None, exclude=None, project_dir=".", profiles_dir=None, full_refresh=False):
    """Run dbt models."""
    command = ["run"]

    if models:
        command.extend(["-s", models])

    if selector:
        command.extend(["--selector", selector])

    if exclude:
        command.extend(["--exclude", exclude])

    if full_refresh:
        command.append("--full-refresh")

    from src.command import execute_dbt_command
    result = await execute_dbt_command(command, project_dir, profiles_dir)

    if not result["success"]:
        error_msg = f"Error executing dbt run: {result['error']}"
        if 'output' in result and result['output']:
            error_msg += f"\nOutput: {result['output']}"
        return error_msg

    return json.dumps(result["output"]) if isinstance(result["output"], (dict, list)) else str(result["output"])

async def run_dbt_test(models=None, selector=None, exclude=None, project_dir=".", profiles_dir=None):
    """Run dbt tests."""
    command = ["test"]

    if models:
        command.extend(["-s", models])

    if selector:
        command.extend(["--selector", selector])

    if exclude:
        command.extend(["--exclude", exclude])

    from src.command import execute_dbt_command
    result = await execute_dbt_command(command, project_dir, profiles_dir)

    if not result["success"]:
        error_msg = f"Error executing dbt test: {result['error']}"
        if result["output"]:
            error_msg += f"\nOutput: {result['output']}"
        return error_msg

    return json.dumps(result["output"]) if isinstance(result["output"], (dict, list)) else str(result["output"])

async def run_dbt_ls(models=None, selector=None, exclude=None, resource_type=None, project_dir=".", profiles_dir=None, output_format="json", verbose=False):
    """List dbt resources."""
    command = ["ls"]

    if models:
        command.extend(["-s", models])

    if selector:
        command.extend(["--selector", selector])

    if exclude:
        command.extend(["--exclude", exclude])

    if resource_type:
        command.extend(["--resource-type", resource_type])

    command.extend(["--output", output_format])

    from src.command import execute_dbt_command, parse_dbt_list_output
    import re

    result = await execute_dbt_command(command, project_dir, profiles_dir)

    if not result["success"]:
        error_msg = f"Error executing dbt ls: {result['error']}"
        if "output" in result and result["output"]:
            error_msg += f"\nOutput: {result['output']}"
        return error_msg

    # For json format, parse the output and return as JSON
    if output_format == "json":
        # Return raw output if it's an empty string or None
        if not result["output"]:
            return "[]"

        # If the output is already a list, return it directly
        if isinstance(result["output"], list):
            parsed = result["output"]
        else:
            # Parse the output
            parsed = parse_dbt_list_output(result["output"])

        # If not verbose, simplify the output
        if not verbose and parsed:
            simplified = []
            for item in parsed:
                if isinstance(item, dict):
                    simplified.append({
                        "name": item.get("name"),
                        "resource_type": item.get("resource_type"),
                        "depends_on": {
                            "nodes": item.get("depends_on", {}).get("nodes", [])
                        }
                    })
            parsed = simplified

        return json.dumps(parsed, indent=2)

    # For other formats, return the raw output
    return str(result["output"])

async def run_dbt_compile(models=None, selector=None, exclude=None, project_dir=".", profiles_dir=None):
    """Compile dbt models."""
    command = ["compile"]

    if models:
        command.extend(["-s", models])

    if selector:
        command.extend(["--selector", selector])

    if exclude:
        command.extend(["--exclude", exclude])

    from src.command import execute_dbt_command
    result = await execute_dbt_command(command, project_dir, profiles_dir)

    if not result["success"]:
        error_msg = f"Error executing dbt compile: {result['error']}"
        if result["output"]:
            error_msg += f"\nOutput: {result['output']}"
        return error_msg

    return json.dumps(result["output"]) if isinstance(result["output"], (dict, list)) else str(result["output"])

async def run_dbt_debug(project_dir=".", profiles_dir=None):
    """Debug dbt project."""
    command = ["debug"]

    from src.command import execute_dbt_command
    result = await execute_dbt_command(command, project_dir, profiles_dir)

    if not result["success"]:
        error_msg = f"Error executing dbt debug: {result['error']}"
        if result["output"]:
            error_msg += f"\nOutput: {result['output']}"
        return error_msg

    return json.dumps(result["output"]) if isinstance(result["output"], (dict, list)) else str(result["output"])

async def run_dbt_deps(project_dir=".", profiles_dir=None):
    """Install dbt package dependencies."""
    command = ["deps"]

    from src.command import execute_dbt_command
    result = await execute_dbt_command(command, project_dir, profiles_dir)

    if not result["success"]:
        error_msg = f"Error executing dbt deps: {result['error']}"
        if result["output"]:
            error_msg += f"\nOutput: {result['output']}"
        return error_msg

    return json.dumps(result["output"]) if isinstance(result["output"], (dict, list)) else str(result["output"])

async def run_dbt_seed(selector=None, exclude=None, project_dir=".", profiles_dir=None):
    """Load CSV files as seed data."""
    command = ["seed"]

    if selector:
        command.extend(["--selector", selector])

    if exclude:
        command.extend(["--exclude", exclude])

    from src.command import execute_dbt_command
    result = await execute_dbt_command(command, project_dir, profiles_dir)

    if not result["success"]:
        error_msg = f"Error executing dbt seed: {result['error']}"
        if result["output"]:
            error_msg += f"\nOutput: {result['output']}"
        return error_msg

    return json.dumps(result["output"]) if isinstance(result["output"], (dict, list)) else str(result["output"])

async def run_dbt_show(models, project_dir=".", profiles_dir=None, limit=None, output_format="json"):
    """Preview model results."""
    # For successful cases, use --quiet and --output json for clean JSON output
    # For error cases, don't use --quiet to get the full error message

    from src.command import execute_dbt_command
    import re

    # Check if models parameter contains inline SQL
    is_inline_sql = models.strip().lower().startswith('select ')

    # If it's inline SQL, strip out any LIMIT clause as we'll handle it with the --limit parameter
    if is_inline_sql:
        # Use regex to remove LIMIT clause from the SQL
        models = re.sub(r'\bLIMIT\s+\d+\b', '', models, flags=re.IGNORECASE)

        # For inline SQL, use the --inline flag with the SQL as its value
        command = ["show", f"--inline={models}", "--output", output_format]

        if limit:
            command.extend(["--limit", str(limit)])

        result = await execute_dbt_command(command, project_dir, profiles_dir)

        # Check for specific error patterns in the output
        if not result["success"] or (
            isinstance(result["output"], str) and
            any(err in result["output"].lower() for err in ["error", "failed", "syntax", "exception"])
        ):
            error_msg = "Error executing dbt show with inline SQL"
            if result["output"]:
                return error_msg + "\n" + str(result["output"])
            elif result["error"]:
                return error_msg + "\n" + str(result["error"])
            else:
                return f"{error_msg}: Command failed with exit code {result.get('returncode', 'unknown')}"
    else:
        # For regular model references, check if the model exists first
        check_command = ["ls", "-s", models]
        check_result = await execute_dbt_command(check_command, project_dir, profiles_dir)

        # If the model doesn't exist, return the error message
        if not check_result["success"] or "does not match any enabled nodes" in str(check_result["output"]):
            error_msg = "Error executing dbt show: Model does not exist or is not enabled"
            if check_result["output"]:
                return error_msg + "\n" + str(check_result["output"])
            elif check_result["error"]:
                return error_msg + "\n" + str(check_result["error"])
            else:
                return error_msg

        # If the model exists, run the show command with --quiet and --output json
        command = ["show", "-s", models, "--quiet", "--output", output_format]

        if limit:
            command.extend(["--limit", str(limit)])

        result = await execute_dbt_command(command, project_dir, profiles_dir)

    # If the command succeeded, return the JSON output
    if result["success"]:
        return json.dumps(result["output"]) if isinstance(result["output"], (dict, list)) else str(result["output"])

    # If the command failed, return the error message
    error_msg = "Error executing dbt show: "
    if result["output"]:
        return error_msg + str(result["output"])
    elif result["error"]:
        return error_msg + str(result["error"])
    else:
        return f"{error_msg}Command failed with exit code {result.get('returncode', 'unknown')}"

async def run_dbt_build(models=None, selector=None, exclude=None, project_dir=".", profiles_dir=None, full_refresh=False):
    """Run build command."""
    command = ["build"]

    if models:
        command.extend(["-s", models])

    if selector:
        command.extend(["--selector", selector])

    if exclude:
        command.extend(["--exclude", exclude])

    if full_refresh:
        command.append("--full-refresh")

    from src.command import execute_dbt_command
    result = await execute_dbt_command(command, project_dir, profiles_dir)

    if not result["success"]:
        error_msg = f"Error executing dbt build: {result['error']}"
        if result["output"]:
            error_msg += f"\nOutput: {result['output']}"
        return error_msg

    return json.dumps(result["output"]) if isinstance(result["output"], (dict, list)) else str(result["output"])

async def run_configure_dbt_path(path):
    """Configure dbt path."""
    import os
    from src.config import set_config

    if not os.path.isfile(path):
        return f"Error: File not found at {path}"

    set_config("dbt_path", path)
    return f"dbt path configured to: {path}"

async def main_async() -> None:
    """Main entry point for the CLI."""
    args = parse_args()

    # Set environment variables from arguments
    os.environ["DBT_PATH"] = args.dbt_path
    os.environ["ENV_FILE"] = args.env_file
    os.environ["LOG_LEVEL"] = args.log_level

    # Initialize configuration
    initialize_config()

    # Map commands to functions
    command_map = {
        "run": run_dbt_run,
        "test": run_dbt_test,
        "ls": run_dbt_ls,
        "compile": run_dbt_compile,
        "debug": run_dbt_debug,
        "deps": run_dbt_deps,
        "seed": run_dbt_seed,
        "show": run_dbt_show,
        "build": run_dbt_build,
        "configure": run_configure_dbt_path
    }

    if args.command not in command_map:
        print(f"Command '{args.command}' not found. Use --help for usage information.")
        sys.exit(1)

    # Prepare arguments for the function
    func_args = {}

    if args.command == "run":
        func_args = {
            "models": args.models,
            "selector": args.selector,
            "exclude": args.exclude,
            "project_dir": args.project_dir,
            "profiles_dir": args.profiles_dir,
            "full_refresh": args.full_refresh
        }
    elif args.command == "test":
        func_args = {
            "models": args.models,
            "selector": args.selector,
            "exclude": args.exclude,
            "project_dir": args.project_dir,
            "profiles_dir": args.profiles_dir
        }
    elif args.command == "ls":
        func_args = {
            "models": args.models,
            "selector": args.selector,
            "exclude": args.exclude,
            "resource_type": args.resource_type,
            "project_dir": args.project_dir,
            "profiles_dir": args.profiles_dir,
            "output_format": args.output_format,
            "verbose": args.verbose
        }
    elif args.command == "compile":
        func_args = {
            "models": args.models,
            "selector": args.selector,
            "exclude": args.exclude,
            "project_dir": args.project_dir,
            "profiles_dir": args.profiles_dir
        }
    elif args.command == "debug":
        func_args = {
            "project_dir": args.project_dir,
            "profiles_dir": args.profiles_dir
        }
    elif args.command == "deps":
        func_args = {
            "project_dir": args.project_dir,
            "profiles_dir": args.profiles_dir
        }
    elif args.command == "seed":
        func_args = {
            "selector": args.selector,
            "exclude": args.exclude,
            "project_dir": args.project_dir,
            "profiles_dir": args.profiles_dir
        }
    elif args.command == "show":
        func_args = {
            "models": args.models,
            "project_dir": args.project_dir,
            "profiles_dir": args.profiles_dir,
            "limit": args.limit,
            "output_format": args.output_format
        }
    elif args.command == "build":
        func_args = {
            "models": args.models,
            "selector": args.selector,
            "exclude": args.exclude,
            "project_dir": args.project_dir,
            "profiles_dir": args.profiles_dir,
            "full_refresh": args.full_refresh
        }
    elif args.command == "configure":
        func_args = {
            "path": args.path
        }

    # Execute the function
    result = await command_map[args.command](**{k: v for k, v in func_args.items() if v is not None})

    # Print the result
    # For dbt_show command errors, print raw output regardless of format
    if args.command == "show" and isinstance(result, str) and result.startswith("Error executing dbt show:"):
        print(result)
    elif args.format == "json":
        try:
            # If result is already a JSON string, parse it first
            if isinstance(result, str) and (result.startswith("{") or result.startswith("[")):
                parsed = json.loads(result)
                print(json.dumps(parsed, indent=2))
            else:
                print(json.dumps({"output": result}, indent=2))
        except json.JSONDecodeError:
            print(json.dumps({"output": result}, indent=2))
    else:
        print(result)


def main_entry() -> None:
    """Entry point for setuptools."""
    asyncio.run(main_async())


if __name__ == "__main__":
    asyncio.run(main_async())
```
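
The async helpers above can also be exercised programmatically, without going through the argument parser. The following is a minimal, illustrative sketch (not part of the repository); it assumes the package is importable as `src`, that dbt is installed, that `initialize_config` is exported by `src.config` (as the CLI's usage suggests), and that the project path and selection string are placeholders you would replace:

```python
import asyncio

from src.cli import run_dbt_build
from src.config import initialize_config


async def demo() -> None:
    # Mirror main_async(): load configuration, then invoke a single command helper.
    initialize_config()
    # Placeholder selection and project path; replace with a real dbt project.
    output = await run_dbt_build(models="staging", project_dir="/path/to/dbt_project")
    print(output)


if __name__ == "__main__":
    asyncio.run(demo())
```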

--------------------------------------------------------------------------------
/src/tools.py:
--------------------------------------------------------------------------------

```python
"""
MCP tool implementations for the DBT CLI MCP Server.

This module defines all the MCP tools that map to dbt CLI commands.
Each tool is a function decorated with @mcp.tool() that handles a specific dbt command.
"""

import logging
import json
import re
from typing import Optional, Dict, Any, List
from functools import partial

from mcp.server.fastmcp import FastMCP
from pydantic import Field

from src.command import execute_dbt_command, parse_dbt_list_output, process_command_result
from src.config import get_config, set_config
from src.formatters import default_formatter, ls_formatter, show_formatter

# Logger for this module
logger = logging.getLogger(__name__)


def register_tools(mcp: FastMCP) -> None:
    """
    Register all tools with the MCP server.

    Args:
        mcp: The FastMCP server instance
    """

    @mcp.tool()
    async def dbt_run(
        models: Optional[str] = Field(
            default=None,
            description="Specific models to run, using the dbt selection syntax (e.g., \"model_name+\")"
        ),
        selector: Optional[str] = Field(
            default=None,
            description="Named selector to use"
        ),
        exclude: Optional[str] = Field(
            default=None,
            description="Models to exclude"
        ),
        project_dir: str = Field(
            default=".",
            description="ABSOLUTE PATH to the directory containing the dbt project (e.g. '/Users/username/projects/dbt_project' not '.')"
        ),
        profiles_dir: Optional[str] = Field(
            default=None,
            description="Directory containing the profiles.yml file (defaults to project_dir if not specified)"
        ),
        full_refresh: bool = Field(
            default=False,
            description="Whether to perform a full refresh"
        )
    ) -> str:
        """Run dbt models. An AI agent should use this tool when it needs to execute dbt models to transform data and build analytical tables in the data warehouse. This is essential for refreshing data or implementing new data transformations in a project.

        Returns:
            Output from the dbt run command as text (this command does not support JSON output format)
        """
        command = ["run"]

        if models:
            command.extend(["-s", models])

        if selector:
            command.extend(["--selector", selector])

        if exclude:
            command.extend(["--exclude", exclude])

        if full_refresh:
            command.append("--full-refresh")

        # The --no-print flag is not supported by dbt Cloud CLI
        # We'll rely on proper parsing to handle any print macros

        result = await execute_dbt_command(command, project_dir, profiles_dir)

        # Use the centralized result processor
        return await process_command_result(result, command_name="run")

    @mcp.tool()
    async def dbt_test(
        models: Optional[str] = Field(
            default=None,
            description="Specific models to test, using the dbt selection syntax"
        ),
        selector: Optional[str] = Field(
            default=None,
            description="Named selector to use"
        ),
        exclude: Optional[str] = Field(
            default=None,
            description="Models to exclude"
        ),
        project_dir: str = Field(
            default=".",
            description="ABSOLUTE PATH to the directory containing the dbt project (e.g. '/Users/username/projects/dbt_project' not '.')"
        ),
        profiles_dir: Optional[str] = Field(
            default=None,
            description="Directory containing the profiles.yml file (defaults to project_dir if not specified)"
        )
    ) -> str:
        """Run dbt tests. An AI agent should use this tool when it needs to validate data quality and integrity by running tests defined in a dbt project. This helps ensure that data transformations meet expected business rules and constraints before being used for analysis or reporting.

        Returns:
            Output from the dbt test command as text (this command does not support JSON output format)
        """
        command = ["test"]

        if models:
            command.extend(["-s", models])

        if selector:
            command.extend(["--selector", selector])

        if exclude:
            command.extend(["--exclude", exclude])

        # The --no-print flag is not supported by dbt Cloud CLI
        # We'll rely on proper parsing to handle any print macros

        result = await execute_dbt_command(command, project_dir, profiles_dir)

        # Use the centralized result processor
        return await process_command_result(result, command_name="test")

    @mcp.tool()
    async def dbt_ls(
        models: Optional[str] = Field(
            default=None,
            description="Specific models to list, using the dbt selection syntax. Note that you probably want to specify your selection here e.g. silver.fact"
        ),
        selector: Optional[str] = Field(
            default=None,
            description="Named selector to use"
        ),
        exclude: Optional[str] = Field(
            default=None,
            description="Models to exclude"
        ),
        resource_type: Optional[str] = Field(
            default=None,
            description="Type of resource to list (model, test, source, etc.)"
        ),
        project_dir: str = Field(
            default=".",
            description="ABSOLUTE PATH to the directory containing the dbt project (e.g. '/Users/username/projects/dbt_project' not '.')"
        ),
        profiles_dir: Optional[str] = Field(
            default=None,
            description="Directory containing the profiles.yml file (defaults to project_dir if not specified)"
        ),
        output_format: str = Field(
            default="json",
            description="Output format (json, name, path, or selector)"
        ),
        verbose: bool = Field(
            default=False,
            description="Return full JSON output instead of simplified version"
        )
    ) -> str:
        """List dbt resources. An AI agent should use this tool when it needs to discover available models, tests, sources, and other resources within a dbt project. This helps the agent understand the project structure, identify dependencies, and select specific resources for other operations like running or testing.

        Returns:
            When output_format is 'json' (default):
              - With verbose=False (default): returns a simplified JSON with only name, resource_type, and depends_on.nodes
              - With verbose=True: returns a full JSON with all resource details
            When output_format is 'name', 'path', or 'selector', returns plain text with the respective format.
        """
        # Log diagnostic information
        logger.info(f"Starting dbt_ls with project_dir={project_dir}, output_format={output_format}")

        command = ["ls"]

        if models:
            command.extend(["-s", models])

        if selector:
            command.extend(["--selector", selector])

        if exclude:
            command.extend(["--exclude", exclude])

        if resource_type:
            command.extend(["--resource-type", resource_type])

        command.extend(["--output", output_format])

        command.extend(["--quiet"])

        logger.info(f"Executing dbt command: dbt {' '.join(command)}")
        result = await execute_dbt_command(command, project_dir, profiles_dir)
        logger.info(f"dbt command result: success={result['success']}, returncode={result.get('returncode')}")

        # Use the centralized result processor with ls_formatter
        formatter = partial(ls_formatter, output_format=output_format, verbose=verbose)

        return await process_command_result(
            result,
            command_name="ls",
            output_formatter=formatter,
            include_debug_info=True  # Include extra debug info for this command
        )

    @mcp.tool()
    async def dbt_compile(
        models: Optional[str] = Field(
            default=None,
            description="Specific models to compile, using the dbt selection syntax"
        ),
        selector: Optional[str] = Field(
            default=None,
            description="Named selector to use"
        ),
        exclude: Optional[str] = Field(
            default=None,
            description="Models to exclude"
        ),
        project_dir: str = Field(
            default=".",
            description="ABSOLUTE PATH to the directory containing the dbt project (e.g. '/Users/username/projects/dbt_project' not '.')"
        ),
        profiles_dir: Optional[str] = Field(
            default=None,
            description="Directory containing the profiles.yml file (defaults to project_dir if not specified)"
        )
    ) -> str:
        """Compile dbt models. An AI agent should use this tool when it needs to generate the SQL that will be executed without actually running it against the database. This is valuable for validating SQL syntax, previewing transformations, or investigating how dbt interprets models before committing to execution.

        Returns:
            Output from the dbt compile command as text (this command does not support JSON output format)
        """
        command = ["compile"]

        if models:
            command.extend(["-s", models])

        if selector:
            command.extend(["--selector", selector])

        if exclude:
            command.extend(["--exclude", exclude])

        # The --no-print flag is not supported by dbt Cloud CLI
        # We'll rely on proper parsing to handle any print macros

        result = await execute_dbt_command(command, project_dir, profiles_dir)

        # Use the centralized result processor
        return await process_command_result(result, command_name="compile")

    @mcp.tool()
    async def dbt_debug(
        project_dir: str = Field(
            default=".",
            description="ABSOLUTE PATH to the directory containing the dbt project (e.g. '/Users/username/projects/dbt_project' not '.')"
        ),
        profiles_dir: Optional[str] = Field(
            default=None,
            description="Directory containing the profiles.yml file (defaults to project_dir if not specified)"
        )
    ) -> str:
        """Run dbt debug to validate the project setup. An AI agent should use this tool when it needs to troubleshoot configuration issues, check database connectivity, or verify that all project dependencies are properly installed. This is essential for diagnosing problems before attempting to run models or tests.

        Returns:
            Output from the dbt debug command as text (this command does not support JSON output format)
        """
        command = ["debug"]

        # The --no-print flag is not supported by dbt Cloud CLI
        # We'll rely on proper parsing to handle any print macros

        result = await execute_dbt_command(command, project_dir, profiles_dir)

        # Use the centralized result processor
        return await process_command_result(result, command_name="debug")

    @mcp.tool()
    async def dbt_deps(
        project_dir: str = Field(
            default=".",
            description="ABSOLUTE PATH to the directory containing the dbt project (e.g. '/Users/username/projects/dbt_project' not '.')"
        ),
        profiles_dir: Optional[str] = Field(
            default=None,
            description="Directory containing the profiles.yml file (defaults to project_dir if not specified)"
        )
    ) -> str:
        """Install dbt package dependencies. An AI agent should use this tool when it needs to install or update external packages that the dbt project depends on. This ensures that all required modules, macros, and models from other packages are available before running the project.

        Returns:
            Output from the dbt deps command as text (this command does not support JSON output format)
        """
        command = ["deps"]

        # The --no-print flag is not supported by dbt Cloud CLI
        # We'll rely on proper parsing to handle any print macros

        result = await execute_dbt_command(command, project_dir, profiles_dir)

        # Use the centralized result processor
        return await process_command_result(result, command_name="deps")

    @mcp.tool()
    async def dbt_seed(
        selector: Optional[str] = Field(
            default=None,
            description="Named selector to use"
        ),
        exclude: Optional[str] = Field(
            default=None,
            description="Seeds to exclude"
        ),
        project_dir: str = Field(
            default=".",
            description="ABSOLUTE PATH to the directory containing the dbt project (e.g. '/Users/username/projects/dbt_project' not '.')"
        ),
        profiles_dir: Optional[str] = Field(
            default=None,
            description="Directory containing the profiles.yml file (defaults to project_dir if not specified)"
        )
    ) -> str:
        """Load CSV files as seed data. An AI agent should use this tool when it needs to load initial data from CSV files into the database. This is essential for creating reference tables, test datasets, or any static data that models will depend on.

        Returns:
            Output from the dbt seed command as text (this command does not support JSON output format)
        """
        command = ["seed"]

        # The --no-print flag is not supported by dbt Cloud CLI
        # We'll rely on proper parsing to handle any print macros

        if selector:
            command.extend(["--selector", selector])

        if exclude:
            command.extend(["--exclude", exclude])

        result = await execute_dbt_command(command, project_dir, profiles_dir)

        # Use the centralized result processor
        return await process_command_result(result, command_name="seed")

    @mcp.tool()
    async def dbt_show(
        models: str = Field(
            description="Specific model to show. For model references, use standard dbt syntax like 'model_name'. For inline SQL, use the format 'select * from {{ ref(\"model_name\") }}' to reference other models."
        ),
        project_dir: str = Field(
            default=".",
            description="ABSOLUTE PATH to the directory containing the dbt project (e.g. '/Users/username/projects/dbt_project' not '.')"
        ),
        profiles_dir: Optional[str] = Field(
            default=None,
            description="Directory containing the profiles.yml file (defaults to project_dir if not specified)"
        ),
        limit: Optional[int] = Field(
            default=None,
            description="Limit the number of rows returned"
        ),
        output: Optional[str] = Field(
            default="json",
            description="Output format (json, table, etc.)"
        )
    ) -> str:
        """Preview the results of a model. An AI agent should use this tool when it needs to preview data from a specific model without materializing it. This helps inspect transformation results, debug issues, or demonstrate how data looks after processing without modifying the target database.

        Returns:
            Output from the dbt show command, defaulting to JSON format if not specified
        """
        # Use enhanced SQL detection
        is_inline_sql, sql_type = is_inline_sql_query(models)

        # If it's SQL, check for security risks
        if is_inline_sql:
            has_risk, risk_reason = contains_mutation_risk(models)
            if has_risk:
                logger.warning(f"Security risk detected in SQL: {risk_reason}")
                error_result = {
                    "success": False,
                    "output": f"Security validation failed: {risk_reason}. For security reasons, mutation operations are not allowed.",
                    "error": "SecurityValidationError",
                    "returncode": 1
                }
                return await process_command_result(
                    error_result,
                    command_name="show",
                    include_debug_info=True
                )

        logger.info(f"dbt_show called with models={models}, is_inline_sql={is_inline_sql}")

        # If it's inline SQL, strip out any LIMIT clause as we'll handle it with the --limit parameter
        if is_inline_sql:
            # Use regex to remove LIMIT clause from the SQL
            original_models = models
            models = re.sub(r'\bLIMIT\s+\d+\b', '', models, flags=re.IGNORECASE)
            logger.info(f"Stripped LIMIT clause: {original_models} -> {models}")

            # For inline SQL, use the --inline flag with the SQL as its value
            command = ["show", f"--inline={models}", "--output", output or "json"]

            # Only add --limit when the inline SQL is a SELECT or WITH query; other statement types (SHOW, DESCRIBE, etc.) don't take a row limit
            if limit and sql_type in ["WITH", "SELECT"]:
                command.extend(["--limit", str(limit)])

            logger.info(f"Executing dbt command: {' '.join(command)}")
            # Don't use --quiet for inline SQL to ensure we get error messages
            result = await execute_dbt_command(command, project_dir, profiles_dir)

            logger.info(f"Command result: success={result['success']}, returncode={result.get('returncode')}")
            if isinstance(result["output"], str):
                logger.info(f"Output (first 100 chars): {result['output'][:100]}")
            elif isinstance(result["output"], (dict, list)):
                logger.info(f"Output structure: {json.dumps(result['output'])[:100]}")

            # Check for specific error patterns in the output
            if not result["success"] or (
                isinstance(result["output"], str) and
                any(err in result["output"].lower() for err in ["error", "failed", "syntax", "exception"])
            ):
                logger.warning(f"Error detected in output: {result['output'][:200]}")
                error_result = {
                    "success": False,
                    "output": f"Error executing inline SQL\n{result['output']}",
                    "error": result["error"],
                    "returncode": result["returncode"]
                }
                return await process_command_result(
                    error_result,
                    command_name="show",
                    include_debug_info=True
                )
        else:
            # For regular model references, check if the model exists first
            check_command = ["ls", "-s", models]
            check_result = await execute_dbt_command(check_command, project_dir, profiles_dir)

            # If the model doesn't exist, return the error message
            if not check_result["success"] or "does not match any enabled nodes" in str(check_result["output"]):
                error_result = {
                    "success": False,
                    "output": f"Model does not exist or is not enabled\n{check_result['output']}",
                    "error": check_result["error"],
                    "returncode": check_result["returncode"]
                }
                return await process_command_result(
                    error_result,
                    command_name="show",
                    include_debug_info=True
                )

            # If the model exists, run the show command with --quiet and the requested output format (default json)
            command = ["show", "-s", models, "--quiet", "--output", output or "json"]

            if limit:
                command.extend(["--limit", str(limit)])

            result = await execute_dbt_command(command, project_dir, profiles_dir)

        # Use the centralized result processor
        return await process_command_result(
            result,
            command_name="show",
            output_formatter=show_formatter,
            include_debug_info=True
        )
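
    # Illustrative usage of dbt_show (not executed here): an MCP client could pass
    # either a model name, e.g. models="customers", or inline SQL such as
    # models='select * from {{ ref("customers") }}' with limit=10; in the inline
    # case the row limit is applied via --limit rather than in the SQL text itself.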

    @mcp.tool()
    async def dbt_build(
        models: Optional[str] = Field(
            default=None,
            description="Specific models to build, using the dbt selection syntax"
        ),
        selector: Optional[str] = Field(
            default=None,
            description="Named selector to use"
        ),
        exclude: Optional[str] = Field(
            default=None,
            description="Models to exclude"
        ),
        project_dir: str = Field(
            default=".",
            description="ABSOLUTE PATH to the directory containing the dbt project (e.g. '/Users/username/projects/dbt_project' not '.')"
        ),
        profiles_dir: Optional[str] = Field(
            default=None,
            description="Directory containing the profiles.yml file (defaults to project_dir if not specified)"
        ),
        full_refresh: bool = Field(
            default=False,
            description="Whether to perform a full refresh"
        )
    ) -> str:
        """Run build command (seeds, tests, snapshots, and models). An AI agent should use this tool when it needs to execute a comprehensive build process that runs seeds, snapshots, models, and tests in the correct order. This is ideal for complete project deployment or ensuring all components work together.

        Returns:
            Output from the dbt build command as text (this command does not support JSON output format)
        """
        command = ["build"]

        if models:
            command.extend(["-s", models])

        if selector:
            command.extend(["--selector", selector])

        if exclude:
            command.extend(["--exclude", exclude])

        if full_refresh:
            command.append("--full-refresh")

        # The --no-print flag is not supported by dbt Cloud CLI
        # We'll rely on proper parsing to handle any print macros

        result = await execute_dbt_command(command, project_dir, profiles_dir)

        # Use the centralized result processor
        return await process_command_result(result, command_name="build")

    logger.info("Registered all dbt tools")


def is_inline_sql_query(query: str) -> tuple[bool, Optional[str]]:
    """
    Determine if the given string is an inline SQL query or a model reference.

    This function uses multiple heuristics to determine if a string is likely
    an SQL query rather than a model name:
    1. Checks for common SQL keywords at the beginning
    2. Looks for SQL syntax patterns
    3. Considers length and complexity
    4. Handles SQL with comments (both single-line and multi-line)
    5. Recognizes dbt templating syntax

    Args:
        query: The string to check

    Returns:
        A tuple of (is_sql, sql_type) where:
        - is_sql: True if the input is SQL, False otherwise
        - sql_type: The type of SQL statement if is_sql is True, None otherwise
          (e.g., "SELECT", "WITH", "SHOW", etc.)
    """
    # Normalize the query by trimming whitespace
    normalized_query = query.strip()

    # Skip empty queries
    if not normalized_query:
        return False, None

    # Check if the query contains SQL comments
    has_single_line_comment = '--' in normalized_query
    has_multi_line_comment = '/*' in normalized_query and '*/' in normalized_query

    # If the query only contains comments, it's still SQL
    if has_single_line_comment or has_multi_line_comment:
        # Check if it's only comments by removing them and seeing if anything remains
        # Remove /* */ style comments
        sql_no_comments = re.sub(r'/\*.*?\*/', ' ', normalized_query, flags=re.DOTALL)
        # Remove -- style comments
        sql_no_comments = re.sub(r'--.*?$', ' ', sql_no_comments, flags=re.MULTILINE)
        # Normalize whitespace
        sql_no_comments = ' '.join(sql_no_comments.split()).strip()

        if not sql_no_comments:
            # If nothing remains after removing comments, it's only comments
            return True, "COMMENT"

    # Convert to lowercase for case-insensitive matching
    normalized_query_lower = normalized_query.lower()

    # Check for SQL comments at the beginning and skip them for detection
    # This handles both single-line (--) and multi-line (/* */) comments
    comment_pattern = r'^(\s*(--[^\n]*\n|\s*/\*.*?\*/\s*)*\s*)'
    match = re.match(comment_pattern, normalized_query_lower, re.DOTALL)
    if match:
        # Skip past the comments for keyword detection
        start_pos = match.end()
        if start_pos >= len(normalized_query_lower):
            # If the query is only comments, it's still SQL
            return True, "COMMENT"
        normalized_query_lower = normalized_query_lower[start_pos:]

    # Common SQL statement starting keywords
    sql_starters = {
        'select': 'SELECT',
        'with': 'WITH',
        'show': 'SHOW',
        'describe': 'DESCRIBE',
        'explain': 'EXPLAIN',
        'analyze': 'ANALYZE',
        'use': 'USE',
        'set': 'SET'
    }

    # Check if the query starts with a common SQL keyword
    for keyword, sql_type in sql_starters.items():
        if normalized_query_lower.startswith(keyword + ' '):
            return True, sql_type

    # Check for more complex patterns like CTEs
    # WITH clause followed by identifier and AS
    cte_pattern = r'^\s*with\s+[a-z0-9_]+\s+as\s*\('
    if re.search(cte_pattern, normalized_query_lower, re.IGNORECASE):
        return True, "WITH"

    # Check for Jinja templating with SQL inside
    jinja_sql_pattern = r'{{\s*sql\s*}}'
    if re.search(jinja_sql_pattern, normalized_query_lower):
        return True, "JINJA"

    # Check for dbt ref or source macros which indicate SQL
    dbt_macro_pattern = r'{{\s*(ref|source)\s*\(\s*[\'"]'
    if re.search(dbt_macro_pattern, normalized_query_lower):
        return True, "DBT_MACRO"

    # If the query contains certain SQL syntax elements, it's likely SQL
    sql_syntax_elements = [
        r'\bfrom\s+[a-z0-9_]+',  # FROM clause
        r'\bjoin\s+[a-z0-9_]+',   # JOIN clause
        r'\bwhere\s+',            # WHERE clause
        r'\bgroup\s+by\s+',       # GROUP BY clause
        r'\border\s+by\s+',       # ORDER BY clause
        r'\bhaving\s+',           # HAVING clause
        r'\bunion\s+',            # UNION operator
        r'\bcase\s+when\s+'       # CASE expression
    ]

    for pattern in sql_syntax_elements:
        if re.search(pattern, normalized_query_lower, re.IGNORECASE):
            return True, "SQL_SYNTAX"

    # If the query is long and contains spaces, it's more likely to be SQL than a model name
    if len(normalized_query_lower) > 30 and ' ' in normalized_query_lower:
        return True, "COMPLEX"

    # If none of the above conditions are met, it's likely a model name
    return False, None
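
# Illustrative expectations for is_inline_sql_query (not executed here):
#   is_inline_sql_query("customers")                     -> (False, None)
#   is_inline_sql_query("select 1")                      -> (True, "SELECT")
#   is_inline_sql_query('select * from {{ ref("a") }}')  -> (True, "SELECT")
#   is_inline_sql_query("-- just a comment")             -> (True, "COMMENT")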


def contains_mutation_risk(sql: str) -> tuple[bool, str]:
    """
    Check if the SQL query contains potentially dangerous operations.

    This function scans SQL for operations that could modify or delete data,
    which should be prohibited in a read-only context like dbt show.

    Args:
        sql: The SQL query to check

    Returns:
        A tuple of (has_risk, reason) where:
        - has_risk: True if the query contains risky operations, False otherwise
        - reason: A description of the risk if has_risk is True, empty string otherwise
    """
    # Normalize the SQL by removing comments and extra whitespace
    # This helps prevent comment-based evasion techniques

    # Remove /* */ style comments
    sql_no_comments = re.sub(r'/\*.*?\*/', ' ', sql, flags=re.DOTALL)

    # Remove -- style comments
    sql_no_comments = re.sub(r'--.*?$', ' ', sql_no_comments, flags=re.MULTILINE)

    # Normalize whitespace
    normalized_sql = ' '.join(sql_no_comments.split()).lower()

    # Check for multiple SQL statements (potential SQL injection)
    # This needs to be checked first to ensure proper error message
    if ';' in normalized_sql:
        # Check if there's actual SQL after the semicolon
        statements = normalized_sql.split(';')
        if len(statements) > 1:
            for stmt in statements[1:]:
                if stmt.strip():
                    return True, "Multiple SQL statements detected - potential SQL injection risk"

    # Dangerous SQL operations patterns
    dangerous_patterns = [
        # Data modification operations
        (r'\bdelete\s+from\b', "DELETE operation detected"),
        (r'\btruncate\s+table\b', "TRUNCATE operation detected"),
        (r'\bdrop\s+table\b', "DROP TABLE operation detected"),
        (r'\bdrop\s+database\b', "DROP DATABASE operation detected"),
        (r'\bdrop\s+schema\b', "DROP SCHEMA operation detected"),
        (r'\balter\s+table\b', "ALTER TABLE operation detected"),
        (r'\bcreate\s+table\b', "CREATE TABLE operation detected"),
        (r'\bcreate\s+or\s+replace\b', "CREATE OR REPLACE operation detected"),
        (r'\binsert\s+into\b', "INSERT operation detected"),
        (r'\bupdate\s+.*?\bset\b', "UPDATE operation detected"),
        (r'\bmerge\s+into\b', "MERGE operation detected"),

        # Database administration operations
        (r'\bgrant\b', "GRANT operation detected"),
        (r'\brevoke\b', "REVOKE operation detected"),
        (r'\bcreate\s+user\b', "CREATE USER operation detected"),
        (r'\balter\s+user\b', "ALTER USER operation detected"),
        (r'\bdrop\s+user\b', "DROP USER operation detected"),

        # Execution of arbitrary code
        (r'\bexec\b', "EXEC operation detected"),
        (r'\bexecute\s+immediate\b', "EXECUTE IMMEDIATE detected"),
        (r'\bcall\b', "CALL procedure detected")
    ]

    # Check for each dangerous pattern
    for pattern, reason in dangerous_patterns:
        if re.search(pattern, normalized_sql, re.IGNORECASE):
            return True, reason

    # Check for specific Snowflake commands that could be risky
    snowflake_patterns = [
        (r'\bcopy\s+into\b', "Snowflake COPY INTO operation detected"),
        (r'\bunload\s+to\b', "Snowflake UNLOAD operation detected"),
        (r'\bput\b', "Snowflake PUT operation detected"),
        (r'\bremove\b', "Snowflake REMOVE operation detected"),
        (r'\bmodify\b', "Snowflake MODIFY operation detected")
    ]

    for pattern, reason in snowflake_patterns:
        if re.search(pattern, normalized_sql, re.IGNORECASE):
            return True, reason

    # No risks detected
    return False, ""
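
# Illustrative expectations for contains_mutation_risk (not executed here):
#   contains_mutation_risk("select * from users")        -> (False, "")
#   contains_mutation_risk("delete from users")          -> (True, "DELETE operation detected")
#   contains_mutation_risk("select 1; drop table users") -> (True, "Multiple SQL statements detected - potential SQL injection risk")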
```
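
The tools defined in this module only become visible to clients once `register_tools` is called with a `FastMCP` instance. A minimal, illustrative sketch of that wiring (the server name is a placeholder; the repository's actual startup code in `src/server.py` may differ):

```python
from mcp.server.fastmcp import FastMCP

from src.tools import register_tools

# Placeholder server name; any descriptive string works.
mcp = FastMCP("dbt-cli")

# Registers dbt_run, dbt_test, dbt_ls, dbt_compile, dbt_debug, dbt_deps,
# dbt_seed, dbt_show, and dbt_build as MCP tools.
register_tools(mcp)

if __name__ == "__main__":
    # Serve over stdio, the FastMCP default transport.
    mcp.run()
```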