# dbt-labs/dbt-mcp
tokens: 47905/50000 17/313 files (page 3/4)
This is page 3 of 4. Use http://codebase.md/dbt-labs/dbt-mcp?page={x} to view the full context.

# Directory Structure

```
├── .changes
│   ├── header.tpl.md
│   ├── unreleased
│   │   └── .gitkeep
│   ├── v0.1.3.md
│   ├── v0.10.0.md
│   ├── v0.10.1.md
│   ├── v0.10.2.md
│   ├── v0.10.3.md
│   ├── v0.2.0.md
│   ├── v0.2.1.md
│   ├── v0.2.10.md
│   ├── v0.2.11.md
│   ├── v0.2.12.md
│   ├── v0.2.13.md
│   ├── v0.2.14.md
│   ├── v0.2.15.md
│   ├── v0.2.16.md
│   ├── v0.2.17.md
│   ├── v0.2.18.md
│   ├── v0.2.19.md
│   ├── v0.2.2.md
│   ├── v0.2.20.md
│   ├── v0.2.3.md
│   ├── v0.2.4.md
│   ├── v0.2.5.md
│   ├── v0.2.6.md
│   ├── v0.2.7.md
│   ├── v0.2.8.md
│   ├── v0.2.9.md
│   ├── v0.3.0.md
│   ├── v0.4.0.md
│   ├── v0.4.1.md
│   ├── v0.4.2.md
│   ├── v0.5.0.md
│   ├── v0.6.0.md
│   ├── v0.6.1.md
│   ├── v0.6.2.md
│   ├── v0.7.0.md
│   ├── v0.8.0.md
│   ├── v0.8.1.md
│   ├── v0.8.2.md
│   ├── v0.8.3.md
│   ├── v0.8.4.md
│   ├── v0.9.0.md
│   ├── v0.9.1.md
│   ├── v1.0.0.md
│   └── v1.1.0.md
├── .changie.yaml
├── .github
│   ├── actions
│   │   └── setup-python
│   │       └── action.yml
│   ├── CODEOWNERS
│   ├── ISSUE_TEMPLATE
│   │   ├── bug_report.yml
│   │   └── feature_request.yml
│   ├── pull_request_template.md
│   └── workflows
│       ├── changelog-check.yml
│       ├── codeowners-check.yml
│       ├── create-release-pr.yml
│       ├── release.yml
│       └── run-checks-pr.yaml
├── .gitignore
├── .pre-commit-config.yaml
├── .task
│   └── checksum
│       └── d2
├── .tool-versions
├── .vscode
│   ├── launch.json
│   └── settings.json
├── CHANGELOG.md
├── CONTRIBUTING.md
├── docs
│   ├── d2.png
│   └── diagram.d2
├── evals
│   └── semantic_layer
│       └── test_eval_semantic_layer.py
├── examples
│   ├── .DS_Store
│   ├── aws_strands_agent
│   │   ├── __init__.py
│   │   ├── .DS_Store
│   │   ├── dbt_data_scientist
│   │   │   ├── __init__.py
│   │   │   ├── .env.example
│   │   │   ├── agent.py
│   │   │   ├── prompts.py
│   │   │   ├── quick_mcp_test.py
│   │   │   ├── test_all_tools.py
│   │   │   └── tools
│   │   │       ├── __init__.py
│   │   │       ├── dbt_compile.py
│   │   │       ├── dbt_mcp.py
│   │   │       └── dbt_model_analyzer.py
│   │   ├── LICENSE
│   │   ├── README.md
│   │   └── requirements.txt
│   ├── google_adk_agent
│   │   ├── __init__.py
│   │   ├── main.py
│   │   ├── pyproject.toml
│   │   └── README.md
│   ├── langgraph_agent
│   │   ├── __init__.py
│   │   ├── .python-version
│   │   ├── main.py
│   │   ├── pyproject.toml
│   │   ├── README.md
│   │   └── uv.lock
│   ├── openai_agent
│   │   ├── __init__.py
│   │   ├── .gitignore
│   │   ├── .python-version
│   │   ├── main_streamable.py
│   │   ├── main.py
│   │   ├── pyproject.toml
│   │   ├── README.md
│   │   └── uv.lock
│   ├── openai_responses
│   │   ├── __init__.py
│   │   ├── .gitignore
│   │   ├── .python-version
│   │   ├── main.py
│   │   ├── pyproject.toml
│   │   ├── README.md
│   │   └── uv.lock
│   ├── pydantic_ai_agent
│   │   ├── __init__.py
│   │   ├── .gitignore
│   │   ├── .python-version
│   │   ├── main.py
│   │   ├── pyproject.toml
│   │   └── README.md
│   └── remote_mcp
│       ├── .python-version
│       ├── main.py
│       ├── pyproject.toml
│       ├── README.md
│       └── uv.lock
├── LICENSE
├── pyproject.toml
├── README.md
├── src
│   ├── client
│   │   ├── __init__.py
│   │   ├── main.py
│   │   └── tools.py
│   ├── dbt_mcp
│   │   ├── __init__.py
│   │   ├── .gitignore
│   │   ├── config
│   │   │   ├── config_providers.py
│   │   │   ├── config.py
│   │   │   ├── dbt_project.py
│   │   │   ├── dbt_yaml.py
│   │   │   ├── headers.py
│   │   │   ├── settings.py
│   │   │   └── transport.py
│   │   ├── dbt_admin
│   │   │   ├── __init__.py
│   │   │   ├── client.py
│   │   │   ├── constants.py
│   │   │   ├── run_results_errors
│   │   │   │   ├── __init__.py
│   │   │   │   ├── config.py
│   │   │   │   └── parser.py
│   │   │   └── tools.py
│   │   ├── dbt_cli
│   │   │   ├── binary_type.py
│   │   │   └── tools.py
│   │   ├── dbt_codegen
│   │   │   ├── __init__.py
│   │   │   └── tools.py
│   │   ├── discovery
│   │   │   ├── client.py
│   │   │   └── tools.py
│   │   ├── errors
│   │   │   ├── __init__.py
│   │   │   ├── admin_api.py
│   │   │   ├── base.py
│   │   │   ├── cli.py
│   │   │   ├── common.py
│   │   │   ├── discovery.py
│   │   │   ├── semantic_layer.py
│   │   │   └── sql.py
│   │   ├── gql
│   │   │   └── errors.py
│   │   ├── lsp
│   │   │   ├── __init__.py
│   │   │   ├── lsp_binary_manager.py
│   │   │   ├── lsp_client.py
│   │   │   ├── lsp_connection.py
│   │   │   ├── providers
│   │   │   │   ├── __init__.py
│   │   │   │   ├── local_lsp_client_provider.py
│   │   │   │   ├── local_lsp_connection_provider.py
│   │   │   │   ├── lsp_client_provider.py
│   │   │   │   └── lsp_connection_provider.py
│   │   │   └── tools.py
│   │   ├── main.py
│   │   ├── mcp
│   │   │   ├── create.py
│   │   │   └── server.py
│   │   ├── oauth
│   │   │   ├── client_id.py
│   │   │   ├── context_manager.py
│   │   │   ├── dbt_platform.py
│   │   │   ├── fastapi_app.py
│   │   │   ├── logging.py
│   │   │   ├── login.py
│   │   │   ├── refresh_strategy.py
│   │   │   ├── token_provider.py
│   │   │   └── token.py
│   │   ├── prompts
│   │   │   ├── __init__.py
│   │   │   ├── admin_api
│   │   │   │   ├── cancel_job_run.md
│   │   │   │   ├── get_job_details.md
│   │   │   │   ├── get_job_run_artifact.md
│   │   │   │   ├── get_job_run_details.md
│   │   │   │   ├── get_job_run_error.md
│   │   │   │   ├── list_job_run_artifacts.md
│   │   │   │   ├── list_jobs_runs.md
│   │   │   │   ├── list_jobs.md
│   │   │   │   ├── retry_job_run.md
│   │   │   │   └── trigger_job_run.md
│   │   │   ├── dbt_cli
│   │   │   │   ├── args
│   │   │   │   │   ├── full_refresh.md
│   │   │   │   │   ├── limit.md
│   │   │   │   │   ├── resource_type.md
│   │   │   │   │   ├── selectors.md
│   │   │   │   │   ├── sql_query.md
│   │   │   │   │   └── vars.md
│   │   │   │   ├── build.md
│   │   │   │   ├── compile.md
│   │   │   │   ├── docs.md
│   │   │   │   ├── list.md
│   │   │   │   ├── parse.md
│   │   │   │   ├── run.md
│   │   │   │   ├── show.md
│   │   │   │   └── test.md
│   │   │   ├── dbt_codegen
│   │   │   │   ├── args
│   │   │   │   │   ├── case_sensitive_cols.md
│   │   │   │   │   ├── database_name.md
│   │   │   │   │   ├── generate_columns.md
│   │   │   │   │   ├── include_data_types.md
│   │   │   │   │   ├── include_descriptions.md
│   │   │   │   │   ├── leading_commas.md
│   │   │   │   │   ├── materialized.md
│   │   │   │   │   ├── model_name.md
│   │   │   │   │   ├── model_names.md
│   │   │   │   │   ├── schema_name.md
│   │   │   │   │   ├── source_name.md
│   │   │   │   │   ├── table_name.md
│   │   │   │   │   ├── table_names.md
│   │   │   │   │   ├── tables.md
│   │   │   │   │   └── upstream_descriptions.md
│   │   │   │   ├── generate_model_yaml.md
│   │   │   │   ├── generate_source.md
│   │   │   │   └── generate_staging_model.md
│   │   │   ├── discovery
│   │   │   │   ├── get_all_models.md
│   │   │   │   ├── get_all_sources.md
│   │   │   │   ├── get_exposure_details.md
│   │   │   │   ├── get_exposures.md
│   │   │   │   ├── get_mart_models.md
│   │   │   │   ├── get_model_children.md
│   │   │   │   ├── get_model_details.md
│   │   │   │   ├── get_model_health.md
│   │   │   │   └── get_model_parents.md
│   │   │   ├── lsp
│   │   │   │   ├── args
│   │   │   │   │   ├── column_name.md
│   │   │   │   │   └── model_id.md
│   │   │   │   └── get_column_lineage.md
│   │   │   ├── prompts.py
│   │   │   └── semantic_layer
│   │   │       ├── get_dimensions.md
│   │   │       ├── get_entities.md
│   │   │       ├── get_metrics_compiled_sql.md
│   │   │       ├── list_metrics.md
│   │   │       ├── list_saved_queries.md
│   │   │       └── query_metrics.md
│   │   ├── py.typed
│   │   ├── semantic_layer
│   │   │   ├── client.py
│   │   │   ├── gql
│   │   │   │   ├── gql_request.py
│   │   │   │   └── gql.py
│   │   │   ├── levenshtein.py
│   │   │   ├── tools.py
│   │   │   └── types.py
│   │   ├── sql
│   │   │   └── tools.py
│   │   ├── telemetry
│   │   │   └── logging.py
│   │   ├── tools
│   │   │   ├── annotations.py
│   │   │   ├── definitions.py
│   │   │   ├── policy.py
│   │   │   ├── register.py
│   │   │   ├── tool_names.py
│   │   │   └── toolsets.py
│   │   └── tracking
│   │       └── tracking.py
│   └── remote_mcp
│       ├── __init__.py
│       └── session.py
├── Taskfile.yml
├── tests
│   ├── __init__.py
│   ├── conftest.py
│   ├── env_vars.py
│   ├── integration
│   │   ├── __init__.py
│   │   ├── dbt_codegen
│   │   │   ├── __init__.py
│   │   │   └── test_dbt_codegen.py
│   │   ├── discovery
│   │   │   └── test_discovery.py
│   │   ├── initialization
│   │   │   ├── __init__.py
│   │   │   └── test_initialization.py
│   │   ├── lsp
│   │   │   └── test_lsp_connection.py
│   │   ├── remote_mcp
│   │   │   └── test_remote_mcp.py
│   │   ├── remote_tools
│   │   │   └── test_remote_tools.py
│   │   ├── semantic_layer
│   │   │   └── test_semantic_layer.py
│   │   └── tracking
│   │       └── test_tracking.py
│   ├── mocks
│   │   └── config.py
│   └── unit
│       ├── __init__.py
│       ├── config
│       │   ├── __init__.py
│       │   ├── test_config.py
│       │   └── test_transport.py
│       ├── dbt_admin
│       │   ├── __init__.py
│       │   ├── test_client.py
│       │   ├── test_error_fetcher.py
│       │   └── test_tools.py
│       ├── dbt_cli
│       │   ├── __init__.py
│       │   ├── test_cli_integration.py
│       │   └── test_tools.py
│       ├── dbt_codegen
│       │   ├── __init__.py
│       │   └── test_tools.py
│       ├── discovery
│       │   ├── __init__.py
│       │   ├── conftest.py
│       │   ├── test_exposures_fetcher.py
│       │   └── test_sources_fetcher.py
│       ├── lsp
│       │   ├── __init__.py
│       │   ├── test_local_lsp_client_provider.py
│       │   ├── test_local_lsp_connection_provider.py
│       │   ├── test_lsp_client.py
│       │   ├── test_lsp_connection.py
│       │   └── test_lsp_tools.py
│       ├── oauth
│       │   ├── test_credentials_provider.py
│       │   ├── test_fastapi_app_pagination.py
│       │   └── test_token.py
│       ├── semantic_layer
│       │   ├── __init__.py
│       │   └── test_saved_queries.py
│       ├── tools
│       │   ├── test_disable_tools.py
│       │   ├── test_tool_names.py
│       │   ├── test_tool_policies.py
│       │   └── test_toolsets.py
│       └── tracking
│           └── test_tracking.py
├── ui
│   ├── .gitignore
│   ├── assets
│   │   ├── dbt_logo BLK.svg
│   │   └── dbt_logo WHT.svg
│   ├── eslint.config.js
│   ├── index.html
│   ├── package.json
│   ├── pnpm-lock.yaml
│   ├── pnpm-workspace.yaml
│   ├── README.md
│   ├── src
│   │   ├── App.css
│   │   ├── App.tsx
│   │   ├── global.d.ts
│   │   ├── index.css
│   │   ├── main.tsx
│   │   └── vite-env.d.ts
│   ├── tsconfig.app.json
│   ├── tsconfig.json
│   ├── tsconfig.node.json
│   └── vite.config.ts
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/tests/unit/lsp/test_local_lsp_client_provider.py:
--------------------------------------------------------------------------------

```python
"""Unit tests for LocalLSPClientProvider class."""

from unittest.mock import AsyncMock, MagicMock

import pytest

from dbt_mcp.lsp.lsp_client import LSPClient
from dbt_mcp.lsp.providers.local_lsp_client_provider import LocalLSPClientProvider
from dbt_mcp.lsp.providers.lsp_connection_provider import (
    LSPConnectionProvider,
    LSPConnectionProviderProtocol,
)


class MockLSPConnectionProvider(LSPConnectionProvider):
    """Mock implementation of LSPConnectionProvider for testing."""

    def __init__(self, mock_connection: LSPConnectionProviderProtocol | None = None):
        self.mock_connection = mock_connection or MagicMock()
        self.get_connection_call_count = 0
        self.cleanup_connection_call_count = 0
        self.should_raise_on_get_connection: Exception | None = None

    async def get_connection(self) -> LSPConnectionProviderProtocol:
        """Return the mock connection."""
        self.get_connection_call_count += 1
        if self.should_raise_on_get_connection:
            raise self.should_raise_on_get_connection
        return self.mock_connection

    async def cleanup_connection(self) -> None:
        """Track cleanup calls."""
        self.cleanup_connection_call_count += 1


@pytest.fixture
def mock_connection_provider() -> MockLSPConnectionProvider:
    """Create a test LSP connection provider."""
    return MockLSPConnectionProvider()


class TestLocalLSPClientProvider:
    """Test LocalLSPClientProvider class."""

    @pytest.mark.asyncio
    async def test_get_client_returns_lsp_client(
        self, mock_connection_provider: MockLSPConnectionProvider
    ) -> None:
        """Test that get_client returns an LSPClient instance."""
        provider = LocalLSPClientProvider(mock_connection_provider)

        # Get client
        client = await provider.get_client()

        # Verify client is LSPClient instance
        assert isinstance(client, LSPClient)

        # Verify connection provider was called
        assert mock_connection_provider.get_connection_call_count == 1

        # Verify client has the correct connection
        assert client.lsp_connection is mock_connection_provider.mock_connection

    @pytest.mark.asyncio
    async def test_get_client_with_custom_timeout(
        self, mock_connection_provider: MockLSPConnectionProvider
    ) -> None:
        """Test that get_client passes custom timeout to LSPClient."""
        custom_timeout = 120.0
        provider = LocalLSPClientProvider(
            mock_connection_provider, timeout=custom_timeout
        )

        # Get client
        client = await provider.get_client()

        # Verify client has the custom timeout
        assert isinstance(client, LSPClient)
        assert client.timeout == custom_timeout

    @pytest.mark.asyncio
    async def test_get_client_with_default_timeout(
        self, mock_connection_provider: MockLSPConnectionProvider
    ) -> None:
        """Test that get_client uses default timeout when not specified."""
        provider = LocalLSPClientProvider(mock_connection_provider)

        # Get client
        client = await provider.get_client()

        # Verify client uses the default timeout from LSPClient
        from dbt_mcp.lsp.lsp_client import DEFAULT_LSP_TIMEOUT

        assert isinstance(client, LSPClient)
        assert client.timeout == DEFAULT_LSP_TIMEOUT

    @pytest.mark.asyncio
    async def test_get_client_with_none_timeout(
        self, mock_connection_provider: MockLSPConnectionProvider
    ) -> None:
        """Test that get_client handles None timeout correctly."""
        provider = LocalLSPClientProvider(mock_connection_provider, timeout=None)

        # Get client
        client = await provider.get_client()

        # Verify client uses the default timeout
        from dbt_mcp.lsp.lsp_client import DEFAULT_LSP_TIMEOUT

        assert isinstance(client, LSPClient)
        assert client.timeout == DEFAULT_LSP_TIMEOUT

    @pytest.mark.asyncio
    async def test_get_client_multiple_calls_create_new_clients(
        self, mock_connection_provider: MockLSPConnectionProvider
    ) -> None:
        """Test that multiple calls to get_client create new client instances."""
        provider = LocalLSPClientProvider(mock_connection_provider)

        # Get multiple clients
        client1 = await provider.get_client()
        client2 = await provider.get_client()
        client3 = await provider.get_client()

        # Each call should create a new LSPClient instance
        assert isinstance(client1, LSPClient)
        assert isinstance(client2, LSPClient)
        assert isinstance(client3, LSPClient)
        assert client1 is not client2
        assert client2 is not client3
        assert client1 is not client3

        # But all should use the same connection (from the provider)
        mock_connection = mock_connection_provider.mock_connection
        assert client1.lsp_connection is mock_connection
        assert client2.lsp_connection is mock_connection
        assert client3.lsp_connection is mock_connection

        # Connection provider should be called each time
        assert mock_connection_provider.get_connection_call_count == 3

    @pytest.mark.asyncio
    async def test_get_client_propagates_connection_provider_errors(
        self, mock_connection_provider: MockLSPConnectionProvider
    ) -> None:
        """Test that get_client propagates errors from connection provider."""
        provider = LocalLSPClientProvider(mock_connection_provider)

        # Configure connection provider to raise an error
        mock_connection_provider.should_raise_on_get_connection = RuntimeError(
            "Connection failed"
        )

        # Get client should propagate the error
        with pytest.raises(RuntimeError, match="Connection failed"):
            await provider.get_client()

    @pytest.mark.asyncio
    async def test_client_has_correct_connection(
        self, mock_connection_provider: MockLSPConnectionProvider
    ) -> None:
        """Test that created client has the correct connection reference."""
        provider = LocalLSPClientProvider(mock_connection_provider, timeout=45.0)

        # Get client
        client = await provider.get_client()

        # Verify client configuration
        assert isinstance(client, LSPClient)
        assert client.lsp_connection is mock_connection_provider.mock_connection
        assert client.timeout == 45.0

    @pytest.mark.asyncio
    async def test_integration_with_real_lsp_client_methods(
        self, mock_connection_provider: MockLSPConnectionProvider
    ) -> None:
        """Test that the returned client can call LSPClient methods."""
        # Setup mock connection with required methods
        mock_connection = MagicMock()
        mock_connection.compiled = MagicMock(return_value=True)
        mock_connection.send_request = AsyncMock(
            return_value={"nodes": [{"id": "test"}]}
        )
        mock_connection_provider.mock_connection = mock_connection

        provider = LocalLSPClientProvider(mock_connection_provider, timeout=60.0)

        # Get client
        client = await provider.get_client()

        # Verify we can call LSPClient methods
        assert isinstance(client, LSPClient)
        result = await client.get_column_lineage("model.test.table", "column_name")

        # Verify the method was called on the connection
        mock_connection.send_request.assert_called()
        assert "nodes" in result

    def test_provider_initialization(
        self, mock_connection_provider: MockLSPConnectionProvider
    ) -> None:
        """Test that provider initializes correctly."""
        # With custom timeout
        provider1 = LocalLSPClientProvider(mock_connection_provider, timeout=90.0)
        assert provider1.lsp_connection_provider is mock_connection_provider
        assert provider1.timeout == 90.0

        # Without timeout
        provider2 = LocalLSPClientProvider(mock_connection_provider)
        assert provider2.lsp_connection_provider is mock_connection_provider
        assert provider2.timeout is None

    @pytest.mark.asyncio
    async def test_provider_works_with_different_timeouts(
        self, mock_connection_provider: MockLSPConnectionProvider
    ) -> None:
        """Test that different provider instances can have different timeouts."""
        # Create providers with different timeouts
        provider1 = LocalLSPClientProvider(mock_connection_provider, timeout=30.0)
        provider2 = LocalLSPClientProvider(mock_connection_provider, timeout=60.0)
        provider3 = LocalLSPClientProvider(mock_connection_provider, timeout=120.0)

        # Get clients from each provider
        client1 = await provider1.get_client()
        client2 = await provider2.get_client()
        client3 = await provider3.get_client()

        # Verify each client has its respective timeout
        assert isinstance(client1, LSPClient)
        assert isinstance(client2, LSPClient)
        assert isinstance(client3, LSPClient)
        assert client1.timeout == 30.0
        assert client2.timeout == 60.0
        assert client3.timeout == 120.0

```
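
For orientation, a minimal sketch of the production wiring these tests exercise, assuming the constructor signatures match the fixtures in the LSP test suite: a `LocalLSPConnectionProvider` owns the connection to the dbt LSP binary, and `LocalLSPClientProvider.get_client()` wraps it in a fresh `LSPClient` on every call. The binary path, project directory, and timeout below are hypothetical placeholders.

```python
from dbt_mcp.lsp.lsp_binary_manager import LspBinaryInfo
from dbt_mcp.lsp.providers.local_lsp_client_provider import LocalLSPClientProvider
from dbt_mcp.lsp.providers.local_lsp_connection_provider import (
    LocalLSPConnectionProvider,
)

# Hypothetical paths; LspBinaryInfo fields mirror the fixture in
# tests/unit/lsp/test_local_lsp_connection_provider.py.
binary_info = LspBinaryInfo(path="/usr/local/bin/dbt-lsp", version="1.0.0")
connection_provider = LocalLSPConnectionProvider(binary_info, "/abs/path/to/project")
client_provider = LocalLSPClientProvider(connection_provider, timeout=60.0)


async def column_lineage(model_id: str, column_name: str):
    # Each call returns a new LSPClient that shares the provider's connection.
    client = await client_provider.get_client()
    return await client.get_column_lineage(model_id, column_name)
```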

--------------------------------------------------------------------------------
/src/dbt_mcp/dbt_cli/tools.py:
--------------------------------------------------------------------------------

```python
import os
import subprocess
from collections.abc import Iterable, Sequence

from mcp.server.fastmcp import FastMCP
from pydantic import Field

from dbt_mcp.config.config import DbtCliConfig
from dbt_mcp.dbt_cli.binary_type import get_color_disable_flag
from dbt_mcp.prompts.prompts import get_prompt
from dbt_mcp.tools.definitions import ToolDefinition
from dbt_mcp.tools.register import register_tools
from dbt_mcp.tools.tool_names import ToolName
from dbt_mcp.tools.annotations import create_tool_annotations


def create_dbt_cli_tool_definitions(config: DbtCliConfig) -> list[ToolDefinition]:
    def _run_dbt_command(
        command: list[str],
        selector: str | None = None,
        resource_type: list[str] | None = None,
        is_selectable: bool = False,
        is_full_refresh: bool | None = False,
        vars: str | None = None,
    ) -> str:
        try:
            # Commands with verbose output that should always run with --quiet
            verbose_commands = [
                "build",
                "compile",
                "docs",
                "parse",
                "run",
                "test",
                "list",
            ]

            if is_full_refresh is True:
                command.append("--full-refresh")

            if vars and isinstance(vars, str):
                command.extend(["--vars", vars])

            if selector:
                selector_params = str(selector).split(" ")
                command.extend(["--select"] + selector_params)

            if isinstance(resource_type, Iterable):
                command.extend(["--resource-type"] + resource_type)

            full_command = command.copy()
            # Add --quiet flag to specific commands to reduce context window usage
            if len(full_command) > 0 and full_command[0] in verbose_commands:
                main_command = full_command[0]
                command_args = full_command[1:] if len(full_command) > 1 else []
                full_command = [main_command, "--quiet", *command_args]

            # Only change the working directory if project_dir is an absolute path.
            # Otherwise, relative paths can be applied multiple times, because
            # DBT_PROJECT_DIR is also applied to dbt Core and Fusion (but not the dbt Cloud CLI).
            cwd_path = config.project_dir if os.path.isabs(config.project_dir) else None

            # Add appropriate color disable flag based on binary type
            color_flag = get_color_disable_flag(config.binary_type)
            args = [config.dbt_path, color_flag, *full_command]

            process = subprocess.Popen(
                args=args,
                cwd=cwd_path,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                stdin=subprocess.DEVNULL,
                text=True,
            )
            output, _ = process.communicate(timeout=config.dbt_cli_timeout)
            return output or "OK"
        except subprocess.TimeoutExpired:
            return "Timeout: dbt command took too long to complete." + (
                " Try using a specific selector to narrow down the results."
                if is_selectable
                else ""
            )

    def build(
        selector: str | None = Field(
            default=None, description=get_prompt("dbt_cli/args/selectors")
        ),
        is_full_refresh: bool | None = Field(
            default=None, description=get_prompt("dbt_cli/args/full_refresh")
        ),
        vars: str | None = Field(
            default=None, description=get_prompt("dbt_cli/args/vars")
        ),
    ) -> str:
        return _run_dbt_command(
            ["build"],
            selector,
            is_selectable=True,
            is_full_refresh=is_full_refresh,
            vars=vars,
        )

    def compile() -> str:
        return _run_dbt_command(["compile"])

    def docs() -> str:
        return _run_dbt_command(["docs", "generate"])

    def ls(
        selector: str | None = Field(
            default=None, description=get_prompt("dbt_cli/args/selectors")
        ),
        resource_type: list[str] | None = Field(
            default=None,
            description=get_prompt("dbt_cli/args/resource_type"),
        ),
    ) -> str:
        return _run_dbt_command(
            ["list"],
            selector,
            resource_type=resource_type,
            is_selectable=True,
        )

    def parse() -> str:
        return _run_dbt_command(["parse"])

    def run(
        selector: str | None = Field(
            default=None, description=get_prompt("dbt_cli/args/selectors")
        ),
        is_full_refresh: bool | None = Field(
            default=None, description=get_prompt("dbt_cli/args/full_refresh")
        ),
        vars: str | None = Field(
            default=None, description=get_prompt("dbt_cli/args/vars")
        ),
    ) -> str:
        return _run_dbt_command(
            ["run"],
            selector,
            is_selectable=True,
            is_full_refresh=is_full_refresh,
            vars=vars,
        )

    def test(
        selector: str | None = Field(
            default=None, description=get_prompt("dbt_cli/args/selectors")
        ),
        vars: str | None = Field(
            default=None, description=get_prompt("dbt_cli/args/vars")
        ),
    ) -> str:
        return _run_dbt_command(["test"], selector, is_selectable=True, vars=vars)

    def show(
        sql_query: str = Field(description=get_prompt("dbt_cli/args/sql_query")),
        limit: int = Field(default=5, description=get_prompt("dbt_cli/args/limit")),
    ) -> str:
        args = ["show", "--inline", sql_query, "--favor-state"]
        # This is quite crude, but it should be okay for now
        # until we have a dbt Fusion integration.
        cli_limit = None
        if "limit" in sql_query.lower():
            # When --limit=-1, dbt won't apply a separate limit.
            cli_limit = -1
        elif limit:
            # This can be problematic if the LLM provides both a SQL LIMIT
            # and a `limit` argument. However, preferring the limit in the
            # SQL query leads to a better experience when the LLM makes that
            # mistake.
            cli_limit = limit
        if cli_limit is not None:
            args.extend(["--limit", str(cli_limit)])
        args.extend(["--output", "json"])
        return _run_dbt_command(args)

    return [
        ToolDefinition(
            fn=build,
            description=get_prompt("dbt_cli/build"),
            annotations=create_tool_annotations(
                title="dbt build",
                read_only_hint=False,
                destructive_hint=True,
                idempotent_hint=False,
            ),
        ),
        ToolDefinition(
            fn=compile,
            description=get_prompt("dbt_cli/compile"),
            annotations=create_tool_annotations(
                title="dbt compile",
                read_only_hint=True,
                destructive_hint=False,
                idempotent_hint=True,
            ),
        ),
        ToolDefinition(
            fn=docs,
            description=get_prompt("dbt_cli/docs"),
            annotations=create_tool_annotations(
                title="dbt docs",
                read_only_hint=True,
                destructive_hint=False,
                idempotent_hint=True,
            ),
        ),
        ToolDefinition(
            name="list",
            fn=ls,
            description=get_prompt("dbt_cli/list"),
            annotations=create_tool_annotations(
                title="dbt list",
                read_only_hint=True,
                destructive_hint=False,
                idempotent_hint=True,
            ),
        ),
        ToolDefinition(
            fn=parse,
            description=get_prompt("dbt_cli/parse"),
            annotations=create_tool_annotations(
                title="dbt parse",
                read_only_hint=True,
                destructive_hint=False,
                idempotent_hint=True,
            ),
        ),
        ToolDefinition(
            fn=run,
            description=get_prompt("dbt_cli/run"),
            annotations=create_tool_annotations(
                title="dbt run",
                read_only_hint=False,
                destructive_hint=True,
                idempotent_hint=False,
            ),
        ),
        ToolDefinition(
            fn=test,
            description=get_prompt("dbt_cli/test"),
            annotations=create_tool_annotations(
                title="dbt test",
                read_only_hint=False,
                destructive_hint=True,
                idempotent_hint=False,
            ),
        ),
        ToolDefinition(
            fn=show,
            description=get_prompt("dbt_cli/show"),
            annotations=create_tool_annotations(
                title="dbt show",
                read_only_hint=True,
                destructive_hint=False,
                idempotent_hint=True,
            ),
        ),
    ]


def register_dbt_cli_tools(
    dbt_mcp: FastMCP,
    config: DbtCliConfig,
    exclude_tools: Sequence[ToolName] = [],
) -> None:
    register_tools(
        dbt_mcp,
        create_dbt_cli_tool_definitions(config),
        exclude_tools,
    )

```
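
A minimal sketch of exposing just these CLI tools on a standalone FastMCP server, assuming a `DbtCliConfig` instance is already available (its construction lives in `dbt_mcp.config.config` and is not shown here). Building the definitions does not execute dbt; the subprocess in `_run_dbt_command` only starts when a tool is invoked.

```python
from mcp.server.fastmcp import FastMCP

from dbt_mcp.config.config import DbtCliConfig
from dbt_mcp.dbt_cli.tools import (
    create_dbt_cli_tool_definitions,
    register_dbt_cli_tools,
)


def serve_dbt_cli_tools(config: DbtCliConfig) -> FastMCP:
    """Register only the dbt CLI tools on a standalone FastMCP server."""
    server = FastMCP("dbt-cli-tools")
    register_dbt_cli_tools(server, config)

    # Only `list` sets an explicit name above; the rest fall back to the
    # wrapped function's name.
    for definition in create_dbt_cli_tool_definitions(config):
        print(getattr(definition, "name", None) or definition.fn.__name__)
    return server
```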

--------------------------------------------------------------------------------
/src/dbt_mcp/dbt_admin/run_results_errors/parser.py:
--------------------------------------------------------------------------------

```python
import logging
from typing import Any

from pydantic import ValidationError

from dbt_mcp.config.config_providers import AdminApiConfig
from dbt_mcp.dbt_admin.client import DbtAdminAPIClient
from dbt_mcp.dbt_admin.constants import (
    SOURCE_FRESHNESS_STEP_NAME,
    STATUS_MAP,
    JobRunStatus,
    RunResultsStatus,
)
from dbt_mcp.dbt_admin.run_results_errors.config import (
    ErrorResultSchema,
    ErrorStepSchema,
    RunDetailsSchema,
    RunResultsArtifactSchema,
    RunResultSchema,
    RunStepSchema,
)
from dbt_mcp.errors import ArtifactRetrievalError

logger = logging.getLogger(__name__)


class ErrorFetcher:
    """Parses dbt Cloud job run data to extract focused error information."""

    def __init__(
        self,
        run_id: int,
        run_details: dict[str, Any],
        client: DbtAdminAPIClient,
        admin_api_config: AdminApiConfig,
    ):
        """
        Initialize parser with run data.
        Args:
            run_id: dbt Cloud job run ID
            run_details: Raw run details from get_job_run_details()
            client: DbtAdminAPIClient instance for fetching artifacts
            admin_api_config: Admin API configuration
        """
        self.run_id = run_id
        self.run_details = run_details
        self.client = client
        self.admin_api_config = admin_api_config

    async def analyze_run_errors(self) -> dict[str, Any]:
        """Parse the run data and return all failed steps with their details."""
        try:
            run_details = RunDetailsSchema.model_validate(self.run_details)
            failed_steps = self._find_all_failed_steps(run_details)

            if run_details.is_cancelled:
                error_result = self._create_error_result(
                    message="Job run was cancelled",
                    finished_at=run_details.finished_at,
                )
                return {"failed_steps": [error_result]}

            if not failed_steps:
                error_result = self._create_error_result("No failed step found")
                return {"failed_steps": [error_result]}

            processed_steps = []
            for step in failed_steps:
                step_result = await self._get_failure_details(step)
                processed_steps.append(step_result)

            return {"failed_steps": processed_steps}

        except ValidationError as e:
            logger.error(f"Schema validation failed for run {self.run_id}: {e}")
            error_result = self._create_error_result(f"Validation failed: {e!s}")
            return {"failed_steps": [error_result]}
        except Exception as e:
            logger.error(f"Error analyzing run {self.run_id}: {e}")
            error_result = self._create_error_result(str(e))
            return {"failed_steps": [error_result]}

    def _find_all_failed_steps(
        self, run_details: RunDetailsSchema
    ) -> list[RunStepSchema]:
        """Find all failed steps in the run."""
        failed_steps = []
        for step in run_details.run_steps:
            if step.status == STATUS_MAP[JobRunStatus.ERROR]:
                failed_steps.append(step)
        return failed_steps

    async def _get_failure_details(self, failed_step: RunStepSchema) -> dict[str, Any]:
        """Get simplified failure information from failed step."""
        run_results_content = await self._fetch_run_results_artifact(failed_step)

        if not run_results_content:
            return self._handle_artifact_error(failed_step)

        return self._parse_run_results(run_results_content, failed_step)

    async def _fetch_run_results_artifact(
        self, failed_step: RunStepSchema
    ) -> str | None:
        """Fetch run_results.json artifact for the failed step."""
        step_index = failed_step.index

        try:
            if step_index is not None:
                run_results_content = await self.client.get_job_run_artifact(
                    self.admin_api_config.account_id,
                    self.run_id,
                    "run_results.json",
                    step=step_index,
                )
                logger.info(f"Got run_results.json from failed step {step_index}")
                return run_results_content
            else:
                raise ArtifactRetrievalError(
                    "No step index available for artifact retrieval"
                )

        except Exception as e:
            logger.error(f"Failed to get run_results.json from step {step_index}: {e}")
            return None

    def _parse_run_results(
        self, run_results_content: str, failed_step: RunStepSchema
    ) -> dict[str, Any]:
        """Parse run_results.json content and extract errors."""
        try:
            run_results = RunResultsArtifactSchema.model_validate_json(
                run_results_content
            )
            errors = self._extract_errors_from_results(run_results.results)

            return self._build_error_response(errors, failed_step, run_results.args)

        except ValidationError as e:
            logger.warning(f"run_results.json validation failed: {e}")
            return self._handle_artifact_error(failed_step, e)
        except Exception as e:
            return self._handle_artifact_error(failed_step, e)

    def _extract_errors_from_results(
        self, results: list[RunResultSchema]
    ) -> list[ErrorResultSchema]:
        """Extract error results from run results."""
        errors = []
        for result in results:
            if result.status in [
                RunResultsStatus.ERROR.value,
                RunResultsStatus.FAIL.value,
            ]:
                relation_name = (
                    result.relation_name
                    if result.relation_name is not None
                    else "No database relation"
                )
                error = ErrorResultSchema(
                    unique_id=result.unique_id,
                    relation_name=relation_name,
                    message=result.message or "",
                    compiled_code=result.compiled_code,
                )
                errors.append(error)
        return errors

    def _build_error_response(
        self,
        errors: list[ErrorResultSchema],
        failed_step: RunStepSchema,
        args: Any | None,
    ) -> dict[str, Any]:
        """Build the final error response structure."""
        target = args.target if args else None
        step_name = failed_step.name
        finished_at = failed_step.finished_at
        truncated_logs = self._truncated_logs(failed_step)

        if errors:
            return ErrorStepSchema(
                errors=errors,
                step_name=step_name,
                finished_at=finished_at,
                target=target,
            ).model_dump()

        message = "No failures found in run_results.json"

        return self._create_error_result(
            message=message,
            target=target,
            step_name=step_name,
            finished_at=finished_at,
            truncated_logs=truncated_logs,
        )

    def _create_error_result(
        self,
        message: str,
        unique_id: str | None = None,
        relation_name: str | None = None,
        target: str | None = None,
        step_name: str | None = None,
        finished_at: str | None = None,
        compiled_code: str | None = None,
        truncated_logs: str | None = None,
    ) -> dict[str, Any]:
        """Create a standardized error results using ErrorStepSchema."""
        error = ErrorResultSchema(
            unique_id=unique_id,
            relation_name=relation_name,
            message=message,
            compiled_code=compiled_code,
            truncated_logs=truncated_logs,
        )
        return ErrorStepSchema(
            errors=[error],
            step_name=step_name,
            finished_at=finished_at,
            target=target,
        ).model_dump()

    def _handle_artifact_error(
        self, failed_step: RunStepSchema, error: Exception | None = None
    ) -> dict[str, Any]:
        """Handle cases where run_results.json is not available."""
        relation_name = "No database relation"
        step_name = failed_step.name
        finished_at = failed_step.finished_at
        truncated_logs = self._truncated_logs(failed_step)

        # Special handling for source freshness steps
        if SOURCE_FRESHNESS_STEP_NAME.lower() in step_name.lower():
            message = "Source freshness error - returning logs"
        else:
            message = "run_results.json not available - returning logs"

        return self._create_error_result(
            message=message,
            relation_name=relation_name,
            step_name=step_name,
            finished_at=finished_at,
            truncated_logs=truncated_logs,
        )

    def _truncated_logs(self, failed_step: RunStepSchema) -> str | None:
        """Truncate logs to the last 50 lines."""
        TRUNCATED_LOGS_LENGTH = 50

        split_logs = failed_step.logs.splitlines() if failed_step.logs else []
        if len(split_logs) > TRUNCATED_LOGS_LENGTH:
            split_logs = [
                f"Logs truncated to last {TRUNCATED_LOGS_LENGTH} lines..."
            ] + split_logs[-TRUNCATED_LOGS_LENGTH:]
        return "\n".join(split_logs)

```
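
Every path through `analyze_run_errors` returns the same shape: a `{"failed_steps": [...]}` dict whose entries are `ErrorStepSchema` dumps (`step_name`, `finished_at`, `target`, `errors`). A hypothetical caller mirroring `get_job_run_error` in `dbt_admin/tools.py` might consume it like this, assuming a `DbtAdminAPIClient` and an `AdminApiConfig` are already in hand.

```python
from typing import Any

from dbt_mcp.config.config_providers import AdminApiConfig
from dbt_mcp.dbt_admin.client import DbtAdminAPIClient
from dbt_mcp.dbt_admin.run_results_errors import ErrorFetcher


async def print_failed_steps(
    client: DbtAdminAPIClient, admin_api_config: AdminApiConfig, run_id: int
) -> dict[str, Any]:
    # Logs are requested so _truncated_logs has something to fall back on
    # when run_results.json is unavailable.
    run_details = await client.get_job_run_details(
        admin_api_config.account_id, run_id, include_logs=True
    )
    result = await ErrorFetcher(
        run_id, run_details, client, admin_api_config
    ).analyze_run_errors()
    for step in result["failed_steps"]:
        for error in step["errors"]:
            print(step.get("step_name"), error.get("unique_id"), error.get("message"))
    return result
```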

--------------------------------------------------------------------------------
/src/dbt_mcp/dbt_admin/client.py:
--------------------------------------------------------------------------------

```python
import logging
from functools import cache
from typing import Any

import requests

from dbt_mcp.config.config_providers import (
    AdminApiConfig,
    ConfigProvider,
)
from dbt_mcp.errors import AdminAPIError

logger = logging.getLogger(__name__)


class DbtAdminAPIClient:
    """Client for interacting with the dbt Admin API."""

    def __init__(self, config_provider: ConfigProvider[AdminApiConfig]):
        self.config_provider = config_provider

    async def get_config(self) -> AdminApiConfig:
        return await self.config_provider.get_config()

    async def get_headers(self) -> dict[str, str]:
        config = await self.get_config()
        return {
            "Content-Type": "application/json",
            "Accept": "application/json",
        } | config.headers_provider.get_headers()

    async def _make_request(
        self, method: str, endpoint: str, **kwargs
    ) -> dict[str, Any]:
        """Make a request to the dbt API."""
        config = await self.get_config()
        url = f"{config.url}{endpoint}"
        headers = await self.get_headers()

        try:
            response = requests.request(method, url, headers=headers, **kwargs)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            logger.error(f"API request failed: {e}")
            raise AdminAPIError(f"API request failed: {e}")

    @cache
    async def list_jobs(self, account_id: int, **params) -> list[dict[str, Any]]:
        """List jobs for an account."""
        result = await self._make_request(
            "GET",
            f"/api/v2/accounts/{account_id}/jobs/?include_related=['most_recent_run','most_recent_completed_run']",
            params=params,
        )
        data = result.get("data", [])

        # we filter the data down to the most relevant fields;
        # the rest can be retrieved with the get_job_details tool
        filtered_data = [
            {
                "id": job.get("id"),
                "name": job.get("name"),
                "description": job.get("description"),
                "dbt_version": job.get("dbt_version"),
                "job_type": job.get("job_type"),
                "triggers": job.get("triggers"),
                "most_recent_run_id": job.get("most_recent_run").get("id")
                if job.get("most_recent_run")
                else None,
                "most_recent_run_status": job.get("most_recent_run").get(
                    "status_humanized"
                )
                if job.get("most_recent_run")
                else None,
                "most_recent_run_started_at": job.get("most_recent_run").get(
                    "started_at"
                )
                if job.get("most_recent_run")
                else None,
                "most_recent_run_finished_at": job.get("most_recent_run").get(
                    "finished_at"
                )
                if job.get("most_recent_run")
                else None,
                "most_recent_completed_run_id": job.get(
                    "most_recent_completed_run"
                ).get("id")
                if job.get("most_recent_completed_run")
                else None,
                "most_recent_completed_run_status": job.get(
                    "most_recent_completed_run"
                ).get("status_humanized")
                if job.get("most_recent_completed_run")
                else None,
                "most_recent_completed_run_started_at": job.get(
                    "most_recent_completed_run"
                ).get("started_at")
                if job.get("most_recent_completed_run")
                else None,
                "most_recent_completed_run_finished_at": job.get(
                    "most_recent_completed_run"
                ).get("finished_at")
                if job.get("most_recent_completed_run")
                else None,
                "schedule": job.get("schedule").get("cron")
                if job.get("schedule")
                else None,
                "next_run": job.get("next_run"),
            }
            for job in data
        ]

        return filtered_data

    async def get_job_details(self, account_id: int, job_id: int) -> dict[str, Any]:
        """Get details for a specific job."""
        result = await self._make_request(
            "GET",
            f"/api/v2/accounts/{account_id}/jobs/{job_id}/?include_related=['most_recent_run','most_recent_completed_run']",
        )
        return result.get("data", {})

    async def trigger_job_run(
        self, account_id: int, job_id: int, cause: str, **kwargs
    ) -> dict[str, Any]:
        """Trigger a job run."""
        data = {"cause": cause, **kwargs}
        result = await self._make_request(
            "POST", f"/api/v2/accounts/{account_id}/jobs/{job_id}/run/", json=data
        )
        return result.get("data", {})

    async def list_jobs_runs(self, account_id: int, **params) -> list[dict[str, Any]]:
        """List runs for an account."""
        extra_info = "?include_related=['job']"
        result = await self._make_request(
            "GET", f"/api/v2/accounts/{account_id}/runs/{extra_info}", params=params
        )

        data = result.get("data", [])

        # we remove less relevant fields from the data we get to avoid filling the context with too much data
        for run in data:
            run["job_name"] = run.get("job", {}).get("name", "")
            run["job_steps"] = run.get("job", {}).get("execute_step", "")
            run.pop("job", None)
            run.pop("account_id", None)
            run.pop("environment_id", None)
            run.pop("blocked_by", None)
            run.pop("used_repo_cache", None)
            run.pop("audit", None)
            run.pop("created_at_humanized", None)
            run.pop("duration_humanized", None)
            run.pop("finished_at_humanized", None)
            run.pop("queued_duration_humanized", None)
            run.pop("run_duration_humanized", None)
            run.pop("artifacts_saved", None)
            run.pop("artifact_s3_path", None)
            run.pop("has_docs_generated", None)
            run.pop("has_sources_generated", None)
            run.pop("notifications_sent", None)
            run.pop("executed_by_thread_id", None)
            run.pop("updated_at", None)
            run.pop("dequeued_at", None)
            run.pop("last_checked_at", None)
            run.pop("last_heartbeat_at", None)
            run.pop("trigger", None)
            run.pop("run_steps", None)
            run.pop("deprecation", None)
            run.pop("environment", None)

        return data

    async def get_job_run_details(
        self, account_id: int, run_id: int, include_logs: bool = False
    ) -> dict[str, Any]:
        """Get details for a specific job run."""

        incl = "?include_related=['run_steps']"
        result = await self._make_request(
            "GET", f"/api/v2/accounts/{account_id}/runs/{run_id}/{incl}"
        )
        data = result.get("data", {})

        # we always remove the truncated debug logs, and the full logs unless
        # include_logs is set, since they are not very relevant
        for step in data.get("run_steps", []):
            if not include_logs:
                step.pop("logs", None)
            step.pop("truncated_debug_logs", None)

        return data

    async def cancel_job_run(self, account_id: int, run_id: int) -> dict[str, Any]:
        """Cancel a job run."""
        result = await self._make_request(
            "POST", f"/api/v2/accounts/{account_id}/runs/{run_id}/cancel/"
        )
        return result.get("data", {})

    async def retry_job_run(self, account_id: int, run_id: int) -> dict[str, Any]:
        """Retry a failed job run."""
        result = await self._make_request(
            "POST", f"/api/v2/accounts/{account_id}/runs/{run_id}/retry/"
        )
        return result.get("data", {})

    async def list_job_run_artifacts(self, account_id: int, run_id: int) -> list[str]:
        """List artifacts for a job run."""
        result = await self._make_request(
            "GET", f"/api/v2/accounts/{account_id}/runs/{run_id}/artifacts/"
        )
        data = result.get("data", [])

        # we remove the compiled/ and run/ artifacts; they are not very relevant,
        # and there are thousands of them, which would fill the context
        filtered_data = [
            artifact
            for artifact in data
            if (
                not artifact.startswith("compiled/") and not artifact.startswith("run/")
            )
        ]
        return filtered_data

    async def get_job_run_artifact(
        self,
        account_id: int,
        run_id: int,
        artifact_path: str,
        step: int | None = None,
    ) -> Any:
        """Get a specific job run artifact."""
        params = {}
        if step:
            params["step"] = step

        config = await self.get_config()
        get_artifact_header = {
            "Accept": "*/*",
        } | config.headers_provider.get_headers()

        response = requests.get(
            f"{config.url}/api/v2/accounts/{account_id}/runs/{run_id}/artifacts/{artifact_path}",
            headers=get_artifact_header,
            params=params,
        )
        response.raise_for_status()
        return response.text

```
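
A hypothetical direct use of this client outside the MCP tools, assuming a `ConfigProvider[AdminApiConfig]` is already configured (authentication headers come from its `headers_provider`). The `order_by` and `limit` keywords are passed straight through as query parameters, and the printed field names reflect the Admin API's run payload.

```python
from dbt_mcp.config.config_providers import AdminApiConfig, ConfigProvider
from dbt_mcp.dbt_admin.client import DbtAdminAPIClient


async def print_recent_runs(
    config_provider: ConfigProvider[AdminApiConfig], limit: int = 5
) -> None:
    client = DbtAdminAPIClient(config_provider)
    config = await client.get_config()
    # Extra keyword arguments become query parameters on the /runs/ endpoint.
    runs = await client.list_jobs_runs(config.account_id, limit=limit, order_by="-id")
    for run in runs:
        print(run.get("id"), run.get("job_name"), run.get("status_humanized"))
```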

--------------------------------------------------------------------------------
/src/dbt_mcp/dbt_admin/tools.py:
--------------------------------------------------------------------------------

```python
import logging
from collections.abc import Sequence
from typing import Any

from mcp.server.fastmcp import FastMCP

from dbt_mcp.config.config_providers import (
    AdminApiConfig,
    ConfigProvider,
)
from dbt_mcp.dbt_admin.client import DbtAdminAPIClient
from dbt_mcp.dbt_admin.constants import JobRunStatus, STATUS_MAP
from dbt_mcp.dbt_admin.run_results_errors import ErrorFetcher
from dbt_mcp.prompts.prompts import get_prompt
from dbt_mcp.tools.annotations import create_tool_annotations
from dbt_mcp.tools.definitions import ToolDefinition
from dbt_mcp.tools.register import register_tools
from dbt_mcp.tools.tool_names import ToolName

logger = logging.getLogger(__name__)


def create_admin_api_tool_definitions(
    admin_client: DbtAdminAPIClient,
    admin_api_config_provider: ConfigProvider[AdminApiConfig],
) -> list[ToolDefinition]:
    async def list_jobs(
        # TODO: add support for project_id in the future
        # project_id: Optional[int] = None,
        limit: int | None = None,
        offset: int | None = None,
    ) -> list[dict[str, Any]]:
        """List jobs in an account."""
        admin_api_config = await admin_api_config_provider.get_config()
        params = {}
        # if project_id:
        #     params["project_id"] = project_id
        if admin_api_config.prod_environment_id:
            params["environment_id"] = admin_api_config.prod_environment_id
        if limit:
            params["limit"] = limit
        if offset:
            params["offset"] = offset
        return await admin_client.list_jobs(admin_api_config.account_id, **params)

    async def get_job_details(job_id: int) -> dict[str, Any]:
        """Get details for a specific job."""
        admin_api_config = await admin_api_config_provider.get_config()
        return await admin_client.get_job_details(admin_api_config.account_id, job_id)

    async def trigger_job_run(
        job_id: int,
        cause: str = "Triggered by dbt MCP",
        git_branch: str | None = None,
        git_sha: str | None = None,
        schema_override: str | None = None,
    ) -> dict[str, Any]:
        """Trigger a job run."""
        admin_api_config = await admin_api_config_provider.get_config()
        kwargs = {}
        if git_branch:
            kwargs["git_branch"] = git_branch
        if git_sha:
            kwargs["git_sha"] = git_sha
        if schema_override:
            kwargs["schema_override"] = schema_override
        return await admin_client.trigger_job_run(
            admin_api_config.account_id, job_id, cause, **kwargs
        )

    async def list_jobs_runs(
        job_id: int | None = None,
        status: JobRunStatus | None = None,
        limit: int | None = None,
        offset: int | None = None,
        order_by: str | None = None,
    ) -> list[dict[str, Any]]:
        """List runs in an account."""
        admin_api_config = await admin_api_config_provider.get_config()
        params: dict[str, Any] = {}
        if job_id:
            params["job_definition_id"] = job_id
        if status:
            status_id = STATUS_MAP[status]
            params["status"] = status_id
        if limit:
            params["limit"] = limit
        if offset:
            params["offset"] = offset
        if order_by:
            params["order_by"] = order_by
        return await admin_client.list_jobs_runs(admin_api_config.account_id, **params)

    async def get_job_run_details(
        run_id: int,
    ) -> dict[str, Any]:
        """Get details for a specific job run."""
        admin_api_config = await admin_api_config_provider.get_config()
        return await admin_client.get_job_run_details(
            admin_api_config.account_id, run_id
        )

    async def cancel_job_run(run_id: int) -> dict[str, Any]:
        """Cancel a job run."""
        admin_api_config = await admin_api_config_provider.get_config()
        return await admin_client.cancel_job_run(admin_api_config.account_id, run_id)

    async def retry_job_run(run_id: int) -> dict[str, Any]:
        """Retry a failed job run."""
        admin_api_config = await admin_api_config_provider.get_config()
        return await admin_client.retry_job_run(admin_api_config.account_id, run_id)

    async def list_job_run_artifacts(run_id: int) -> list[str]:
        """List artifacts for a job run."""
        admin_api_config = await admin_api_config_provider.get_config()
        return await admin_client.list_job_run_artifacts(
            admin_api_config.account_id, run_id
        )

    async def get_job_run_artifact(
        run_id: int, artifact_path: str, step: int | None = None
    ) -> Any:
        """Get a specific job run artifact."""
        admin_api_config = await admin_api_config_provider.get_config()
        return await admin_client.get_job_run_artifact(
            admin_api_config.account_id, run_id, artifact_path, step
        )

    async def get_job_run_error(run_id: int) -> dict[str, Any] | str:
        """Get focused error information for a failed job run."""
        try:
            admin_api_config = await admin_api_config_provider.get_config()
            run_details = await admin_client.get_job_run_details(
                admin_api_config.account_id, run_id, include_logs=True
            )
            error_fetcher = ErrorFetcher(
                run_id, run_details, admin_client, admin_api_config
            )
            return await error_fetcher.analyze_run_errors()

        except Exception as e:
            logger.error(f"Error getting run error details for {run_id}: {e}")
            return str(e)

    return [
        ToolDefinition(
            description=get_prompt("admin_api/list_jobs"),
            fn=list_jobs,
            annotations=create_tool_annotations(
                title="List Jobs",
                read_only_hint=True,
                destructive_hint=False,
                idempotent_hint=True,
            ),
        ),
        ToolDefinition(
            description=get_prompt("admin_api/get_job_details"),
            fn=get_job_details,
            annotations=create_tool_annotations(
                title="Get Job Details",
                read_only_hint=True,
                destructive_hint=False,
                idempotent_hint=True,
            ),
        ),
        ToolDefinition(
            description=get_prompt("admin_api/trigger_job_run"),
            fn=trigger_job_run,
            annotations=create_tool_annotations(
                title="Trigger Job Run",
                read_only_hint=False,
                destructive_hint=False,
                idempotent_hint=False,
            ),
        ),
        ToolDefinition(
            description=get_prompt("admin_api/list_jobs_runs"),
            fn=list_jobs_runs,
            annotations=create_tool_annotations(
                title="List Jobs Runs",
                read_only_hint=True,
                destructive_hint=False,
                idempotent_hint=True,
            ),
        ),
        ToolDefinition(
            description=get_prompt("admin_api/get_job_run_details"),
            fn=get_job_run_details,
            annotations=create_tool_annotations(
                title="Get Job Run Details",
                read_only_hint=True,
                destructive_hint=False,
                idempotent_hint=True,
            ),
        ),
        ToolDefinition(
            description=get_prompt("admin_api/cancel_job_run"),
            fn=cancel_job_run,
            annotations=create_tool_annotations(
                title="Cancel Job Run",
                read_only_hint=False,
                destructive_hint=False,
                idempotent_hint=False,
            ),
        ),
        ToolDefinition(
            description=get_prompt("admin_api/retry_job_run"),
            fn=retry_job_run,
            annotations=create_tool_annotations(
                title="Retry Job Run",
                read_only_hint=False,
                destructive_hint=False,
                idempotent_hint=False,
            ),
        ),
        ToolDefinition(
            description=get_prompt("admin_api/list_job_run_artifacts"),
            fn=list_job_run_artifacts,
            annotations=create_tool_annotations(
                title="List Job Run Artifacts",
                read_only_hint=True,
                destructive_hint=False,
                idempotent_hint=True,
            ),
        ),
        ToolDefinition(
            description=get_prompt("admin_api/get_job_run_artifact"),
            fn=get_job_run_artifact,
            annotations=create_tool_annotations(
                title="Get Job Run Artifact",
                read_only_hint=True,
                destructive_hint=False,
                idempotent_hint=True,
            ),
        ),
        ToolDefinition(
            description=get_prompt("admin_api/get_job_run_error"),
            fn=get_job_run_error,
            annotations=create_tool_annotations(
                title="Get Job Run Error",
                read_only_hint=True,
                destructive_hint=False,
                idempotent_hint=True,
            ),
        ),
    ]


def register_admin_api_tools(
    dbt_mcp: FastMCP,
    admin_config_provider: ConfigProvider[AdminApiConfig],
    exclude_tools: Sequence[ToolName] = [],
) -> None:
    """Register dbt Admin API tools."""
    admin_client = DbtAdminAPIClient(admin_config_provider)
    register_tools(
        dbt_mcp,
        create_admin_api_tool_definitions(admin_client, admin_config_provider),
        exclude_tools,
    )

```
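
A minimal wiring sketch (not taken from the repository) may help orient readers: it shows how `register_admin_api_tools` could be attached to a `FastMCP` server. The stub provider, its `account_id` value, and the `FastMCP` import path are assumptions for illustration; the only contract the tool bodies above rely on is an awaitable `get_config()` returning an object with an `account_id` attribute.

```python
# Hypothetical wiring sketch, for illustration only.
from types import SimpleNamespace

from mcp.server.fastmcp import FastMCP  # assumed import path for the FastMCP server class


class StubAdminApiConfigProvider:
    """Stand-in for ConfigProvider[AdminApiConfig]; real providers also carry host and auth details."""

    async def get_config(self):
        # account_id is the only config field the tool functions above read directly.
        return SimpleNamespace(account_id=12345)  # illustrative account id


def build_server() -> FastMCP:
    server = FastMCP("dbt-admin-demo")  # name is arbitrary for this sketch
    # register_admin_api_tools (defined above) builds a DbtAdminAPIClient from the
    # provider and registers each ToolDefinition it creates on the server.
    register_admin_api_tools(server, StubAdminApiConfigProvider())
    return server
```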

--------------------------------------------------------------------------------
/tests/unit/lsp/test_local_lsp_connection_provider.py:
--------------------------------------------------------------------------------

```python
"""Unit tests for LocalLSPConnectionProvider class."""

from unittest.mock import AsyncMock, MagicMock, patch

import pytest

from dbt_mcp.lsp.lsp_binary_manager import LspBinaryInfo
from dbt_mcp.lsp.providers.local_lsp_connection_provider import (
    LocalLSPConnectionProvider,
)
from dbt_mcp.lsp.lsp_connection import SocketLSPConnection


@pytest.fixture
def lsp_binary_info(tmp_path) -> LspBinaryInfo:
    """Create a test LSP binary info."""
    binary_path = tmp_path / "dbt-lsp"
    binary_path.touch()
    return LspBinaryInfo(path=str(binary_path), version="1.0.0")


@pytest.fixture
def project_dir(tmp_path) -> str:
    """Create a test project directory."""
    project_dir = tmp_path / "project"
    project_dir.mkdir()
    return str(project_dir)


class TestLocalLSPConnectionProvider:
    """Test LocalLSPConnectionProvider class."""

    @pytest.mark.asyncio
    async def test_get_connection_creates_connection_on_first_call(
        self, lsp_binary_info: LspBinaryInfo, project_dir: str
    ) -> None:
        """Test that get_connection creates a new connection on first call."""
        provider = LocalLSPConnectionProvider(lsp_binary_info, project_dir)

        # Mock the SocketLSPConnection
        mock_connection = MagicMock(spec=SocketLSPConnection)
        mock_connection.start = AsyncMock()
        mock_connection.initialize = AsyncMock()

        with patch(
            "dbt_mcp.lsp.providers.local_lsp_connection_provider.SocketLSPConnection",
            return_value=mock_connection,
        ) as mock_conn_class:
            connection = await provider.get_connection()

            # Verify connection was created with correct arguments
            mock_conn_class.assert_called_once_with(
                binary_path=lsp_binary_info.path,
                args=[],
                cwd=project_dir,
            )

            # Verify connection lifecycle methods were called
            mock_connection.start.assert_called_once()
            mock_connection.initialize.assert_called_once()

            # Verify the connection is returned
            assert connection == mock_connection
            assert provider.lsp_connection == mock_connection

    @pytest.mark.asyncio
    async def test_get_connection_returns_existing_connection(
        self, lsp_binary_info: LspBinaryInfo, project_dir: str
    ) -> None:
        """Test that get_connection returns the same connection on subsequent calls."""
        provider = LocalLSPConnectionProvider(lsp_binary_info, project_dir)

        # Mock the connection
        mock_connection = MagicMock(spec=SocketLSPConnection)
        mock_connection.start = AsyncMock()
        mock_connection.initialize = AsyncMock()

        with patch(
            "dbt_mcp.lsp.providers.local_lsp_connection_provider.SocketLSPConnection",
            return_value=mock_connection,
        ) as mock_conn_class:
            # First call
            connection1 = await provider.get_connection()

            # Second call
            connection2 = await provider.get_connection()

            # Third call
            connection3 = await provider.get_connection()

            # Connection should only be created once
            mock_conn_class.assert_called_once()
            mock_connection.start.assert_called_once()
            mock_connection.initialize.assert_called_once()

            # All calls should return the same instance
            assert connection1 is connection2
            assert connection2 is connection3
            assert connection1 is mock_connection

    @pytest.mark.asyncio
    async def test_get_connection_handles_start_failure(
        self, lsp_binary_info: LspBinaryInfo, project_dir: str
    ) -> None:
        """Test that get_connection handles connection start failure."""
        provider = LocalLSPConnectionProvider(lsp_binary_info, project_dir)

        # Mock connection that fails to start
        mock_connection = MagicMock(spec=SocketLSPConnection)
        mock_connection.start = AsyncMock(side_effect=RuntimeError("Start failed"))
        mock_connection.initialize = AsyncMock()

        with patch(
            "dbt_mcp.lsp.providers.local_lsp_connection_provider.SocketLSPConnection",
            return_value=mock_connection,
        ):
            with pytest.raises(
                RuntimeError, match="Failed to establish LSP connection"
            ):
                await provider.get_connection()

            # Verify connection was cleaned up
            assert provider.lsp_connection is None

    @pytest.mark.asyncio
    async def test_get_connection_handles_initialize_failure(
        self, lsp_binary_info: LspBinaryInfo, project_dir: str
    ) -> None:
        """Test that get_connection handles connection initialize failure."""
        provider = LocalLSPConnectionProvider(lsp_binary_info, project_dir)

        # Mock connection that fails to initialize
        mock_connection = MagicMock(spec=SocketLSPConnection)
        mock_connection.start = AsyncMock()
        mock_connection.initialize = AsyncMock(
            side_effect=RuntimeError("Initialize failed")
        )

        with patch(
            "dbt_mcp.lsp.providers.local_lsp_connection_provider.SocketLSPConnection",
            return_value=mock_connection,
        ):
            with pytest.raises(
                RuntimeError, match="Failed to establish LSP connection"
            ):
                await provider.get_connection()

            # Verify connection was cleaned up
            assert provider.lsp_connection is None

    @pytest.mark.asyncio
    async def test_cleanup_connection_stops_connection(
        self, lsp_binary_info: LspBinaryInfo, project_dir: str
    ) -> None:
        """Test that cleanup_connection properly stops the connection."""
        provider = LocalLSPConnectionProvider(lsp_binary_info, project_dir)

        # Setup a connection
        mock_connection = MagicMock(spec=SocketLSPConnection)
        mock_connection.start = AsyncMock()
        mock_connection.initialize = AsyncMock()
        mock_connection.stop = AsyncMock()

        with patch(
            "dbt_mcp.lsp.providers.local_lsp_connection_provider.SocketLSPConnection",
            return_value=mock_connection,
        ):
            # Create connection
            await provider.get_connection()
            assert provider.lsp_connection is not None

            # Cleanup
            await provider.cleanup_connection()

            # Verify stop was called
            mock_connection.stop.assert_called_once()

            # Verify connection was set to None
            assert provider.lsp_connection is None

    @pytest.mark.asyncio
    async def test_cleanup_connection_handles_no_connection(
        self, lsp_binary_info: LspBinaryInfo, project_dir: str
    ) -> None:
        """Test that cleanup_connection handles no existing connection gracefully."""
        provider = LocalLSPConnectionProvider(lsp_binary_info, project_dir)

        # No connection has been created
        assert provider.lsp_connection is None

        # Should not raise
        await provider.cleanup_connection()

        assert provider.lsp_connection is None

    @pytest.mark.asyncio
    async def test_cleanup_connection_handles_stop_failure(
        self, lsp_binary_info: LspBinaryInfo, project_dir: str
    ) -> None:
        """Test that cleanup_connection handles stop failure gracefully."""
        provider = LocalLSPConnectionProvider(lsp_binary_info, project_dir)

        # Setup a connection that fails to stop
        mock_connection = MagicMock(spec=SocketLSPConnection)
        mock_connection.start = AsyncMock()
        mock_connection.initialize = AsyncMock()
        mock_connection.stop = AsyncMock(side_effect=RuntimeError("Stop failed"))

        with patch(
            "dbt_mcp.lsp.providers.local_lsp_connection_provider.SocketLSPConnection",
            return_value=mock_connection,
        ):
            # Create connection
            await provider.get_connection()

            # Cleanup should not raise
            await provider.cleanup_connection()

            # Verify stop was attempted
            mock_connection.stop.assert_called_once()

            # Connection should still be set to None
            assert provider.lsp_connection is None

    @pytest.mark.asyncio
    async def test_connection_lifecycle_integration(
        self, lsp_binary_info: LspBinaryInfo, project_dir: str
    ) -> None:
        """Test complete connection lifecycle (create, use, cleanup)."""
        provider = LocalLSPConnectionProvider(lsp_binary_info, project_dir)

        # Mock connection
        mock_connection = MagicMock(spec=SocketLSPConnection)
        mock_connection.start = AsyncMock()
        mock_connection.initialize = AsyncMock()
        mock_connection.stop = AsyncMock()

        with patch(
            "dbt_mcp.lsp.providers.local_lsp_connection_provider.SocketLSPConnection",
            return_value=mock_connection,
        ):
            # Create connection
            connection1 = await provider.get_connection()
            assert connection1 is mock_connection
            assert provider.lsp_connection is mock_connection

            # Get connection again (should return same)
            connection2 = await provider.get_connection()
            assert connection2 is connection1

            # Cleanup
            await provider.cleanup_connection()
            assert provider.lsp_connection is None

            # Get connection again (should create new)
            connection3 = await provider.get_connection()
            assert connection3 is mock_connection

            # Verify lifecycle methods were called correctly
            assert mock_connection.start.call_count == 2
            assert mock_connection.initialize.call_count == 2
            assert mock_connection.stop.call_count == 1

```
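
The tests above pin down a small lifecycle contract: the provider creates and initializes a connection lazily on the first `get_connection()` call, returns the cached instance afterwards, resets `lsp_connection` to `None` when start or initialize fails, and tolerates missing or failing connections in `cleanup_connection()`. A generic sketch of that pattern, independent of the real `SocketLSPConnection` internals (all names beyond the tested method names are illustrative), might look like this:

```python
# Generic lazy-connection-provider sketch matching the behavior asserted above.
from collections.abc import Callable
from typing import Protocol


class Connection(Protocol):
    async def start(self) -> None: ...
    async def initialize(self) -> None: ...
    async def stop(self) -> None: ...


class LazyConnectionProvider:
    def __init__(self, factory: Callable[[], Connection]):
        self._factory = factory  # callable that builds a new, unstarted connection
        self.lsp_connection: Connection | None = None

    async def get_connection(self) -> Connection:
        if self.lsp_connection is not None:
            return self.lsp_connection  # reuse the cached connection
        connection = self._factory()
        try:
            await connection.start()
            await connection.initialize()
        except Exception as e:
            self.lsp_connection = None  # leave the provider in a clean state on failure
            raise RuntimeError("Failed to establish LSP connection") from e
        self.lsp_connection = connection
        return connection

    async def cleanup_connection(self) -> None:
        if self.lsp_connection is None:
            return  # nothing to clean up
        try:
            await self.lsp_connection.stop()
        except Exception:
            pass  # stop failures are swallowed, as the tests expect
        finally:
            self.lsp_connection = None
```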

--------------------------------------------------------------------------------
/examples/aws_strands_agent/dbt_data_scientist/test_all_tools.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""Test script to verify all tools are accessible and working."""

import os
import sys
from dotenv import load_dotenv


def test_environment_setup():
    """Test that environment is properly configured."""
    print("🔧 Testing Environment Setup")
    print("-" * 30)
    
    load_dotenv()
    
    # Check required environment variables
    required_vars = {
        "DBT_MCP_URL": "dbt MCP server URL",
        "DBT_TOKEN": "dbt Cloud authentication token", 
        "DBT_USER_ID": "dbt Cloud user ID",
        "DBT_PROD_ENV_ID": "dbt Cloud production environment ID"
    }
    
    missing_vars = []
    for var, description in required_vars.items():
        value = os.environ.get(var)
        if value:
            if var == "DBT_TOKEN":
                print(f"  ✅ {var}: {'*' * len(value)}")
            else:
                print(f"  ✅ {var}: {value}")
        else:
            print(f"  ❌ {var}: NOT SET")
            missing_vars.append(var)
    
    # Check optional variables
    optional_vars = ["DBT_DEV_ENV_ID", "DBT_ACCOUNT_ID", "DBT_PROJECT_LOCATION", "DBT_EXECUTABLE"]
    for var in optional_vars:
        value = os.environ.get(var)
        if value:
            print(f"  ✅ {var}: {value}")
        else:
            print(f"  ⚠️  {var}: NOT SET (optional)")
    
    if missing_vars:
        print(f"\n❌ Missing required variables: {', '.join(missing_vars)}")
        return False
    
    print("  ✅ Environment setup complete!")
    return True

def test_tool_imports():
    """Test that all tools can be imported."""
    print("\n📦 Testing Tool Imports")
    print("-" * 30)
    
    try:
        from tools import dbt_compile, dbt_mcp_tool, dbt_model_analyzer_agent
        print("  ✅ All tools imported successfully")
        
        # Test that tools are callable
        if callable(dbt_compile):
            print("  ✅ dbt_compile is callable")
        else:
            print("  ❌ dbt_compile is not callable")
            return False
            
        if callable(dbt_mcp_tool):
            print("  ✅ dbt_mcp_tool is callable")
        else:
            print("  ❌ dbt_mcp_tool is not callable")
            return False
            
        if callable(dbt_model_analyzer_agent):
            print("  ✅ dbt_model_analyzer_agent is callable")
        else:
            print("  ❌ dbt_model_analyzer_agent is not callable")
            return False
        
        return True
        
    except ImportError as e:
        print(f"  ❌ Import failed: {e}")
        return False

def test_agent_initialization():
    """Test that the agent can be initialized with all tools."""
    print("\n🤖 Testing Agent Initialization")
    print("-" * 30)
    
    try:
        from agent import dbt_agent
        
        # Check that agent exists and has the expected attributes
        if hasattr(dbt_agent, 'tools'):
            tools = dbt_agent.tools
            if tools and len(tools) > 0:
                print(f"  ✅ Agent initialized with {len(tools)} tools")
                
                # List the tools
                tool_names = []
                for tool in tools:
                    if hasattr(tool, 'tool_name'):
                        tool_names.append(tool.tool_name)
                    else:
                        tool_names.append(str(tool))
                
                print(f"  📋 Available tools: {', '.join(tool_names)}")
                return True
            else:
                print("  ⚠️  Agent has no tools (this might be expected if tools are loaded dynamically)")
                # This is actually okay for MCP tools that are loaded dynamically
                return True
        else:
            print("  ⚠️  Agent doesn't have tools attribute (this might be expected)")
            # Check if agent has other expected attributes
            if hasattr(dbt_agent, 'system_prompt'):
                print("  ✅ Agent has system_prompt")
                return True
            else:
                print("  ❌ Agent missing expected attributes")
                return False
            
    except Exception as e:
        print(f"  ❌ Agent initialization failed: {e}")
        return False
    """Test that the agent can be initialized with all tools."""
    print("\n🤖 Testing Agent Initialization")
    print("-" * 30)
    
    try:
        from agent import dbt_agent
        
        # Check that agent has tools
        if hasattr(dbt_agent, 'tools') and dbt_agent.tools:
            print(f"  ✅ Agent initialized with {len(dbt_agent.tools)} tools")
            
            # List the tools
            tool_names = []
            for tool in tools:
                if hasattr(tool, 'tool_name'):
                    tool_names.append(tool.tool_name)
                else:
                    tool_names.append(str(tool))
            
            print(f"  📋 Available tools: {', '.join(tool_names)}")
            return True
        else:
            print("  ❌ Agent has no tools")
            return False
            
    except Exception as e:
        print(f"  ❌ Agent initialization failed: {e}")
        return False

def test_dbt_compile_tool():
    """Test the dbt compile tool."""
    print("\n🔨 Testing dbt_compile Tool")
    print("-" * 30)
    
    try:
        from tools import dbt_compile
        
        # Test with a simple query
        test_query = "test dbt compile functionality"
        print(f"  📝 Testing with query: '{test_query}'")
        
        result = dbt_compile(test_query)
        
        if result and len(result) > 0:
            print(f"  ✅ dbt_compile tool executed successfully")
            print(f"  📄 Result preview: {result[:100]}...")
            return True
        else:
            print("  ⚠️  dbt_compile returned empty result")
            return False
            
    except Exception as e:
        print(f"  ❌ dbt_compile tool failed: {e}")
        return False

def test_dbt_mcp_tool():
    """Test the dbt MCP tool."""
    print("\n🌐 Testing dbt_mcp_tool")
    print("-" * 30)
    
    try:
        from tools import dbt_mcp_tool
        
        # Test with a simple query
        test_query = "list tools"
        print(f"  📝 Testing with query: '{test_query}'")
        
        result = dbt_mcp_tool(test_query)
        
        if result and len(result) > 0:
            print(f"  ✅ dbt_mcp_tool executed successfully")
            print(f"  📄 Result preview: {result[:100]}...")
            return True
        else:
            print("  ⚠️  dbt_mcp_tool returned empty result")
            return False
            
    except Exception as e:
        print(f"  ❌ dbt_mcp_tool failed: {e}")
        return False

def test_dbt_model_analyzer_tool():
    """Test the dbt model analyzer tool."""
    print("\n📊 Testing dbt_model_analyzer_agent")
    print("-" * 30)
    
    try:
        from tools import dbt_model_analyzer_agent
        
        # Test with a simple query
        test_query = "analyze my dbt models"
        print(f"  📝 Testing with query: '{test_query}'")
        
        result = dbt_model_analyzer_agent(test_query)
        
        if result and len(result) > 0:
            print(f"  ✅ dbt_model_analyzer_agent executed successfully")
            print(f"  📄 Result preview: {result[:100]}...")
            return True
        else:
            print("  ⚠️  dbt_model_analyzer_agent returned empty result")
            return False
            
    except Exception as e:
        print(f"  ❌ dbt_model_analyzer_agent failed: {e}")
        return False

def test_agent_with_tools():
    """Test the agent with all tools integrated."""
    print("\n🎯 Testing Agent with All Tools")
    print("-" * 30)
    
    try:
        from agent import dbt_agent
        
        # Test with different types of queries
        test_queries = [
            "What tools are available?",
            "Help me with dbt compilation",
            "Analyze my data models"
        ]
        
        for i, query in enumerate(test_queries, 1):
            print(f"  📝 Test {i}: '{query}'")
            try:
                result = dbt_agent(query)
                if result and len(str(result)) > 0:
                    print(f"    ✅ Agent responded successfully")
                    print(f"    📄 Response preview: {str(result)[:80]}...")
                else:
                    print(f"    ⚠️  Agent returned empty response")
            except Exception as e:
                print(f"    ❌ Agent failed: {e}")
        
        return True
        
    except Exception as e:
        print(f"  ❌ Agent testing failed: {e}")
        return False

def main():
    """Run all tests."""
    print("🚀 Complete Tool and Agent Test Suite")
    print("=" * 50)
    
    tests = [
        ("Environment Setup", test_environment_setup),
        ("Tool Imports", test_tool_imports),
        ("Agent Initialization", test_agent_initialization),
        ("dbt_compile Tool", test_dbt_compile_tool),
        ("dbt_mcp_tool", test_dbt_mcp_tool),
        ("dbt_model_analyzer_agent", test_dbt_model_analyzer_tool),
        ("Agent with All Tools", test_agent_with_tools)
    ]
    
    passed = 0
    total = len(tests)
    
    for test_name, test_func in tests:
        try:
            if test_func():
                passed += 1
                print(f"✅ {test_name} - PASSED")
            else:
                print(f"❌ {test_name} - FAILED")
        except Exception as e:
            print(f"❌ {test_name} - ERROR: {e}")
        print()
    
    print("=" * 50)
    print(f"📊 Test Results: {passed}/{total} tests passed")
    
    if passed == total:
        print("🎉 All tests passed! Your agent and tools are working correctly.")
        print("\nYou can now run the agent:")
        print("  python dbt_data_scientist/agent.py")
    else:
        print("⚠️  Some tests failed. Please check the errors above.")
        print("Common issues:")
        print("  - Missing environment variables")
        print("  - MCP server not accessible")
        print("  - dbt project not found")
    
    return passed == total

if __name__ == "__main__":
    success = main()
    sys.exit(0 if success else 1)

```

--------------------------------------------------------------------------------
/CHANGELOG.md:
--------------------------------------------------------------------------------

```markdown
# Changelog

All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html),
and is generated by [Changie](https://github.com/miniscruff/changie).


## v1.1.0 - 2025-11-03
### Enhancement or New Feature
* This adds the get all sources tool.
* Add auto-disable and better validation of MCP settings. Many tests changed as a result; a fixture was added so the default test behavior assumes all required settings fields are fully set.

* Add list_saved_queries tool to the Semantic Layer, enabling discovery of predefined MetricFlow queries via GraphQL API. This allows AI agents to list and search saved queries with their associated metrics, dimensions, and filters.
### Under the Hood
* Abstract LSP client and LSP connection via providers.
* Add version number guidelines to contributing.md
* Make release version determination automatic based on changie changelog entries. The workflow now defaults to 'auto' mode, which uses 'changie batch auto' to determine the version.
### Bug Fix
* Minor update to the instruction for LSP tool

## v1.0.0 - 2025-10-20
### Enhancement or New Feature
* Introduce support for Fusion LSP
### Under the Hood
* Add support for Python debugger
* Update pyproject.toml including development status
* Add example for aws_strands_agent
### Bug Fix
* Exclude Python 3.14 for now as pyarrow hasn't released wheels yet

## v0.10.3 - 2025-10-08
### Under the Hood
* Improved retry logic and post project selection screen
* Avoid double counting in usage tracking proxied tools
* Categorizing ToolCallErrors

## v0.10.2 - 2025-10-08
### Enhancement or New Feature
* Improved oauth error handling
* Remove oauth env var feature flag. Enable oauth broadly.
### Under the Hood
* Improved logging for development
* Updating prompts to include examples to avoid bad parameter generation
* Remove DBT_HOST prefix
* Update usage tracking with new fields
* Write .user.yml if it does not exist
* Changed UsageTracker to a protocol

## v0.10.1 - 2025-10-02
### Bug Fix
* Fix get_job_run_error truncated log output

## v0.10.0 - 2025-10-01
### Enhancement or New Feature
* Add get_job_run_error to Admin API tools

## v0.9.1 - 2025-09-30
### Under the Hood
* Reorganize code and add ability to format the arrow table differently

## v0.9.0 - 2025-09-30
### Enhancement or New Feature
* Adding the dbt codegen toolset.
### Under the Hood
* Updates README with new tools
* Fix .user.yml error with Fusion

## v0.8.4 - 2025-09-29
### Enhancement or New Feature
* Allow doc files to skip changie requirements
### Under the Hood
* Upgrade @vitejs/plugin-react
* Add ruff lint config to enforce Python 3.9+ coding style
* Opt-out of usage tracking with standard dbt methods

## v0.8.3 - 2025-09-24
### Under the Hood
* Rename SemanticLayerConfig.service_token to SemanticLayerConfig.token
### Bug Fix
* Fix Error handling as per native MCP error spec

## v0.8.2 - 2025-09-23
### Enhancement or New Feature
* Use `dbt --help` to identify binary type
* Increase dbt CLI timeout default
### Under the Hood
* Implement SemanticLayerClientProvider
### Bug Fix
* Update how we identify CLIs

## v0.8.1 - 2025-09-22
### Under the Hood
* Create ConfigProvider ABC

## v0.8.0 - 2025-09-22
### Enhancement or New Feature
* Allow creating pre-releases
* Return compiled code in get_model_details
### Under the Hood
* Handle Claude Desktop running multiple MCP server instances
* Add docs for using the MCP server with google ADK and dbt-core
* Add search string to SL metadata queries
* Improve parameters in query_metrics examples
* Reduce token usage in `get_job_run_details` response by removing debug param and unnecessary logs
* Automatically refresh oauth token
* Improve dbt platform context mcp.yml parsing
* Add PR and issue templates
* Address claude desktop re-triggering oauth on exit
* Turning off caching for static files
### Bug Fix
* Add account id to dbt platform context

## v0.7.0 - 2025-09-09
### Enhancement or New Feature
* Add tools to retrieve exposure information from Disco API
### Under the Hood
* Expect string sub in oauth JWT
* Using sync endpoints for oauth FastAPI server
* Fix release pipeline

## v0.6.2 - 2025-09-08
### Enhancement or New Feature
* Adding the ability to return the config.meta attribute from list metrics to give the LLM more context
* Oauth initial implementation
* Fix #251 - Add flag for no color + ability to detect binary type
### Under the Hood
* Add docs for using the MCP server with Pydantic AI
* Don't run mypy on examples

## v0.6.1 - 2025-08-28
### Enhancement or New Feature
* Add support for --vars flag
* Allow headers in AdminApiConfig
### Under the Hood
* Remove redundant and outdated documentation

## v0.6.0 - 2025-08-22
### Under the Hood
* Update docs with new tools
* Using streamable http for SQL tools
* Correctly handle admin API host containing protocol prefix

## v0.5.0 - 2025-08-20
### Enhancement or New Feature
* Add support for --full-refresh flag
* Adds a new tool to get model health (last run, tests, source freshness) from discovery API
* Add operational/admin tools to interact with the dbt platform
### Under the Hood
* LangGraph create_react_agent example
* Make model_name optional for more discovery tools
* Update example with OpenAI to show tool calls
### Bug Fix
* Fix for timeout on Windows

## v0.4.2 - 2025-08-13
### Enhancement or New Feature
* Add default --limit to show tool
### Under the Hood
* Define toolsets
### Bug Fix
* Fix the prompt to ensure grain is passed even for non-time group by

## v0.4.1 - 2025-08-08
### Under the Hood
* Upgrade dbt-sl-sdk

## v0.4.0 - 2025-08-08
### Enhancement or New Feature
* Tool policies 
* Added Semantic Layer tool to get compiled sql
### Under the Hood
* Fix JSON formatting in README
* Document dbt Copilot credits relationship
### Bug Fix
* Make model_name of get_model_details optional

## v0.3.0 - 2025-08-05
### Enhancement or New Feature
* Add ToolAnnotations
* Add alias field to GET_MODEL_DETAILS GraphQL query
### Under the Hood
* Test remote tool equality
* Fix initialization integration test
* Refactor README
* Rename Remote Tools to SQL Tools
* Document Remote MCP
* Improved Remote MCP instructions 
### Bug Fix
* Apply dbt_cli_timeout to all dbt commands

## v0.2.20 - 2025-07-25
### Enhancement or New Feature
* Allow for disabling CLI tools
### Under the Hood
* Update codeowners
* Improve DISABLE_TOOLS configuration
* Remote MCP example
* Add unit tests for env vars combinations
* Add instructions for Claude Code in README
* Add new example for OpenAI + HTTP Streamable MCP

## v0.2.19 - 2025-07-22
### Under the Hood
* Create list of tool names

## v0.2.18 - 2025-07-22
### Enhancement or New Feature
* Move env var parsing to pydantic_settings for better validation
### Under the Hood
* Add integration test for server initialization
### Bug Fix
* Fix SL validation error message when no misspellings are found

## v0.2.17 - 2025-07-18

## v0.2.16 - 2025-07-18
### Under the Hood
* Adding the ability to exclude certain tools when registering
* OpenAI responses example

## v0.2.15 - 2025-07-16
### Under the Hood
* Refactor sl tools for reusability
* Update VSCode instructions in README

## v0.2.14 - 2025-07-14
### Enhancement or New Feature
* Make dbt CLI command timeout configurable
### Bug Fix
* Allow passing entities in the group by

## v0.2.13 - 2025-07-11
### Under the Hood
* Decouple discovery tools from FastMCP

## v0.2.12 - 2025-07-09
### Bug Fix
* Catch every tool error and surface as string

## v0.2.11 - 2025-07-03
### Bug Fix
* fix order_by input

## v0.2.10 - 2025-07-03
### Enhancement or New Feature
* Upgrade MCP SDK

## v0.2.9 - 2025-07-02
### Enhancement or New Feature
* Decrease amount of data retrieved when listing models
### Under the Hood
* OpenAI conversational analytics example
* README updates
* Move Discover headers to config

## v0.2.8 - 2025-07-02
### Enhancement or New Feature
* Raise errors if no node is selected (can also be configured)
### Bug Fix
* Fix when people provide `DBT_PROJECT_DIR` as a relative path
* Fix link in README

## v0.2.7 - 2025-06-30
### Under the Hood
* Timeout dbt list command
* Troubleshooting section in README on clients not finding uvx
* Update Discovery config for simpler usage
### Bug Fix
* Fixing bug when ordering SL query by a metric

## v0.2.6 - 2025-06-16
### Under the Hood
* Instructing the LLM to more likely use a selector
* Instruct LLM to add limit as an argument instead of SQL
* Fix use of limit in dbt show
* Indicate type checking

## v0.2.5 - 2025-06-06
### Under the Hood
* Small improvements to improve logging and code organization.
* Move `--selector` to the code instead of the prompt
* Cursor deeplink setup
* Fix Cursor deeplinks
* Fix Cursor env var mess up
### Bug Fix
* Fix Discovery API config enablement

## v0.2.4 - 2025-06-03
### Bug Fix
* Add the missing selector argument when running commands

## v0.2.3 - 2025-06-02
### Under the Hood
* Fix release action to fetch tags

## v0.2.2 - 2025-06-02
### Under the Hood
* Update README to run the MCP server with uvx
* Logging usage events
* Improve remote tools error logging
* Move create-release-tag to release Action
* Update release process documentation
### Bug Fix
* Fix typo in GH action to create release

## v0.2.1 - 2025-05-28
### Under the Hood
* Remove hatch from tag action
* Manually triggering release

## v0.2.0 - 2025-05-28
### Enhancement or New Feature
* Using `--quiet` flag to reduce context saturation of coding assistants
* Add a tool `get_model_children`
* Added optional uniqueId parameter to model lookup methods for more precise model identification
* Enable remote tools in production
* Add selector for dbt commands
* Set pre-changie value to 0.1.13
### Under the Hood
* Require changelog entries for each PR
* Log Python version in install script
* Update license to full Apache 2.0 text
* Roll back installation script and instructions
* Re-enable tests in CI
* Refactor config for modularity
* Document remote tools
* Usage tracking scaffolding
* Update docs to clarify service token permissions required
* Increase remote tools timeout
* Update release process for new versions
* Point to the correct diagram in README
* Install hatch in release process
* Remove hatch from release process
### Bug Fix
* Fix diagram according to feature set

## v0.1.3 and before
* Initial releases before using changie
```

--------------------------------------------------------------------------------
/tests/unit/discovery/test_sources_fetcher.py:
--------------------------------------------------------------------------------

```python
from unittest.mock import patch

import pytest

from dbt_mcp.discovery.client import SourcesFetcher
from dbt_mcp.errors import GraphQLError


@pytest.fixture
def sources_fetcher(mock_api_client):
    return SourcesFetcher(api_client=mock_api_client)


async def test_fetch_sources_single_page(sources_fetcher, mock_api_client):
    mock_response = {
        "data": {
            "environment": {
                "applied": {
                    "sources": {
                        "pageInfo": {"hasNextPage": False, "endCursor": "cursor_end"},
                        "edges": [
                            {
                                "node": {
                                    "name": "customers",
                                    "uniqueId": "source.test_project.raw_data.customers",
                                    "description": "Customer data from external system",
                                    "sourceName": "raw_data",
                                    "resourceType": "source",
                                    "freshness": {
                                        "maxLoadedAt": "2024-01-15T10:30:00Z",
                                        "maxLoadedAtTimeAgoInS": 3600,
                                        "freshnessStatus": "pass",
                                    },
                                }
                            },
                            {
                                "node": {
                                    "name": "orders",
                                    "uniqueId": "source.test_project.raw_data.orders",
                                    "description": "Order data from external system",
                                    "sourceName": "raw_data",
                                    "resourceType": "source",
                                    "freshness": {
                                        "maxLoadedAt": "2024-01-15T11:00:00Z",
                                        "maxLoadedAtTimeAgoInS": 1800,
                                        "freshnessStatus": "warn",
                                    },
                                }
                            },
                        ],
                    }
                }
            }
        }
    }

    # Set up the mock to return our response
    mock_api_client.execute_query.return_value = mock_response

    # Execute the fetch
    result = await sources_fetcher.fetch_sources()

    # Verify the API was called correctly
    mock_api_client.execute_query.assert_called_once()
    call_args = mock_api_client.execute_query.call_args

    # Check that the GraphQL query contains expected elements
    query = call_args[0][0]
    assert "GetSources" in query
    assert "environment" in query
    assert "applied" in query
    assert "sources" in query

    # Check variables
    variables = call_args[0][1]
    assert variables["environmentId"] == 123
    assert variables["first"] == 100  # PAGE_SIZE
    assert variables["sourcesFilter"] == {}

    # Verify the result
    assert len(result) == 2
    assert result[0]["name"] == "customers"
    assert result[0]["sourceName"] == "raw_data"
    assert result[0]["resourceType"] == "source"
    assert result[0]["freshness"]["freshnessStatus"] == "pass"
    assert result[1]["name"] == "orders"
    assert result[1]["freshness"]["freshnessStatus"] == "warn"


@pytest.mark.parametrize(
    "filter_params,expected_filter",
    [
        # Single filter parameters
        ({"source_names": ["external_api"]}, {"sourceNames": ["external_api"]}),
        (
            {"unique_ids": ["source.test_project.raw_data.customers"]},
            {"uniqueIds": ["source.test_project.raw_data.customers"]},
        ),
        # Combined filters
        (
            {
                "source_names": ["core"],
                "unique_ids": ["source.test_project.core.users"],
            },
            {"sourceNames": ["core"], "uniqueIds": ["source.test_project.core.users"]},
        ),
    ],
)
async def test_fetch_sources_with_filters(
    sources_fetcher, mock_api_client, filter_params, expected_filter
):
    """Test that various filter parameters are correctly converted to GraphQL filter format."""
    mock_response = {
        "data": {
            "environment": {
                "applied": {
                    "sources": {
                        "pageInfo": {"hasNextPage": False, "endCursor": "cursor_end"},
                        "edges": [
                            {
                                "node": {
                                    "name": "customers",
                                    "uniqueId": "source.test_project.external_api.customers",
                                    "description": "Customer data from API",
                                    "sourceName": "external_api",
                                    "resourceType": "source",
                                    "freshness": {
                                        "maxLoadedAt": "2024-01-15T10:30:00Z",
                                        "maxLoadedAtTimeAgoInS": 3600,
                                        "freshnessStatus": "pass",
                                    },
                                }
                            }
                        ],
                    }
                }
            }
        }
    }

    mock_api_client.execute_query.return_value = mock_response

    # Execute with filters
    result = await sources_fetcher.fetch_sources(**filter_params)

    # Verify the filter was passed correctly to the GraphQL query
    call_args = mock_api_client.execute_query.call_args
    variables = call_args[0][1]
    assert variables["sourcesFilter"] == expected_filter

    # Verify the result structure
    assert isinstance(result, list)
    assert len(result) == 1


async def test_fetch_sources_empty_response(sources_fetcher, mock_api_client):
    mock_response = {
        "data": {
            "environment": {
                "applied": {
                    "sources": {
                        "pageInfo": {"hasNextPage": False, "endCursor": None},
                        "edges": [],
                    }
                }
            }
        }
    }

    mock_api_client.execute_query.return_value = mock_response

    result = await sources_fetcher.fetch_sources()

    assert result == []


async def test_fetch_sources_pagination(sources_fetcher, mock_api_client):
    # First page response
    first_page_response = {
        "data": {
            "environment": {
                "applied": {
                    "sources": {
                        "pageInfo": {"hasNextPage": True, "endCursor": "cursor_page_1"},
                        "edges": [
                            {
                                "node": {
                                    "name": "customers",
                                    "uniqueId": "source.test_project.raw_data.customers",
                                    "description": "Customer data",
                                    "sourceName": "raw_data",
                                    "resourceType": "source",
                                    "freshness": {
                                        "maxLoadedAt": "2024-01-15T10:30:00Z",
                                        "maxLoadedAtTimeAgoInS": 3600,
                                        "freshnessStatus": "pass",
                                    },
                                }
                            }
                        ],
                    }
                }
            }
        }
    }

    # Second page response (hasNextPage is False, so pagination stops)
    second_page_response = {
        "data": {
            "environment": {
                "applied": {
                    "sources": {
                        "pageInfo": {
                            "hasNextPage": False,
                            "endCursor": "cursor_page_1",
                        },  # hasNextPage False stops pagination
                        "edges": [
                            {
                                "node": {
                                    "name": "orders",
                                    "uniqueId": "source.test_project.raw_data.orders",
                                    "description": "Order data",
                                    "sourceName": "raw_data",
                                    "resourceType": "source",
                                    "freshness": {
                                        "maxLoadedAt": "2024-01-15T11:00:00Z",
                                        "maxLoadedAtTimeAgoInS": 1800,
                                        "freshnessStatus": "warn",
                                    },
                                }
                            }
                        ],
                    }
                }
            }
        }
    }

    # Set up mock to return different responses for each call
    mock_api_client.execute_query.side_effect = [
        first_page_response,
        second_page_response,
    ]

    result = await sources_fetcher.fetch_sources()

    # Should have called twice due to pagination
    assert mock_api_client.execute_query.call_count == 2

    # Check that the second call includes the cursor from the first response
    first_call_args = mock_api_client.execute_query.call_args_list[0]
    second_call_args = mock_api_client.execute_query.call_args_list[1]

    # First call should have empty after cursor
    assert first_call_args[0][1]["after"] == ""

    # Second call should have the cursor from first response
    assert second_call_args[0][1]["after"] == "cursor_page_1"

    # Should have both results
    assert len(result) == 2
    assert result[0]["name"] == "customers"
    assert result[1]["name"] == "orders"


@patch("dbt_mcp.discovery.client.raise_gql_error")
async def test_fetch_sources_graphql_error_handling(
    mock_raise_gql_error, sources_fetcher, mock_api_client
):
    mock_response = {
        "data": {
            "environment": {
                "applied": {
                    "sources": {
                        "pageInfo": {"hasNextPage": False, "endCursor": None},
                        "edges": [],
                    }
                }
            }
        }
    }

    # Configure the mock to raise GraphQLError when called
    mock_raise_gql_error.side_effect = GraphQLError("Test GraphQL error")

    mock_api_client.execute_query.return_value = mock_response

    # Verify that fetch_sources raises GraphQLError
    with pytest.raises(GraphQLError, match="Test GraphQL error"):
        await sources_fetcher.fetch_sources()

    # Verify that error handling function was called
    mock_raise_gql_error.assert_called_with(mock_response)


async def test_get_environment_id(sources_fetcher):
    environment_id = await sources_fetcher.get_environment_id()
    assert environment_id == 123

```
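
The pagination test above implies a standard GraphQL cursor loop: start with an empty `after` cursor, request pages of `first = 100`, collect `edges[*].node`, and keep following `pageInfo.endCursor` while `pageInfo.hasNextPage` is true. A simplified reconstruction of that loop is shown below; it is not the repository's actual implementation, and the query string and async client interface are assumed from the test fixtures.

```python
# Illustrative cursor-pagination loop matching what the tests above assert.
PAGE_SIZE = 100


async def fetch_sources_paginated(
    api_client,
    environment_id: int,
    query: str,
    sources_filter: dict | None = None,
) -> list[dict]:
    sources: list[dict] = []
    after = ""  # the first request uses an empty cursor, as the first call in the test asserts
    while True:
        response = await api_client.execute_query(
            query,
            {
                "environmentId": environment_id,
                "first": PAGE_SIZE,
                "after": after,
                "sourcesFilter": sources_filter or {},
            },
        )
        page = response["data"]["environment"]["applied"]["sources"]
        sources.extend(edge["node"] for edge in page["edges"])
        if not page["pageInfo"]["hasNextPage"]:
            break
        after = page["pageInfo"]["endCursor"]  # resume from the last page's cursor
    return sources
```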

--------------------------------------------------------------------------------
/src/dbt_mcp/oauth/fastapi_app.py:
--------------------------------------------------------------------------------

```python
import logging
from typing import cast
from urllib.parse import quote

import requests
from authlib.integrations.requests_client import OAuth2Session
from fastapi import FastAPI, Request
from fastapi.responses import RedirectResponse
from fastapi.staticfiles import StaticFiles
from starlette.types import Receive, Scope, Send
from uvicorn import Server

from dbt_mcp.oauth.context_manager import DbtPlatformContextManager
from dbt_mcp.oauth.dbt_platform import (
    DbtPlatformAccount,
    DbtPlatformContext,
    DbtPlatformEnvironment,
    DbtPlatformEnvironmentResponse,
    DbtPlatformProject,
    SelectedProjectRequest,
    dbt_platform_context_from_token_response,
)
from dbt_mcp.oauth.token import (
    DecodedAccessToken,
)

logger = logging.getLogger(__name__)


def error_redirect(error_code: str, description: str) -> RedirectResponse:
    return RedirectResponse(
        url=f"/index.html#status=error&error={quote(error_code)}&error_description={quote(description)}",
        status_code=302,
    )


class NoCacheStaticFiles(StaticFiles):
    """
    Custom StaticFiles class that adds cache-control headers to prevent caching.
    """

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        # Create a wrapper for the send function to modify headers
        async def send_wrapper(message):
            if message["type"] == "http.response.start":
                # Add no-cache headers to prevent client-side caching
                headers = dict(message.get("headers", []))
                headers[b"cache-control"] = b"no-cache, no-store, must-revalidate"
                headers[b"pragma"] = b"no-cache"
                headers[b"expires"] = b"0"
                message["headers"] = list(headers.items())
            await send(message)

        # Call the parent class with our modified send function
        await super().__call__(scope, receive, send_wrapper)


def _get_all_accounts(
    *,
    dbt_platform_url: str,
    headers: dict[str, str],
) -> list[DbtPlatformAccount]:
    accounts_response = requests.get(
        url=f"{dbt_platform_url}/api/v3/accounts/",
        headers=headers,
    )
    accounts_response.raise_for_status()
    return [
        DbtPlatformAccount(**account) for account in accounts_response.json()["data"]
    ]


def _get_all_projects_for_account(
    *,
    dbt_platform_url: str,
    account: DbtPlatformAccount,
    headers: dict[str, str],
    page_size: int = 100,
) -> list[DbtPlatformProject]:
    """Fetch all projects for an account using offset/page_size pagination."""
    offset = 0
    projects: list[DbtPlatformProject] = []
    while True:
        projects_response = requests.get(
            f"{dbt_platform_url}/api/v3/accounts/{account.id}/projects/?state=1&offset={offset}&limit={page_size}",
            headers=headers,
        )
        projects_response.raise_for_status()
        page = projects_response.json()["data"]
        projects.extend(
            DbtPlatformProject(**project, account_name=account.name) for project in page
        )
        if len(page) < page_size:
            break
        offset += page_size
    return projects


def _get_all_environments_for_project(
    *,
    dbt_platform_url: str,
    account_id: int,
    project_id: int,
    headers: dict[str, str],
    page_size: int = 100,
) -> list[DbtPlatformEnvironmentResponse]:
    """Fetch all environments for a project using offset/page_size pagination."""
    offset = 0
    environments: list[DbtPlatformEnvironmentResponse] = []
    while True:
        environments_response = requests.get(
            f"{dbt_platform_url}/api/v3/accounts/{account_id}/projects/{project_id}/environments/?state=1&offset={offset}&limit={page_size}",
            headers=headers,
        )
        environments_response.raise_for_status()
        page = environments_response.json()["data"]
        environments.extend(
            DbtPlatformEnvironmentResponse(**environment) for environment in page
        )
        if len(page) < page_size:
            break
        offset += page_size
    return environments


def create_app(
    *,
    oauth_client: OAuth2Session,
    state_to_verifier: dict[str, str],
    dbt_platform_url: str,
    static_dir: str,
    dbt_platform_context_manager: DbtPlatformContextManager,
) -> FastAPI:
    app = FastAPI()

    app.state.decoded_access_token = cast(DecodedAccessToken | None, None)
    app.state.server_ref = cast(Server | None, None)
    app.state.dbt_platform_context = cast(DbtPlatformContext | None, None)

    @app.get("/")
    def oauth_callback(request: Request) -> RedirectResponse:
        logger.info("OAuth callback received")
        # Only handle OAuth callback when provider returns with code or error.
        params = request.query_params
        if "error" in params or "error_description" in params:
            error_code = params.get("error", "unknown_error")
            error_desc = params.get("error_description", "An error occurred")
            return error_redirect(error_code, error_desc)
        if "code" not in params:
            return RedirectResponse(url="/index.html", status_code=302)
        state = params.get("state")
        if not state:
            logger.error("Missing state in OAuth callback")
            return error_redirect(
                "missing_state", "State parameter missing in OAuth callback"
            )
        try:
            code_verifier = state_to_verifier.pop(state, None)
            if not code_verifier:
                logger.error("No code_verifier found for provided state")
                return error_redirect(
                    "invalid_state", "Invalid or expired state parameter"
                )
            logger.info("Fetching initial access token")
            # Fetch the initial access token
            token_response = oauth_client.fetch_token(
                url=f"{dbt_platform_url}/oauth/token",
                authorization_response=str(request.url),
                code_verifier=code_verifier,
            )
            dbt_platform_context = dbt_platform_context_from_token_response(
                token_response, dbt_platform_url
            )
            dbt_platform_context_manager.write_context_to_file(dbt_platform_context)
            assert dbt_platform_context.decoded_access_token
            app.state.decoded_access_token = dbt_platform_context.decoded_access_token
            app.state.dbt_platform_context = dbt_platform_context
            return RedirectResponse(
                url="/index.html#status=success",
                status_code=302,
            )
        except Exception as e:
            logger.exception("OAuth callback failed")
            default_msg = "An unexpected error occurred during authentication"
            error_message = str(e) if str(e) else default_msg
            return error_redirect("oauth_failed", error_message)

    @app.post("/shutdown")
    def shutdown_server() -> dict[str, bool]:
        logger.info("Shutdown server received")
        server = app.state.server_ref
        if server is not None:
            server.should_exit = True
        return {"ok": True}

    @app.get("/projects")
    def projects() -> list[DbtPlatformProject]:
        if app.state.decoded_access_token is None:
            raise RuntimeError("Access token missing; OAuth flow not completed")
        access_token = app.state.decoded_access_token.access_token_response.access_token
        headers = {
            "Accept": "application/json",
            "Authorization": f"Bearer {access_token}",
        }
        accounts = _get_all_accounts(
            dbt_platform_url=dbt_platform_url,
            headers=headers,
        )
        projects: list[DbtPlatformProject] = []
        for account in [a for a in accounts if a.state == 1 and not a.locked]:
            projects.extend(
                _get_all_projects_for_account(
                    dbt_platform_url=dbt_platform_url,
                    account=account,
                    headers=headers,
                )
            )
        return projects

    @app.get("/dbt_platform_context")
    def get_dbt_platform_context() -> DbtPlatformContext:
        logger.info("Selected project received")
        return dbt_platform_context_manager.read_context() or DbtPlatformContext()

    @app.post("/selected_project")
    def set_selected_project(
        selected_project_request: SelectedProjectRequest,
    ) -> DbtPlatformContext:
        logger.info("Selected project received")
        if app.state.decoded_access_token is None:
            raise RuntimeError("Access token missing; OAuth flow not completed")
        access_token = app.state.decoded_access_token.access_token_response.access_token
        headers = {
            "Accept": "application/json",
            "Authorization": f"Bearer {access_token}",
        }
        accounts = _get_all_accounts(
            dbt_platform_url=dbt_platform_url,
            headers=headers,
        )
        account = next(
            (a for a in accounts if a.id == selected_project_request.account_id), None
        )
        if account is None:
            raise ValueError(f"Account {selected_project_request.account_id} not found")
        environments = _get_all_environments_for_project(
            dbt_platform_url=dbt_platform_url,
            account_id=selected_project_request.account_id,
            project_id=selected_project_request.project_id,
            headers=headers,
            page_size=100,
        )
        prod_environment = None
        dev_environment = None
        for environment in environments:
            if (
                environment.deployment_type
                and environment.deployment_type.lower() == "production"
            ):
                prod_environment = DbtPlatformEnvironment(
                    id=environment.id,
                    name=environment.name,
                    deployment_type=environment.deployment_type,
                )
            elif (
                environment.deployment_type
                and environment.deployment_type.lower() == "development"
            ):
                dev_environment = DbtPlatformEnvironment(
                    id=environment.id,
                    name=environment.name,
                    deployment_type=environment.deployment_type,
                )
        dbt_platform_context = dbt_platform_context_manager.update_context(
            new_dbt_platform_context=DbtPlatformContext(
                decoded_access_token=app.state.decoded_access_token,
                dev_environment=dev_environment,
                prod_environment=prod_environment,
                host_prefix=account.host_prefix,
                account_id=account.id,
            ),
        )
        app.state.dbt_platform_context = dbt_platform_context
        return dbt_platform_context

    app.mount(
        path="/",
        app=NoCacheStaticFiles(directory=static_dir, html=True),
    )

    return app

```
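
To see the no-cache static mount in isolation (separate from the OAuth flow above), `NoCacheStaticFiles` can be attached to any FastAPI app as in the sketch below; the directory path and port are placeholders, and in the real server `create_app` is driven by a `uvicorn.Server` instead.

```python
# Minimal sketch: serve a local static directory with client-side caching disabled.
import uvicorn
from fastapi import FastAPI

demo_app = FastAPI()
# html=True serves index.html at "/", mirroring how create_app mounts its static_dir.
demo_app.mount("/", NoCacheStaticFiles(directory="static", html=True))  # "static" is a placeholder path

if __name__ == "__main__":
    uvicorn.run(demo_app, host="127.0.0.1", port=8000)  # port chosen arbitrarily for the sketch
```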

--------------------------------------------------------------------------------
/tests/unit/dbt_codegen/test_tools.py:
--------------------------------------------------------------------------------

```python
import json
import subprocess

import pytest
from pytest import MonkeyPatch

from dbt_mcp.dbt_codegen.tools import register_dbt_codegen_tools
from tests.mocks.config import mock_dbt_codegen_config


@pytest.fixture
def mock_process():
    class MockProcess:
        def __init__(self, returncode=0, output="command output"):
            self.returncode = returncode
            self._output = output

        def communicate(self, timeout=None):
            return self._output, None

    return MockProcess


@pytest.fixture
def mock_fastmcp():
    class MockFastMCP:
        def __init__(self):
            self.tools = {}

        def tool(self, **kwargs):
            def decorator(func):
                self.tools[func.__name__] = func
                return func

            return decorator

    fastmcp = MockFastMCP()
    return fastmcp, fastmcp.tools


def test_generate_source_basic_schema(
    monkeypatch: MonkeyPatch, mock_process, mock_fastmcp
):
    """Test generate_source with just schema_name parameter."""
    mock_calls = []

    def mock_popen(args, **kwargs):
        mock_calls.append(args)
        return mock_process()

    # Patch subprocess BEFORE registering tools
    monkeypatch.setattr("subprocess.Popen", mock_popen)

    # Now register tools with the mock in place
    fastmcp, _ = mock_fastmcp
    register_dbt_codegen_tools(fastmcp, mock_dbt_codegen_config)
    generate_source_tool = fastmcp.tools["generate_source"]

    # Call with just schema_name (provide all required args explicitly)
    generate_source_tool(
        schema_name="raw_data",
        database_name=None,
        table_names=None,
        generate_columns=False,
        include_descriptions=False,
    )

    # Verify the command was called correctly
    assert mock_calls
    args_list = mock_calls[0]

    # Check basic command structure
    assert args_list[0] == "/path/to/dbt"
    assert "--no-use-colors" in args_list
    assert "run-operation" in args_list
    assert "--quiet" in args_list
    assert "generate_source" in args_list

    # Check that args were passed correctly
    assert "--args" in args_list
    args_index = args_list.index("--args")
    args_json = json.loads(args_list[args_index + 1])
    assert args_json["schema_name"] == "raw_data"


def test_generate_source_with_all_parameters(
    monkeypatch: MonkeyPatch, mock_process, mock_fastmcp
):
    """Test generate_source with all parameters."""
    mock_calls = []

    def mock_popen(args, **kwargs):
        mock_calls.append(args)
        return mock_process()

    monkeypatch.setattr("subprocess.Popen", mock_popen)

    fastmcp, _ = mock_fastmcp
    register_dbt_codegen_tools(fastmcp, mock_dbt_codegen_config)
    generate_source_tool = fastmcp.tools["generate_source"]

    # Call with all parameters
    generate_source_tool(
        schema_name="raw_data",
        database_name="analytics",
        table_names=["users", "orders"],
        generate_columns=True,
        include_descriptions=True,
    )

    # Verify the args were passed correctly
    assert mock_calls
    args_list = mock_calls[0]
    args_index = args_list.index("--args")
    args_json = json.loads(args_list[args_index + 1])

    assert args_json["schema_name"] == "raw_data"
    assert args_json["database_name"] == "analytics"
    assert args_json["table_names"] == ["users", "orders"]
    assert args_json["generate_columns"] is True
    assert args_json["include_descriptions"] is True


def test_generate_model_yaml(monkeypatch: MonkeyPatch, mock_process, mock_fastmcp):
    """Test generate_model_yaml function."""
    mock_calls = []

    def mock_popen(args, **kwargs):
        mock_calls.append(args)
        return mock_process()

    monkeypatch.setattr("subprocess.Popen", mock_popen)

    fastmcp, _ = mock_fastmcp
    register_dbt_codegen_tools(fastmcp, mock_dbt_codegen_config)
    generate_model_yaml_tool = fastmcp.tools["generate_model_yaml"]

    # Call the tool
    generate_model_yaml_tool(
        model_names=["stg_users", "stg_orders"],
        upstream_descriptions=True,
        include_data_types=False,
    )

    # Verify the command
    assert mock_calls
    args_list = mock_calls[0]
    assert "generate_model_yaml" in args_list

    args_index = args_list.index("--args")
    args_json = json.loads(args_list[args_index + 1])
    assert args_json["model_names"] == ["stg_users", "stg_orders"]
    assert args_json["upstream_descriptions"] is True
    assert args_json["include_data_types"] is False


def test_generate_staging_model(monkeypatch: MonkeyPatch, mock_process, mock_fastmcp):
    """Test generate_staging_model function."""
    mock_calls = []

    def mock_popen(args, **kwargs):
        mock_calls.append(args)
        return mock_process()

    monkeypatch.setattr("subprocess.Popen", mock_popen)

    fastmcp, _ = mock_fastmcp
    register_dbt_codegen_tools(fastmcp, mock_dbt_codegen_config)
    generate_staging_model_tool = fastmcp.tools["generate_staging_model"]

    # Call the tool
    generate_staging_model_tool(
        source_name="raw_data",
        table_name="users",
        leading_commas=True,
        case_sensitive_cols=False,
        materialized="view",
    )

    # Verify the command
    assert mock_calls
    args_list = mock_calls[0]
    # Note: Still calls the underlying dbt-codegen macro generate_base_model
    assert "generate_base_model" in args_list

    args_index = args_list.index("--args")
    args_json = json.loads(args_list[args_index + 1])
    assert args_json["source_name"] == "raw_data"
    assert args_json["table_name"] == "users"
    assert args_json["leading_commas"] is True
    assert args_json["case_sensitive_cols"] is False
    assert args_json["materialized"] == "view"


def test_codegen_error_handling_missing_package(monkeypatch: MonkeyPatch, mock_fastmcp):
    """Test error handling when dbt-codegen package is not installed."""
    mock_calls = []

    class MockProcessWithError:
        def __init__(self):
            self.returncode = 1

        def communicate(self, timeout=None):
            return "dbt found 1 resource of type macro", None

    def mock_popen(args, **kwargs):
        mock_calls.append(args)
        return MockProcessWithError()

    monkeypatch.setattr("subprocess.Popen", mock_popen)

    fastmcp, _ = mock_fastmcp
    register_dbt_codegen_tools(fastmcp, mock_dbt_codegen_config)
    generate_source_tool = fastmcp.tools["generate_source"]

    # Call should return error message about missing package
    result = generate_source_tool(
        schema_name="test_schema",
        database_name=None,
        table_names=None,
        generate_columns=False,
        include_descriptions=False,
    )

    assert "dbt-codegen package may not be installed" in result
    assert "Run 'dbt deps'" in result


def test_codegen_error_handling_general_error(monkeypatch: MonkeyPatch, mock_fastmcp):
    """Test general error handling."""
    mock_calls = []

    class MockProcessWithError:
        def __init__(self):
            self.returncode = 1

        def communicate(self, timeout=None):
            return "Some other error occurred", None

    def mock_popen(args, **kwargs):
        mock_calls.append(args)
        return MockProcessWithError()

    monkeypatch.setattr("subprocess.Popen", mock_popen)

    fastmcp, _ = mock_fastmcp
    register_dbt_codegen_tools(fastmcp, mock_dbt_codegen_config)
    generate_source_tool = fastmcp.tools["generate_source"]

    # Call should return the error
    result = generate_source_tool(
        schema_name="test_schema",
        database_name=None,
        table_names=None,
        generate_columns=False,
        include_descriptions=False,
    )

    assert "Error running dbt-codegen macro" in result
    assert "Some other error occurred" in result


def test_codegen_timeout_handling(monkeypatch: MonkeyPatch, mock_fastmcp):
    """Test timeout handling for long-running operations."""

    class MockProcessWithTimeout:
        def communicate(self, timeout=None):
            raise subprocess.TimeoutExpired(cmd=["dbt", "run-operation"], timeout=10)

    def mock_popen(*args, **kwargs):
        return MockProcessWithTimeout()

    monkeypatch.setattr("subprocess.Popen", mock_popen)

    fastmcp, _ = mock_fastmcp
    register_dbt_codegen_tools(fastmcp, mock_dbt_codegen_config)
    generate_source_tool = fastmcp.tools["generate_source"]

    # Test timeout case
    result = generate_source_tool(
        schema_name="large_schema",
        database_name=None,
        table_names=None,
        generate_columns=False,
        include_descriptions=False,
    )
    assert "Timeout: dbt-codegen operation took longer than" in result
    assert "10 seconds" in result


def test_quiet_flag_placement(monkeypatch: MonkeyPatch, mock_process, mock_fastmcp):
    """Test that --quiet flag is placed correctly in the command."""
    mock_calls = []

    def mock_popen(args, **kwargs):
        mock_calls.append(args)
        return mock_process()

    monkeypatch.setattr("subprocess.Popen", mock_popen)

    fastmcp, _ = mock_fastmcp
    register_dbt_codegen_tools(fastmcp, mock_dbt_codegen_config)
    generate_source_tool = fastmcp.tools["generate_source"]

    # Call the tool
    generate_source_tool(
        schema_name="test",
        database_name=None,
        table_names=None,
        generate_columns=False,
        include_descriptions=False,
    )

    # Verify --quiet is placed after run-operation
    assert mock_calls
    args_list = mock_calls[0]

    run_op_index = args_list.index("run-operation")
    quiet_index = args_list.index("--quiet")

    # --quiet should come right after run-operation
    assert quiet_index == run_op_index + 1


def test_absolute_path_handling(monkeypatch: MonkeyPatch, mock_process, mock_fastmcp):
    """Test that absolute paths are handled correctly."""
    mock_calls = []
    captured_kwargs = {}

    def mock_popen(args, **kwargs):
        mock_calls.append(args)
        captured_kwargs.update(kwargs)
        return mock_process()

    monkeypatch.setattr("subprocess.Popen", mock_popen)

    fastmcp, _ = mock_fastmcp
    register_dbt_codegen_tools(fastmcp, mock_dbt_codegen_config)
    generate_source_tool = fastmcp.tools["generate_source"]

    # Call the tool (mock config has /test/project which is absolute)
    generate_source_tool(
        schema_name="test",
        database_name=None,
        table_names=None,
        generate_columns=False,
        include_descriptions=False,
    )

    # Verify cwd was set for absolute path
    assert "cwd" in captured_kwargs
    assert captured_kwargs["cwd"] == "/test/project"


def test_all_tools_registered(mock_fastmcp):
    """Test that all expected tools are registered."""
    fastmcp, _ = mock_fastmcp
    register_dbt_codegen_tools(fastmcp, mock_dbt_codegen_config)
    tools = fastmcp.tools

    expected_tools = [
        "generate_source",
        "generate_model_yaml",
        "generate_staging_model",
    ]

    for tool_name in expected_tools:
        assert tool_name in tools, f"Tool {tool_name} not registered"

```
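
The assertions above imply a specific shape for the `subprocess.Popen` argument list: the dbt binary path from the mocked config, `--no-use-colors`, `run-operation` followed immediately by `--quiet`, the macro name, and `--args` carrying a JSON payload. A hedged sketch of that command shape (illustrative only; the real builder lives in `dbt_mcp.dbt_codegen.tools`, and any ordering beyond what the tests assert is an assumption):

```python
# Sketch of the command shape asserted by the tests above; not the actual
# implementation. The binary path and macro arguments are placeholders.
import json


def build_codegen_command(dbt_path: str, macro_name: str, macro_args: dict) -> list[str]:
    return [
        dbt_path,
        "--no-use-colors",
        "run-operation",
        "--quiet",  # tests assert --quiet comes right after run-operation
        macro_name,
        "--args",
        json.dumps(macro_args),
    ]


if __name__ == "__main__":
    print(build_codegen_command("/path/to/dbt", "generate_source", {"schema_name": "raw_data"}))
```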

--------------------------------------------------------------------------------
/tests/integration/discovery/test_discovery.py:
--------------------------------------------------------------------------------

```python
import os

import pytest

from dbt_mcp.config.config_providers import DefaultDiscoveryConfigProvider
from dbt_mcp.config.settings import CredentialsProvider, DbtMcpSettings
from dbt_mcp.discovery.client import (
    ExposuresFetcher,
    MetadataAPIClient,
    ModelFilter,
    ModelsFetcher,
    SourcesFetcher,
)


@pytest.fixture
def api_client() -> MetadataAPIClient:
    # Set up environment variables needed by DbtMcpSettings
    host = os.getenv("DBT_HOST")
    token = os.getenv("DBT_TOKEN")
    prod_env_id = os.getenv("DBT_PROD_ENV_ID")

    if not host or not token or not prod_env_id:
        raise ValueError(
            "DBT_HOST, DBT_TOKEN, and DBT_PROD_ENV_ID environment variables are required"
        )

    # Create settings and credentials provider
    # DbtMcpSettings will automatically pick up from environment variables
    settings = DbtMcpSettings()  # type: ignore
    credentials_provider = CredentialsProvider(settings)
    config_provider = DefaultDiscoveryConfigProvider(credentials_provider)

    return MetadataAPIClient(config_provider)


@pytest.fixture
def models_fetcher(api_client: MetadataAPIClient) -> ModelsFetcher:
    return ModelsFetcher(api_client)


@pytest.fixture
def exposures_fetcher(api_client: MetadataAPIClient) -> ExposuresFetcher:
    return ExposuresFetcher(api_client)


@pytest.fixture
def sources_fetcher(api_client: MetadataAPIClient) -> SourcesFetcher:
    return SourcesFetcher(api_client)


@pytest.mark.asyncio
async def test_fetch_models(models_fetcher: ModelsFetcher):
    results = await models_fetcher.fetch_models()

    # Basic validation of the response
    assert isinstance(results, list)
    assert len(results) > 0

    # Validate structure of returned models
    for model in results:
        assert "name" in model
        assert "compiledCode" in model
        assert isinstance(model["name"], str)

        # If catalog exists, validate its structure
        if model.get("catalog"):
            assert isinstance(model["catalog"], dict)
            if "columns" in model["catalog"]:
                for column in model["catalog"]["columns"]:
                    assert "name" in column
                    assert "type" in column


@pytest.mark.asyncio
async def test_fetch_models_with_filter(models_fetcher: ModelsFetcher):
    # model_filter: ModelFilter = {"access": "protected"}
    model_filter: ModelFilter = {"modelingLayer": "marts"}

    # Fetch filtered results
    filtered_results = await models_fetcher.fetch_models(model_filter=model_filter)

    # Validate filtered results
    assert len(filtered_results) > 0


@pytest.mark.asyncio
async def test_fetch_model_details(models_fetcher: ModelsFetcher):
    models = await models_fetcher.fetch_models()
    model_name = models[0]["name"]

    # Fetch filtered results
    filtered_results = await models_fetcher.fetch_model_details(model_name)

    # Validate filtered results
    assert len(filtered_results) > 0


@pytest.mark.asyncio
async def test_fetch_model_details_with_uniqueId(models_fetcher: ModelsFetcher):
    models = await models_fetcher.fetch_models()
    model = models[0]
    model_name = model["name"]
    unique_id = model["uniqueId"]

    # Fetch by name
    results_by_name = await models_fetcher.fetch_model_details(model_name)

    # Fetch by uniqueId
    results_by_uniqueId = await models_fetcher.fetch_model_details(
        model_name, unique_id
    )

    # Validate that both methods return the same result
    assert results_by_name["uniqueId"] == results_by_uniqueId["uniqueId"]
    assert results_by_name["name"] == results_by_uniqueId["name"]


@pytest.mark.asyncio
async def test_fetch_model_parents(models_fetcher: ModelsFetcher):
    models = await models_fetcher.fetch_models()
    model_name = models[0]["name"]

    # Fetch filtered results
    filtered_results = await models_fetcher.fetch_model_parents(model_name)

    # Validate filtered results
    assert len(filtered_results) > 0


@pytest.mark.asyncio
async def test_fetch_model_parents_with_uniqueId(models_fetcher: ModelsFetcher):
    models = await models_fetcher.fetch_models()
    model = models[0]
    model_name = model["name"]
    unique_id = model["uniqueId"]

    # Fetch by name
    results_by_name = await models_fetcher.fetch_model_parents(model_name)

    # Fetch by uniqueId
    results_by_uniqueId = await models_fetcher.fetch_model_parents(
        model_name, unique_id
    )

    # Validate that both methods return the same result
    assert len(results_by_name) == len(results_by_uniqueId)
    if len(results_by_name) > 0:
        # Compare the first parent's name if there are any parents
        assert results_by_name[0]["name"] == results_by_uniqueId[0]["name"]


@pytest.mark.asyncio
async def test_fetch_model_children(models_fetcher: ModelsFetcher):
    models = await models_fetcher.fetch_models()
    model_name = models[0]["name"]

    # Fetch filtered results
    filtered_results = await models_fetcher.fetch_model_children(model_name)

    # Validate filtered results
    assert isinstance(filtered_results, list)


@pytest.mark.asyncio
async def test_fetch_model_children_with_uniqueId(models_fetcher: ModelsFetcher):
    models = await models_fetcher.fetch_models()
    model = models[0]
    model_name = model["name"]
    unique_id = model["uniqueId"]

    # Fetch by name
    results_by_name = await models_fetcher.fetch_model_children(model_name)

    # Fetch by uniqueId
    results_by_uniqueId = await models_fetcher.fetch_model_children(
        model_name, unique_id
    )

    # Validate that both methods return the same result
    assert len(results_by_name) == len(results_by_uniqueId)
    if len(results_by_name) > 0:
        # Compare the first child's name if there are any children
        assert results_by_name[0]["name"] == results_by_uniqueId[0]["name"]


@pytest.mark.asyncio
async def test_fetch_exposures(exposures_fetcher: ExposuresFetcher):
    results = await exposures_fetcher.fetch_exposures()

    # Basic validation of the response
    assert isinstance(results, list)

    # If there are exposures, validate their structure
    if len(results) > 0:
        for exposure in results:
            assert "name" in exposure
            assert "uniqueId" in exposure
            assert isinstance(exposure["name"], str)
            assert isinstance(exposure["uniqueId"], str)


@pytest.mark.asyncio
async def test_fetch_exposures_pagination(exposures_fetcher: ExposuresFetcher):
    # Test that pagination works correctly by fetching all exposures
    # This test ensures the pagination logic handles multiple pages properly
    results = await exposures_fetcher.fetch_exposures()

    # Validate that we get results (assuming the test environment has some exposures)
    assert isinstance(results, list)

    # If we have more than the page size, ensure no duplicates
    if len(results) > 100:  # PAGE_SIZE is 100
        unique_ids = set()
        for exposure in results:
            unique_id = exposure["uniqueId"]
            assert unique_id not in unique_ids, f"Duplicate exposure found: {unique_id}"
            unique_ids.add(unique_id)


@pytest.mark.asyncio
async def test_fetch_exposure_details_by_unique_ids(
    exposures_fetcher: ExposuresFetcher,
):
    # First get all exposures to find one to test with
    exposures = await exposures_fetcher.fetch_exposures()

    # Skip test if no exposures are available
    if not exposures:
        pytest.skip("No exposures available in the test environment")

    # Pick the first exposure to test with
    test_exposure = exposures[0]
    unique_id = test_exposure["uniqueId"]

    # Fetch the same exposure by unique_ids
    result = await exposures_fetcher.fetch_exposure_details(unique_ids=[unique_id])

    # Validate that we got the correct exposure back
    assert isinstance(result, list)
    assert len(result) == 1
    exposure = result[0]
    assert exposure["uniqueId"] == unique_id
    assert exposure["name"] == test_exposure["name"]
    assert "exposureType" in exposure
    assert "maturity" in exposure

    # Validate structure
    if exposure.get("parents"):
        assert isinstance(exposure["parents"], list)
        for parent in exposure["parents"]:
            assert "uniqueId" in parent


@pytest.mark.asyncio
async def test_fetch_exposure_details_nonexistent(exposures_fetcher: ExposuresFetcher):
    # Test with a non-existent exposure
    result = await exposures_fetcher.fetch_exposure_details(
        unique_ids=["exposure.nonexistent.exposure"]
    )

    # Should return empty list when not found
    assert result == []


@pytest.mark.asyncio
async def test_fetch_sources(sources_fetcher: SourcesFetcher):
    """Test basic sources fetching functionality."""
    results = await sources_fetcher.fetch_sources()

    # Basic validation of the response
    assert isinstance(results, list)

    # If sources exist, validate their structure
    if len(results) > 0:
        for source in results:
            assert "name" in source
            assert "uniqueId" in source
            assert "sourceName" in source
            assert "resourceType" in source
            assert source["resourceType"] == "source"

            # Validate types
            assert isinstance(source["name"], str)
            assert isinstance(source["uniqueId"], str)
            assert isinstance(source["sourceName"], str)

            # Check for description (may be None)
            assert "description" in source

            # Validate freshness data if present
            if "freshness" in source and source["freshness"]:
                freshness = source["freshness"]
                assert isinstance(freshness, dict)
                # These fields may be present depending on configuration
                if "freshnessStatus" in freshness:
                    assert isinstance(freshness["freshnessStatus"], str)
                if "maxLoadedAt" in freshness:
                    assert freshness["maxLoadedAt"] is None or isinstance(
                        freshness["maxLoadedAt"], str
                    )
                if "maxLoadedAtTimeAgoInS" in freshness:
                    assert freshness["maxLoadedAtTimeAgoInS"] is None or isinstance(
                        freshness["maxLoadedAtTimeAgoInS"], int
                    )


@pytest.mark.asyncio
async def test_fetch_sources_with_filter(sources_fetcher: SourcesFetcher):
    """Test sources fetching with filter."""
    # First get all sources to find a valid source name
    all_sources = await sources_fetcher.fetch_sources()

    if len(all_sources) > 0:
        # Pick the first source name for filtering
        source_name = all_sources[0]["sourceName"]

        # Test filtering by source name
        filtered_results = await sources_fetcher.fetch_sources(
            source_names=[source_name]
        )

        # Validate filtered results
        assert isinstance(filtered_results, list)

        # All results should have the specified source name
        for source in filtered_results:
            assert source["sourceName"] == source_name


@pytest.mark.asyncio
async def test_get_all_sources_tool():
    """Test the get_all_sources tool function integration."""
    from dbt_mcp.config.config_providers import DefaultDiscoveryConfigProvider
    from dbt_mcp.config.settings import CredentialsProvider, DbtMcpSettings
    from dbt_mcp.discovery.tools import create_discovery_tool_definitions

    # Set up environment variables needed by DbtMcpSettings
    host = os.getenv("DBT_HOST")
    token = os.getenv("DBT_TOKEN")
    prod_env_id = os.getenv("DBT_PROD_ENV_ID")

    if not host or not token or not prod_env_id:
        pytest.skip(
            "DBT_HOST, DBT_TOKEN, and DBT_PROD_ENV_ID environment variables are required"
        )

    # Create settings and config provider
    settings = DbtMcpSettings()  # type: ignore
    credentials_provider = CredentialsProvider(settings)
    config_provider = DefaultDiscoveryConfigProvider(credentials_provider)

    # Create tool definitions
    tool_definitions = create_discovery_tool_definitions(config_provider)

    # Find the get_all_sources tool
    get_all_sources_tool = None
    for tool_def in tool_definitions:
        if tool_def.get_name() == "get_all_sources":
            get_all_sources_tool = tool_def
            break

    assert get_all_sources_tool is not None, (
        "get_all_sources tool not found in tool definitions"
    )

    # Execute the tool function
    result = await get_all_sources_tool.fn()

    # Validate the result
    assert isinstance(result, list)

    # If sources exist, validate structure
    if len(result) > 0:
        for source in result:
            assert "name" in source
            assert "uniqueId" in source
            assert "sourceName" in source
            assert "resourceType" in source
            assert source["resourceType"] == "source"

```
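
These integration tests require `DBT_HOST`, `DBT_TOKEN`, and `DBT_PROD_ENV_ID` to be set in the environment. The following standalone sketch mirrors the `api_client` fixture above for smoke-testing the discovery client outside pytest (not part of the repository):

```python
# Mirrors the api_client fixture above; assumes DBT_HOST, DBT_TOKEN, and
# DBT_PROD_ENV_ID are exported in the environment.
import asyncio

from dbt_mcp.config.config_providers import DefaultDiscoveryConfigProvider
from dbt_mcp.config.settings import CredentialsProvider, DbtMcpSettings
from dbt_mcp.discovery.client import MetadataAPIClient, ModelsFetcher


async def main() -> None:
    settings = DbtMcpSettings()  # type: ignore
    credentials_provider = CredentialsProvider(settings)
    config_provider = DefaultDiscoveryConfigProvider(credentials_provider)
    fetcher = ModelsFetcher(MetadataAPIClient(config_provider))
    models = await fetcher.fetch_models()
    print(f"Fetched {len(models)} models")


if __name__ == "__main__":
    asyncio.run(main())
```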

--------------------------------------------------------------------------------
/tests/unit/dbt_admin/test_tools.py:
--------------------------------------------------------------------------------

```python
from unittest.mock import Mock, patch, AsyncMock

import pytest

from dbt_mcp.dbt_admin.tools import (
    JobRunStatus,
    create_admin_api_tool_definitions,
    register_admin_api_tools,
)
from tests.mocks.config import mock_config


@pytest.fixture
def mock_fastmcp():
    class MockFastMCP:
        def __init__(self):
            self.tools = {}

        def tool(self, **kwargs):
            def decorator(func):
                self.tools[func.__name__] = func
                return func

            return decorator

    fastmcp = MockFastMCP()
    return fastmcp, fastmcp.tools


@pytest.fixture
def mock_admin_client():
    client = Mock()

    # Create AsyncMock methods with proper return values
    client.list_jobs = AsyncMock(
        return_value=[
            {
                "id": 1,
                "name": "test_job",
                "description": "Test job description",
                "dbt_version": "1.7.0",
                "job_type": "deploy",
                "triggers": {},
                "most_recent_run_id": 100,
                "most_recent_run_status": "success",
                "schedule": "0 9 * * *",
            }
        ]
    )

    client.get_job_details = AsyncMock(return_value={"id": 1, "name": "test_job"})
    client.trigger_job_run = AsyncMock(return_value={"id": 200, "status": "queued"})
    client.list_jobs_runs = AsyncMock(
        return_value=[
            {
                "id": 100,
                "status": 10,
                "status_humanized": "Success",
                "job_definition_id": 1,
                "started_at": "2024-01-01T00:00:00Z",
                "finished_at": "2024-01-01T00:05:00Z",
            }
        ]
    )
    client.get_job_run_details = AsyncMock(
        return_value={
            "id": 100,
            "status": 10,
            "status_humanized": "Success",
            "is_cancelled": False,
            "run_steps": [
                {
                    "index": 1,
                    "name": "Invoke dbt with `dbt build`",
                    "status": 20,
                    "status_humanized": "Error",
                    "logs_url": "https://example.com/logs",
                }
            ],
        }
    )
    client.cancel_job_run = AsyncMock(
        return_value={
            "id": 100,
            "status": 20,
            "status_humanized": "Cancelled",
        }
    )
    client.retry_job_run = AsyncMock(
        return_value={
            "id": 101,
            "status": 1,
            "status_humanized": "Queued",
        }
    )
    client.list_job_run_artifacts = AsyncMock(
        return_value=["manifest.json", "catalog.json"]
    )
    client.get_job_run_artifact = AsyncMock(return_value={"nodes": {}})

    return client


@patch("dbt_mcp.dbt_admin.tools.register_tools")
@patch("dbt_mcp.dbt_admin.tools.get_prompt")
async def test_register_admin_api_tools_all_tools(
    mock_get_prompt, mock_register_tools, mock_fastmcp
):
    mock_get_prompt.return_value = "Test prompt"
    fastmcp, tools = mock_fastmcp

    register_admin_api_tools(fastmcp, mock_config.admin_api_config_provider, [])

    # Should call register_tools with 10 tool definitions
    mock_register_tools.assert_called_once()
    args, kwargs = mock_register_tools.call_args
    tool_definitions = args[1]  # Second argument is the tool definitions list
    assert len(tool_definitions) == 10


@patch("dbt_mcp.dbt_admin.tools.register_tools")
@patch("dbt_mcp.dbt_admin.tools.get_prompt")
async def test_register_admin_api_tools_with_disabled_tools(
    mock_get_prompt, mock_register_tools, mock_fastmcp
):
    mock_get_prompt.return_value = "Test prompt"
    fastmcp, tools = mock_fastmcp

    disable_tools = ["list_jobs", "get_job", "trigger_job_run"]
    register_admin_api_tools(
        fastmcp, mock_config.admin_api_config_provider, disable_tools
    )

    # Should still call register_tools with all 10 tool definitions
    # The exclude_tools parameter is passed to register_tools to handle filtering
    mock_register_tools.assert_called_once()
    args, kwargs = mock_register_tools.call_args
    tool_definitions = args[1]  # Second argument is the tool definitions list
    exclude_tools_arg = args[2]  # Third argument is exclude_tools
    assert len(tool_definitions) == 10
    assert exclude_tools_arg == disable_tools


@patch("dbt_mcp.dbt_admin.tools.get_prompt")
async def test_list_jobs_tool(mock_get_prompt, mock_admin_client):
    mock_get_prompt.return_value = "List jobs prompt"

    # Make mock method async
    async def mock_list_jobs(account_id, **kwargs):
        return [
            {
                "id": 1,
                "name": "test_job",
                "description": "Test job description",
                "dbt_version": "1.7.0",
                "job_type": "deploy",
                "triggers": {},
                "most_recent_run_id": 100,
                "most_recent_run_status": "success",
                "schedule": "0 9 * * *",
            }
        ]

    mock_admin_client.list_jobs = mock_list_jobs

    tool_definitions = create_admin_api_tool_definitions(
        mock_admin_client, mock_config.admin_api_config_provider
    )
    list_jobs_tool = tool_definitions[0].fn  # First tool is list_jobs

    result = await list_jobs_tool(limit=10)

    assert isinstance(result, list)


@patch("dbt_mcp.dbt_admin.tools.get_prompt")
async def test_get_job_details_tool(mock_get_prompt, mock_admin_client):
    mock_get_prompt.return_value = "Get job prompt"

    tool_definitions = create_admin_api_tool_definitions(
        mock_admin_client, mock_config.admin_api_config_provider
    )
    get_job_details_tool = tool_definitions[1].fn  # Second tool is get_job_details

    result = await get_job_details_tool(job_id=1)

    assert isinstance(result, dict)
    mock_admin_client.get_job_details.assert_called_once_with(12345, 1)


@patch("dbt_mcp.dbt_admin.tools.get_prompt")
async def test_trigger_job_run_tool(mock_get_prompt, mock_admin_client):
    mock_get_prompt.return_value = "Trigger job run prompt"

    tool_definitions = create_admin_api_tool_definitions(
        mock_admin_client, mock_config.admin_api_config_provider
    )
    trigger_job_run_tool = tool_definitions[2].fn  # Third tool is trigger_job_run

    result = await trigger_job_run_tool(
        job_id=1, cause="Manual trigger", git_branch="main"
    )

    assert isinstance(result, dict)
    mock_admin_client.trigger_job_run.assert_called_once_with(
        12345, 1, "Manual trigger", git_branch="main"
    )


@patch("dbt_mcp.dbt_admin.tools.get_prompt")
async def test_list_jobs_runs_tool(mock_get_prompt, mock_admin_client):
    mock_get_prompt.return_value = "List runs prompt"

    tool_definitions = create_admin_api_tool_definitions(
        mock_admin_client, mock_config.admin_api_config_provider
    )
    list_jobs_runs_tool = tool_definitions[3].fn  # Fourth tool is list_jobs_runs

    result = await list_jobs_runs_tool(job_id=1, status=JobRunStatus.SUCCESS, limit=5)

    assert isinstance(result, list)
    mock_admin_client.list_jobs_runs.assert_called_once_with(
        12345, job_definition_id=1, status=10, limit=5
    )


@patch("dbt_mcp.dbt_admin.tools.get_prompt")
async def test_get_job_run_details_tool(mock_get_prompt, mock_admin_client):
    mock_get_prompt.return_value = "Get run prompt"

    tool_definitions = create_admin_api_tool_definitions(
        mock_admin_client, mock_config.admin_api_config_provider
    )
    get_job_run_details_tool = tool_definitions[
        4
    ].fn  # Fifth tool is get_job_run_details

    result = await get_job_run_details_tool(run_id=100)

    assert isinstance(result, dict)
    mock_admin_client.get_job_run_details.assert_called_once_with(12345, 100)


@patch("dbt_mcp.dbt_admin.tools.get_prompt")
async def test_cancel_job_run_tool(mock_get_prompt, mock_admin_client):
    mock_get_prompt.return_value = "Cancel run prompt"

    tool_definitions = create_admin_api_tool_definitions(
        mock_admin_client, mock_config.admin_api_config_provider
    )
    cancel_job_run_tool = tool_definitions[5].fn  # Sixth tool is cancel_job_run

    result = await cancel_job_run_tool(run_id=100)

    assert isinstance(result, dict)
    mock_admin_client.cancel_job_run.assert_called_once_with(12345, 100)


@patch("dbt_mcp.dbt_admin.tools.get_prompt")
async def test_retry_job_run_tool(mock_get_prompt, mock_admin_client):
    mock_get_prompt.return_value = "Retry run prompt"

    tool_definitions = create_admin_api_tool_definitions(
        mock_admin_client, mock_config.admin_api_config_provider
    )
    retry_job_run_tool = tool_definitions[6].fn  # Seventh tool is retry_job_run

    result = await retry_job_run_tool(run_id=100)

    assert isinstance(result, dict)
    mock_admin_client.retry_job_run.assert_called_once_with(12345, 100)


@patch("dbt_mcp.dbt_admin.tools.get_prompt")
async def test_list_job_run_artifacts_tool(mock_get_prompt, mock_admin_client):
    mock_get_prompt.return_value = "List run artifacts prompt"

    tool_definitions = create_admin_api_tool_definitions(
        mock_admin_client, mock_config.admin_api_config_provider
    )
    list_job_run_artifacts_tool = tool_definitions[
        7
    ].fn  # Eighth tool is list_job_run_artifacts

    result = await list_job_run_artifacts_tool(run_id=100)

    assert isinstance(result, list)
    mock_admin_client.list_job_run_artifacts.assert_called_once_with(12345, 100)


@patch("dbt_mcp.dbt_admin.tools.get_prompt")
async def test_get_job_run_artifact_tool(mock_get_prompt, mock_admin_client):
    mock_get_prompt.return_value = "Get run artifact prompt"

    tool_definitions = create_admin_api_tool_definitions(
        mock_admin_client, mock_config.admin_api_config_provider
    )
    get_job_run_artifact_tool = tool_definitions[
        8
    ].fn  # Ninth tool is get_job_run_artifact

    result = await get_job_run_artifact_tool(
        run_id=100, artifact_path="manifest.json", step=1
    )

    assert result is not None
    mock_admin_client.get_job_run_artifact.assert_called_once_with(
        12345, 100, "manifest.json", 1
    )


@patch("dbt_mcp.dbt_admin.tools.get_prompt")
async def test_tools_handle_exceptions(mock_get_prompt):
    mock_get_prompt.return_value = "Test prompt"
    mock_admin_client = Mock()
    mock_admin_client.list_jobs.side_effect = Exception("API Error")

    tool_definitions = create_admin_api_tool_definitions(
        mock_admin_client, mock_config.admin_api_config_provider
    )
    list_jobs_tool = tool_definitions[0].fn  # First tool is list_jobs

    with pytest.raises(Exception) as exc_info:
        await list_jobs_tool()
    assert "API Error" in str(exc_info.value)


@patch("dbt_mcp.dbt_admin.tools.get_prompt")
async def test_tools_with_no_optional_parameters(mock_get_prompt, mock_admin_client):
    mock_get_prompt.return_value = "Test prompt"

    tool_definitions = create_admin_api_tool_definitions(
        mock_admin_client, mock_config.admin_api_config_provider
    )

    # Test list_jobs with no parameters
    list_jobs_tool = tool_definitions[0].fn
    result = await list_jobs_tool()
    assert isinstance(result, list)
    mock_admin_client.list_jobs.assert_called_with(12345)

    # Test list_jobs_runs with no parameters
    list_jobs_runs_tool = tool_definitions[3].fn
    result = await list_jobs_runs_tool()
    assert isinstance(result, list)
    mock_admin_client.list_jobs_runs.assert_called_with(12345)

    # Test get_job_run_details
    get_job_run_details_tool = tool_definitions[4].fn
    result = await get_job_run_details_tool(run_id=100)
    assert isinstance(result, dict)
    mock_admin_client.get_job_run_details.assert_called_with(12345, 100)


@patch("dbt_mcp.dbt_admin.tools.get_prompt")
async def test_trigger_job_run_with_all_optional_params(
    mock_get_prompt, mock_admin_client
):
    mock_get_prompt.return_value = "Trigger job run prompt"

    tool_definitions = create_admin_api_tool_definitions(
        mock_admin_client, mock_config.admin_api_config_provider
    )
    trigger_job_run_tool = tool_definitions[2].fn  # Third tool is trigger_job_run

    result = await trigger_job_run_tool(
        job_id=1,
        cause="Manual trigger",
        git_branch="feature-branch",
        git_sha="abc123",
        schema_override="custom_schema",
    )

    assert isinstance(result, dict)
    mock_admin_client.trigger_job_run.assert_called_once_with(
        12345,
        1,
        "Manual trigger",
        git_branch="feature-branch",
        git_sha="abc123",
        schema_override="custom_schema",
    )


@patch("dbt_mcp.dbt_admin.tools.get_prompt")
@patch("dbt_mcp.dbt_admin.tools.ErrorFetcher")
async def test_get_job_run_error_tool(
    mock_error_fetcher_class, mock_get_prompt, mock_admin_client
):
    mock_get_prompt.return_value = "Get run error prompt"

    # Mock the ErrorFetcher instance and its analyze_run_errors method.
    # ErrorFetcher is not part of the admin client mock, so we create a separate mock for it.
    mock_error_fetcher_instance = Mock()
    mock_error_fetcher_instance.analyze_run_errors = AsyncMock(
        return_value={
            "failed_steps": [
                {
                    "step_name": "Invoke dbt with `dbt build`",
                    "target": "prod",
                    "finished_at": "2024-01-01T10:00:00Z",
                    "errors": [
                        {
                            "unique_id": "model.analytics.user_sessions",
                            "message": "Database Error in model user_sessions...",
                            "relation_name": "prod.analytics.user_sessions",
                            "compiled_code": "SELECT * FROM raw.sessions",
                            "truncated_logs": None,
                        }
                    ],
                }
            ]
        }
    )
    mock_error_fetcher_class.return_value = mock_error_fetcher_instance

    tool_definitions = create_admin_api_tool_definitions(
        mock_admin_client, mock_config.admin_api_config_provider
    )
    get_job_run_error_tool = tool_definitions[9].fn  # Tenth tool is get_job_run_error

    result = await get_job_run_error_tool(run_id=100)

    assert isinstance(result, dict)
    assert "failed_steps" in result
    assert len(result["failed_steps"]) == 1

    step = result["failed_steps"][0]
    assert step["step_name"] == "Invoke dbt with `dbt build`"
    assert step["target"] == "prod"
    assert len(step["errors"]) == 1
    assert step["errors"][0]["message"] == "Database Error in model user_sessions..."

    mock_error_fetcher_class.assert_called_once()
    mock_error_fetcher_instance.analyze_run_errors.assert_called_once()

```
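
The admin tests index `tool_definitions` positionally. The ordering they rely on, collected from the inline comments above (not verified against `create_admin_api_tool_definitions` itself), is:

```python
# Index-to-tool mapping assumed by the positional lookups in the tests above.
ADMIN_TOOL_ORDER = [
    "list_jobs",               # 0
    "get_job_details",         # 1
    "trigger_job_run",         # 2
    "list_jobs_runs",          # 3
    "get_job_run_details",     # 4
    "cancel_job_run",          # 5
    "retry_job_run",           # 6
    "list_job_run_artifacts",  # 7
    "get_job_run_artifact",    # 8
    "get_job_run_error",       # 9
]

# The registration test expects exactly 10 tool definitions.
assert len(ADMIN_TOOL_ORDER) == 10
```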

--------------------------------------------------------------------------------
/src/dbt_mcp/semantic_layer/client.py:
--------------------------------------------------------------------------------

```python
from collections.abc import Callable
from contextlib import AbstractContextManager
from typing import Any, Protocol

import pyarrow as pa
from dbtsl.api.shared.query_params import (
    GroupByParam,
    OrderByGroupBy,
    OrderByMetric,
    OrderBySpec,
)
from dbtsl.client.sync import SyncSemanticLayerClient
from dbtsl.error import QueryFailedError

from dbt_mcp.config.config_providers import ConfigProvider, SemanticLayerConfig
from dbt_mcp.errors import InvalidParameterError
from dbt_mcp.semantic_layer.gql.gql import GRAPHQL_QUERIES
from dbt_mcp.semantic_layer.gql.gql_request import submit_request
from dbt_mcp.semantic_layer.levenshtein import get_misspellings
from dbt_mcp.semantic_layer.types import (
    DimensionToolResponse,
    EntityToolResponse,
    GetMetricsCompiledSqlError,
    GetMetricsCompiledSqlResult,
    GetMetricsCompiledSqlSuccess,
    MetricToolResponse,
    OrderByParam,
    QueryMetricsError,
    QueryMetricsResult,
    QueryMetricsSuccess,
    SavedQueryToolResponse,
)


def DEFAULT_RESULT_FORMATTER(table: pa.Table) -> str:
    return table.to_pandas().to_json(orient="records", indent=2)


class SemanticLayerClientProtocol(Protocol):
    def session(self) -> AbstractContextManager[Any]: ...

    def query(
        self,
        metrics: list[str],
        group_by: list[GroupByParam | str] | None = None,
        limit: int | None = None,
        order_by: list[str | OrderByGroupBy | OrderByMetric] | None = None,
        where: list[str] | None = None,
        read_cache: bool = True,
    ) -> pa.Table: ...

    def compile_sql(
        self,
        metrics: list[str],
        group_by: list[str] | None = None,
        limit: int | None = None,
        order_by: list[str | OrderByGroupBy | OrderByMetric] | None = None,
        where: list[str] | None = None,
        read_cache: bool = True,
    ) -> str: ...


class SemanticLayerClientProvider(Protocol):
    async def get_client(self) -> SemanticLayerClientProtocol: ...


class DefaultSemanticLayerClientProvider:
    def __init__(self, config_provider: ConfigProvider[SemanticLayerConfig]):
        self.config_provider = config_provider

    async def get_client(self) -> SemanticLayerClientProtocol:
        config = await self.config_provider.get_config()
        return SyncSemanticLayerClient(
            environment_id=config.prod_environment_id,
            auth_token=config.token,
            host=config.host,
        )


class SemanticLayerFetcher:
    def __init__(
        self,
        config_provider: ConfigProvider[SemanticLayerConfig],
        client_provider: SemanticLayerClientProvider,
    ):
        self.client_provider = client_provider
        self.config_provider = config_provider
        self.entities_cache: dict[str, list[EntityToolResponse]] = {}
        self.dimensions_cache: dict[str, list[DimensionToolResponse]] = {}

    async def list_metrics(self, search: str | None = None) -> list[MetricToolResponse]:
        metrics_result = submit_request(
            await self.config_provider.get_config(),
            {"query": GRAPHQL_QUERIES["metrics"], "variables": {"search": search}},
        )
        return [
            MetricToolResponse(
                name=m.get("name"),
                type=m.get("type"),
                label=m.get("label"),
                description=m.get("description"),
                metadata=(m.get("config") or {}).get("meta", ""),
            )
            for m in metrics_result["data"]["metricsPaginated"]["items"]
        ]

    async def list_saved_queries(
        self, search: str | None = None
    ) -> list[SavedQueryToolResponse]:
        """Fetch all saved queries from the Semantic Layer API."""
        saved_queries_result = submit_request(
            await self.config_provider.get_config(),
            {
                "query": GRAPHQL_QUERIES["saved_queries"],
                "variables": {"search": search},
            },
        )
        return [
            SavedQueryToolResponse(
                name=sq.get("name"),
                label=sq.get("label"),
                description=sq.get("description"),
                metrics=[
                    m.get("name") for m in sq.get("queryParams", {}).get("metrics", [])
                ]
                if sq.get("queryParams", {}).get("metrics")
                else None,
                group_by=[
                    g.get("name") for g in sq.get("queryParams", {}).get("groupBy", [])
                ]
                if sq.get("queryParams", {}).get("groupBy")
                else None,
                where=sq.get("queryParams", {}).get("where", {}).get("whereSqlTemplate")
                if sq.get("queryParams", {}).get("where")
                else None,
            )
            for sq in saved_queries_result["data"]["savedQueriesPaginated"]["items"]
        ]

    async def get_dimensions(
        self, metrics: list[str], search: str | None = None
    ) -> list[DimensionToolResponse]:
        metrics_key = ",".join(sorted(metrics))
        if metrics_key not in self.dimensions_cache:
            dimensions_result = submit_request(
                await self.config_provider.get_config(),
                {
                    "query": GRAPHQL_QUERIES["dimensions"],
                    "variables": {
                        "metrics": [{"name": m} for m in metrics],
                        "search": search,
                    },
                },
            )
            dimensions = []
            for d in dimensions_result["data"]["dimensionsPaginated"]["items"]:
                dimensions.append(
                    DimensionToolResponse(
                        name=d.get("name"),
                        type=d.get("type"),
                        description=d.get("description"),
                        label=d.get("label"),
                        granularities=d.get("queryableGranularities")
                        + d.get("queryableTimeGranularities"),
                    )
                )
            self.dimensions_cache[metrics_key] = dimensions
        return self.dimensions_cache[metrics_key]

    async def get_entities(
        self, metrics: list[str], search: str | None = None
    ) -> list[EntityToolResponse]:
        metrics_key = ",".join(sorted(metrics))
        if metrics_key not in self.entities_cache:
            entities_result = submit_request(
                await self.config_provider.get_config(),
                {
                    "query": GRAPHQL_QUERIES["entities"],
                    "variables": {
                        "metrics": [{"name": m} for m in metrics],
                        "search": search,
                    },
                },
            )
            entities = [
                EntityToolResponse(
                    name=e.get("name"),
                    type=e.get("type"),
                    description=e.get("description"),
                )
                for e in entities_result["data"]["entitiesPaginated"]["items"]
            ]
            self.entities_cache[metrics_key] = entities
        return self.entities_cache[metrics_key]

    def _format_semantic_layer_error(self, error: Exception) -> str:
        """Format semantic layer errors by cleaning up common error message patterns."""
        error_str = str(error)
        return (
            error_str.replace("QueryFailedError(", "")
            .rstrip(")")
            .lstrip("[")
            .rstrip("]")
            .lstrip('"')
            .rstrip('"')
            .replace("INVALID_ARGUMENT: [FlightSQL]", "")
            .replace("(InvalidArgument; Prepare)", "")
            .replace("(InvalidArgument; ExecuteQuery)", "")
            .replace("Failed to prepare statement:", "")
            .replace("com.dbt.semanticlayer.exceptions.DataPlatformException:", "")
            .strip()
        )

    def _format_get_metrics_compiled_sql_error(
        self, compile_error: Exception
    ) -> GetMetricsCompiledSqlError:
        """Format get compiled SQL errors using the shared error formatter."""
        return GetMetricsCompiledSqlError(
            error=self._format_semantic_layer_error(compile_error)
        )

    async def validate_query_metrics_params(
        self, metrics: list[str], group_by: list[GroupByParam] | None
    ) -> str | None:
        errors = []
        available_metrics_names = [m.name for m in await self.list_metrics()]
        metric_misspellings = get_misspellings(
            targets=metrics,
            words=available_metrics_names,
            top_k=5,
        )
        for metric_misspelling in metric_misspellings:
            recommendations = (
                " Did you mean: " + ", ".join(metric_misspelling.similar_words) + "?"
            )
            errors.append(
                f"Metric {metric_misspelling.word} not found."
                + (recommendations if metric_misspelling.similar_words else "")
            )

        if errors:
            return f"Errors: {', '.join(errors)}"

        available_group_by = [d.name for d in await self.get_dimensions(metrics)] + [
            e.name for e in await self.get_entities(metrics)
        ]
        group_by_misspellings = get_misspellings(
            targets=[g.name for g in group_by or []],
            words=available_group_by,
            top_k=5,
        )
        for group_by_misspelling in group_by_misspellings:
            recommendations = (
                " Did you mean: " + ", ".join(group_by_misspelling.similar_words) + "?"
            )
            errors.append(
                f"Group by {group_by_misspelling.word} not found."
                + (recommendations if group_by_misspelling.similar_words else "")
            )

        if errors:
            return f"Errors: {', '.join(errors)}"
        return None

    # TODO: move this to the SDK
    def _format_query_failed_error(self, query_error: Exception) -> QueryMetricsError:
        if isinstance(query_error, QueryFailedError):
            return QueryMetricsError(
                error=self._format_semantic_layer_error(query_error)
            )
        else:
            return QueryMetricsError(error=str(query_error))

    def _get_order_bys(
        self,
        order_by: list[OrderByParam] | None,
        metrics: list[str] = [],
        group_by: list[GroupByParam] | None = None,
    ) -> list[OrderBySpec]:
        result: list[OrderBySpec] = []
        if order_by is None:
            return result
        queried_group_by = {g.name: g for g in group_by} if group_by else {}
        queried_metrics = set(metrics)
        for o in order_by:
            if o.name in queried_metrics:
                result.append(OrderByMetric(name=o.name, descending=o.descending))
            elif o.name in queried_group_by:
                selected_group_by = queried_group_by[o.name]
                result.append(
                    OrderByGroupBy(
                        name=selected_group_by.name,
                        descending=o.descending,
                        grain=selected_group_by.grain,
                    )
                )
            else:
                raise InvalidParameterError(
                    f"Order by `{o.name}` not found in metrics or group by"
                )
        return result

    async def get_metrics_compiled_sql(
        self,
        metrics: list[str],
        group_by: list[GroupByParam] | None = None,
        order_by: list[OrderByParam] | None = None,
        where: str | None = None,
        limit: int | None = None,
    ) -> GetMetricsCompiledSqlResult:
        """
        Get compiled SQL for the given metrics and group by parameters using the SDK.

        Args:
            metrics: List of metric names to get compiled SQL for
            group_by: List of group by parameters (dimensions/entities with optional grain)
            order_by: List of order by parameters
            where: Optional SQL WHERE clause to filter results
            limit: Optional limit for number of results

        Returns:
            GetMetricsCompiledSqlResult with either the compiled SQL or an error
        """
        validation_error = await self.validate_query_metrics_params(
            metrics=metrics,
            group_by=group_by,
        )
        if validation_error:
            return GetMetricsCompiledSqlError(error=validation_error)

        try:
            sl_client = await self.client_provider.get_client()
            with sl_client.session():
                parsed_order_by: list[OrderBySpec] = self._get_order_bys(
                    order_by=order_by, metrics=metrics, group_by=group_by
                )
                compiled_sql = sl_client.compile_sql(
                    metrics=metrics,
                    group_by=group_by,  # type: ignore
                    order_by=parsed_order_by,  # type: ignore
                    where=[where] if where else None,
                    limit=limit,
                    read_cache=True,
                )

                return GetMetricsCompiledSqlSuccess(sql=compiled_sql)

        except Exception as e:
            return self._format_get_metrics_compiled_sql_error(e)

    async def query_metrics(
        self,
        metrics: list[str],
        group_by: list[GroupByParam] | None = None,
        order_by: list[OrderByParam] | None = None,
        where: str | None = None,
        limit: int | None = None,
        result_formatter: Callable[[pa.Table], str] = DEFAULT_RESULT_FORMATTER,
    ) -> QueryMetricsResult:
        validation_error = await self.validate_query_metrics_params(
            metrics=metrics,
            group_by=group_by,
        )
        if validation_error:
            return QueryMetricsError(error=validation_error)

        try:
            query_error = None
            sl_client = await self.client_provider.get_client()
            with sl_client.session():
                # Catching any exception within the session
                # to ensure it is closed properly
                try:
                    parsed_order_by: list[OrderBySpec] = self._get_order_bys(
                        order_by=order_by, metrics=metrics, group_by=group_by
                    )
                    query_result = sl_client.query(
                        metrics=metrics,
                        group_by=group_by,  # type: ignore
                        order_by=parsed_order_by,  # type: ignore
                        where=[where] if where else None,
                        limit=limit,
                    )
                except Exception as e:
                    query_error = e
            if query_error:
                return self._format_query_failed_error(query_error)
            json_result = result_formatter(query_result)
            return QueryMetricsSuccess(result=json_result or "")
        except Exception as e:
            return self._format_query_failed_error(e)

```
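
A hedged usage sketch of the fetcher above (not part of the repository): it assumes an already-constructed `ConfigProvider[SemanticLayerConfig]` and uses whichever metric is listed first as a placeholder query.

```python
# Sketch only: `config_provider` must be a real ConfigProvider[SemanticLayerConfig];
# run with asyncio.run(run_example(config_provider)).
from dbt_mcp.semantic_layer.client import (
    DefaultSemanticLayerClientProvider,
    SemanticLayerFetcher,
)


async def run_example(config_provider) -> None:
    fetcher = SemanticLayerFetcher(
        config_provider=config_provider,
        client_provider=DefaultSemanticLayerClientProvider(config_provider),
    )
    metrics = await fetcher.list_metrics()
    print([m.name for m in metrics])
    if metrics:
        # Query the first listed metric; returns QueryMetricsSuccess or QueryMetricsError.
        result = await fetcher.query_metrics(metrics=[metrics[0].name], limit=10)
        print(result)
```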

--------------------------------------------------------------------------------
/tests/unit/dbt_admin/test_client.py:
--------------------------------------------------------------------------------

```python
from unittest.mock import Mock, patch

import pytest
import requests

from dbt_mcp.config.config_providers import AdminApiConfig
from dbt_mcp.dbt_admin.client import (
    AdminAPIError,
    DbtAdminAPIClient,
)


class MockHeadersProvider:
    """Mock headers provider for testing."""

    def __init__(self, headers: dict[str, str]):
        self._headers = headers

    def get_headers(self) -> dict[str, str]:
        return self._headers


class MockAdminApiConfigProvider:
    """Mock config provider for testing."""

    def __init__(self, config: AdminApiConfig):
        self.config = config

    async def get_config(self) -> AdminApiConfig:
        return self.config


@pytest.fixture
def admin_config():
    return AdminApiConfig(
        account_id=12345,
        headers_provider=MockHeadersProvider({"Authorization": "Bearer test_token"}),
        url="https://cloud.getdbt.com",
    )


@pytest.fixture
def admin_config_with_prefix():
    return AdminApiConfig(
        account_id=12345,
        headers_provider=MockHeadersProvider({"Authorization": "Bearer test_token"}),
        url="https://eu1.cloud.getdbt.com",
    )


@pytest.fixture
def client(admin_config):
    config_provider = MockAdminApiConfigProvider(admin_config)
    return DbtAdminAPIClient(config_provider)


@pytest.fixture
def client_with_prefix(admin_config_with_prefix):
    config_provider = MockAdminApiConfigProvider(admin_config_with_prefix)
    return DbtAdminAPIClient(config_provider)


async def test_client_initialization(client):
    config = await client.get_config()
    assert config.account_id == 12345
    assert config.headers_provider.get_headers() == {
        "Authorization": "Bearer test_token"
    }
    assert config.url == "https://cloud.getdbt.com"
    headers = await client.get_headers()
    assert headers["Authorization"] == "Bearer test_token"
    assert headers["Content-Type"] == "application/json"
    assert headers["Accept"] == "application/json"


@patch("requests.request")
async def test_make_request_success(mock_request, client):
    mock_response = Mock()
    mock_response.json.return_value = {"data": "test"}
    mock_response.raise_for_status.return_value = None
    mock_request.return_value = mock_response

    result = await client._make_request("GET", "/test/endpoint")

    assert result == {"data": "test"}
    headers = await client.get_headers()
    mock_request.assert_called_once_with(
        "GET", "https://cloud.getdbt.com/test/endpoint", headers=headers
    )


@patch("requests.request")
async def test_make_request_failure(mock_request, client):
    mock_response = Mock()
    mock_response.raise_for_status.side_effect = requests.exceptions.HTTPError(
        "404 Not Found"
    )
    mock_request.return_value = mock_response

    with pytest.raises(AdminAPIError):
        await client._make_request("GET", "/test/endpoint")


@patch("requests.request")
async def test_list_jobs(mock_request, client):
    mock_response = Mock()
    mock_response.json.return_value = {
        "data": [
            {
                "id": 1,
                "name": "test_job",
                "description": "Test description",
                "dbt_version": "1.7.0",
                "job_type": "deploy",
                "triggers": {"github_webhook": True},
                "most_recent_run": {
                    "id": 100,
                    "status_humanized": "Success",
                    "started_at": "2024-01-01T00:00:00Z",
                    "finished_at": "2024-01-01T00:05:00Z",
                },
                "most_recent_completed_run": {
                    "id": 99,
                    "status_humanized": "Success",
                    "started_at": "2024-01-01T00:00:00Z",
                    "finished_at": "2024-01-01T00:04:00Z",
                },
                "schedule": {"cron": "0 9 * * *"},
                "next_run": "2024-01-02T09:00:00Z",
            }
        ]
    }
    mock_response.raise_for_status.return_value = None
    mock_request.return_value = mock_response

    result = await client.list_jobs(12345, project_id=1, limit=10)

    assert len(result) == 1
    assert result[0]["id"] == 1
    assert result[0]["name"] == "test_job"
    assert result[0]["most_recent_run_id"] == 100
    assert result[0]["schedule"] == "0 9 * * *"

    headers = await client.get_headers()
    mock_request.assert_called_once_with(
        "GET",
        "https://cloud.getdbt.com/api/v2/accounts/12345/jobs/?include_related=['most_recent_run','most_recent_completed_run']",
        headers=headers,
        params={"project_id": 1, "limit": 10},
    )


@patch("requests.request")
async def test_list_jobs_with_null_values(mock_request, client):
    mock_response = Mock()
    mock_response.json.return_value = {
        "data": [
            {
                "id": 1,
                "name": "test_job",
                "description": None,
                "dbt_version": "1.7.0",
                "job_type": "deploy",
                "triggers": {},
                "most_recent_run": None,
                "most_recent_completed_run": None,
                "schedule": None,
                "next_run": None,
            }
        ]
    }
    mock_response.raise_for_status.return_value = None
    mock_request.return_value = mock_response

    result = await client.list_jobs(12345)

    assert len(result) == 1
    assert result[0]["most_recent_run_id"] is None
    assert result[0]["schedule"] is None


@patch("requests.request")
async def test_get_job_details(mock_request, client):
    mock_response = Mock()
    mock_response.json.return_value = {"data": {"id": 1, "name": "test_job"}}
    mock_response.raise_for_status.return_value = None
    mock_request.return_value = mock_response

    result = await client.get_job_details(12345, 1)

    assert result == {"id": 1, "name": "test_job"}
    headers = await client.get_headers()
    mock_request.assert_called_once_with(
        "GET",
        "https://cloud.getdbt.com/api/v2/accounts/12345/jobs/1/?include_related=['most_recent_run','most_recent_completed_run']",
        headers=headers,
    )


@patch("requests.request")
async def test_trigger_job_run(mock_request, client):
    mock_response = Mock()
    mock_response.json.return_value = {"data": {"id": 200, "status": "queued"}}
    mock_response.raise_for_status.return_value = None
    mock_request.return_value = mock_response

    result = await client.trigger_job_run(
        12345, 1, "Manual trigger", git_branch="main", schema_override="test_schema"
    )

    assert result == {"id": 200, "status": "queued"}
    headers = await client.get_headers()
    mock_request.assert_called_once_with(
        "POST",
        "https://cloud.getdbt.com/api/v2/accounts/12345/jobs/1/run/",
        headers=headers,
        json={
            "cause": "Manual trigger",
            "git_branch": "main",
            "schema_override": "test_schema",
        },
    )


@patch("requests.request")
async def test_list_jobs_runs(mock_request, client):
    mock_response = Mock()
    mock_response.json.return_value = {
        "data": [
            {
                "id": 100,
                "status": 10,
                "status_humanized": "Success",
                "job": {"name": "test_job", "execute_step": ["dbt run"]},
                "started_at": "2024-01-01T00:00:00Z",
                "finished_at": "2024-01-01T00:05:00Z",
                # Fields that should be removed
                "account_id": 12345,
                "environment_id": 1,
                "blocked_by": None,
                "used_repo_cache": True,
                "audit": {},
                "created_at_humanized": "1 hour ago",
                "duration_humanized": "5 minutes",
                "finished_at_humanized": "1 hour ago",
                "queued_duration_humanized": "10 seconds",
                "run_duration_humanized": "4 minutes 50 seconds",
                "artifacts_saved": True,
                "artifact_s3_path": "s3://bucket/path",
                "has_docs_generated": True,
                "has_sources_generated": False,
                "notifications_sent": True,
                "executed_by_thread_id": "thread123",
                "updated_at": "2024-01-01T00:05:00Z",
                "dequeued_at": "2024-01-01T00:00:30Z",
                "last_checked_at": "2024-01-01T00:04:00Z",
                "last_heartbeat_at": "2024-01-01T00:04:30Z",
                "trigger": {},
                "run_steps": [],
                "deprecation": {},
                "environment": {},
            }
        ]
    }
    mock_response.raise_for_status.return_value = None
    mock_request.return_value = mock_response

    result = await client.list_jobs_runs(12345, job_definition_id=1, status="success")

    assert len(result) == 1
    run = result[0]
    assert run["id"] == 100
    assert run["job_name"] == "test_job"
    assert run["job_steps"] == ["dbt run"]

    # Verify removed fields are not present
    removed_fields = [
        "job",
        "account_id",
        "environment_id",
        "blocked_by",
        "used_repo_cache",
        "audit",
        "created_at_humanized",
        "duration_humanized",
        "finished_at_humanized",
        "queued_duration_humanized",
        "run_duration_humanized",
        "artifacts_saved",
        "artifact_s3_path",
        "has_docs_generated",
        "has_sources_generated",
        "notifications_sent",
        "executed_by_thread_id",
        "updated_at",
        "dequeued_at",
        "last_checked_at",
        "last_heartbeat_at",
        "trigger",
        "run_steps",
        "deprecation",
        "environment",
    ]
    for field in removed_fields:
        assert field not in run

    headers = await client.get_headers()
    mock_request.assert_called_once_with(
        "GET",
        "https://cloud.getdbt.com/api/v2/accounts/12345/runs/?include_related=['job']",
        headers=headers,
        params={"job_definition_id": 1, "status": "success"},
    )


@patch("requests.request")
async def test_get_job_run_details(mock_request, client):
    mock_response = Mock()
    mock_response.json.return_value = {
        "data": {
            "id": 100,
            "status": 10,
            "run_steps": [
                {
                    "id": 1,
                    "name": "dbt run",
                    "logs": "log data",
                }
            ],
        }
    }
    mock_response.raise_for_status.return_value = None
    mock_request.return_value = mock_response

    result = await client.get_job_run_details(12345, 100)

    assert result["id"] == 100
    # Verify truncated_debug_logs and logs are removed
    assert "truncated_debug_logs" not in result["run_steps"][0]
    assert "logs" not in result["run_steps"][0]

    headers = await client.get_headers()
    mock_request.assert_called_once_with(
        "GET",
        "https://cloud.getdbt.com/api/v2/accounts/12345/runs/100/?include_related=['run_steps']",
        headers=headers,
    )


@patch("requests.request")
async def test_cancel_job_run(mock_request, client):
    mock_response = Mock()
    mock_response.json.return_value = {"data": {"id": 100, "status": "cancelled"}}
    mock_response.raise_for_status.return_value = None
    mock_request.return_value = mock_response

    result = await client.cancel_job_run(12345, 100)

    assert result == {"id": 100, "status": "cancelled"}
    headers = await client.get_headers()
    mock_request.assert_called_once_with(
        "POST",
        "https://cloud.getdbt.com/api/v2/accounts/12345/runs/100/cancel/",
        headers=headers,
    )


@patch("requests.request")
async def test_retry_job_run(mock_request, client):
    mock_response = Mock()
    mock_response.json.return_value = {"data": {"id": 101, "status": "queued"}}
    mock_response.raise_for_status.return_value = None
    mock_request.return_value = mock_response

    result = await client.retry_job_run(12345, 100)

    assert result == {"id": 101, "status": "queued"}
    headers = await client.get_headers()
    mock_request.assert_called_once_with(
        "POST",
        "https://cloud.getdbt.com/api/v2/accounts/12345/runs/100/retry/",
        headers=headers,
    )


@patch("requests.request")
async def test_list_job_run_artifacts(mock_request, client):
    mock_response = Mock()
    mock_response.json.return_value = {
        "data": [
            "manifest.json",
            "catalog.json",
            "compiled/my_project/models/model.sql",
            "run/my_project/models/model.sql",
            "sources.json",
        ]
    }
    mock_response.raise_for_status.return_value = None
    mock_request.return_value = mock_response

    result = await client.list_job_run_artifacts(12345, 100)

    # Should filter out compiled/ and run/ artifacts
    expected = ["manifest.json", "catalog.json", "sources.json"]
    assert result == expected

    headers = await client.get_headers()
    mock_request.assert_called_once_with(
        "GET",
        "https://cloud.getdbt.com/api/v2/accounts/12345/runs/100/artifacts/",
        headers=headers,
    )


@patch("requests.get")
async def test_get_job_run_artifact_json(mock_get, client):
    mock_response = Mock()
    mock_response.json.return_value = {"nodes": {"model.test": {}}}
    mock_response.headers = {"content-type": "application/json"}
    mock_response.raise_for_status.return_value = None
    mock_get.return_value = mock_response

    result = await client.get_job_run_artifact(12345, 100, "manifest.json", step=1)

    # The client returns response.text; on a bare Mock, .text is itself a Mock
    # object, so we only assert that the request was made with the expected
    # URL, headers, and params rather than checking the payload body.
    assert result is not None
    mock_get.assert_called_once_with(
        "https://cloud.getdbt.com/api/v2/accounts/12345/runs/100/artifacts/manifest.json",
        headers={"Authorization": "Bearer test_token", "Accept": "*/*"},
        params={"step": 1},
    )


@patch("requests.get")
async def test_get_job_run_artifact_text(mock_get, client):
    mock_response = Mock()
    mock_response.text = "LOG DATA"
    mock_response.headers = {"content-type": "text/plain"}
    mock_response.raise_for_status.return_value = None
    mock_get.return_value = mock_response

    result = await client.get_job_run_artifact(12345, 100, "logs/dbt.log")

    assert result == "LOG DATA"
    mock_get.assert_called_once_with(
        "https://cloud.getdbt.com/api/v2/accounts/12345/runs/100/artifacts/logs/dbt.log",
        headers={"Authorization": "Bearer test_token", "Accept": "*/*"},
        params={},
    )


@patch("requests.get")
async def test_get_job_run_artifact_no_step_param(mock_get, client):
    mock_response = Mock()
    mock_response.text = "artifact content"
    mock_response.headers = {"content-type": "text/plain"}
    mock_response.raise_for_status.return_value = None
    mock_get.return_value = mock_response

    await client.get_job_run_artifact(12345, 100, "manifest.json")

    mock_get.assert_called_once_with(
        "https://cloud.getdbt.com/api/v2/accounts/12345/runs/100/artifacts/manifest.json",
        headers={"Authorization": "Bearer test_token", "Accept": "*/*"},
        params={},
    )


@patch("requests.get")
async def test_get_job_run_artifact_request_exception(mock_get, client):
    mock_get.side_effect = requests.exceptions.HTTPError("404 Not Found")

    with pytest.raises(requests.exceptions.HTTPError):
        await client.get_job_run_artifact(12345, 100, "nonexistent.json")

```
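
These tests are written as bare `async def` functions, which pytest only collects as async tests when pytest-asyncio (or an equivalent plugin) runs in auto mode, presumably configured elsewhere in this repository. The snippet below is a standalone sketch, not part of the repository: it re-declares the two tiny providers inline and drives `DbtAdminAPIClient.trigger_job_run` against a patched `requests.request`, mirroring the wiring exercised above.

```python
# Standalone sketch (assumed usage, not an official example): wire up the
# admin client with static providers and trigger a job run against a
# patched requests.request, exactly as the tests above do.
import asyncio
from unittest.mock import Mock, patch

from dbt_mcp.config.config_providers import AdminApiConfig
from dbt_mcp.dbt_admin.client import DbtAdminAPIClient


class StaticHeadersProvider:
    """Returns fixed headers, mirroring MockHeadersProvider above."""

    def __init__(self, headers: dict[str, str]):
        self._headers = headers

    def get_headers(self) -> dict[str, str]:
        return self._headers


class StaticConfigProvider:
    """Returns a fixed AdminApiConfig, mirroring MockAdminApiConfigProvider above."""

    def __init__(self, config: AdminApiConfig):
        self._config = config

    async def get_config(self) -> AdminApiConfig:
        return self._config


async def main() -> None:
    config = AdminApiConfig(
        account_id=12345,
        headers_provider=StaticHeadersProvider({"Authorization": "Bearer test_token"}),
        url="https://cloud.getdbt.com",
    )
    client = DbtAdminAPIClient(StaticConfigProvider(config))

    mock_response = Mock()
    mock_response.json.return_value = {"data": {"id": 200, "status": "queued"}}
    mock_response.raise_for_status.return_value = None
    with patch("requests.request", return_value=mock_response):
        run = await client.trigger_job_run(12345, 1, "Manual trigger")
    print(run)  # {'id': 200, 'status': 'queued'}


if __name__ == "__main__":
    asyncio.run(main())
```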

--------------------------------------------------------------------------------
/ui/src/App.tsx:
--------------------------------------------------------------------------------

```typescript
import { useEffect, useMemo, useState, useRef } from "react";
import "./App.css";
import dbtLogoBLK from "../assets/dbt_logo BLK.svg";
import dbtLogoWHT from "../assets/dbt_logo WHT.svg";

type Project = {
  id: number;
  name: string;
  account_id: number;
  account_name: string;
};

type DbtPlatformContext = {
  dev_environment: {
    id: number;
    name: string;
    deployment_type: string;
  } | null;
  prod_environment: {
    id: number;
    name: string;
    deployment_type: string;
  } | null;
  decoded_access_token: {
    decoded_claims: {
      sub: number;
    };
  };
};

type FetchRetryOptions = {
  attempts?: number;
  delayMs?: number;
  backoffFactor?: number;
  timeoutMs?: number;
  retryOnResponse?: (response: Response) => boolean;
};

function isAbortError(error: unknown): boolean {
  if (error instanceof DOMException) {
    return error.name === "AbortError";
  }
  return error instanceof Error && error.name === "AbortError";
}

function isNetworkError(error: unknown): boolean {
  if (error instanceof TypeError) {
    return true;
  }
  return error instanceof Error && error.name === "TypeError";
}

function sleep(ms: number) {
  return new Promise<void>((resolve) => {
    setTimeout(resolve, ms);
  });
}

async function fetchWithRetry(
  input: RequestInfo | URL,
  init?: RequestInit,
  options?: FetchRetryOptions
): Promise<Response> {
  const {
    attempts = 3,
    delayMs = 500,
    backoffFactor = 2,
    timeoutMs = 10000,
    retryOnResponse,
  } = options ?? {};

  let currentDelay = delayMs;

  for (let attempt = 0; attempt < attempts; attempt++) {
    if (attempt > 0 && currentDelay > 0) {
      await sleep(currentDelay);
      currentDelay *= backoffFactor;
    }

    const controller = new AbortController();
    const timeoutId = setTimeout(() => controller.abort(), timeoutMs);

    // Listen to existing signal if present
    if (init?.signal) {
      init.signal.addEventListener("abort", () => controller.abort(), {
        once: true,
      });
    }

    try {
      const response = await fetch(input, {
        ...init,
        signal: controller.signal,
      });

      clearTimeout(timeoutId);

      if (
        retryOnResponse &&
        retryOnResponse(response) &&
        attempt < attempts - 1
      ) {
        // Consume response body to free resources
        try {
          await response.arrayBuffer();
        } catch {
          // Ignore - may already be consumed or reader locked
        }
        continue;
      }

      return response;
    } catch (error) {
      clearTimeout(timeoutId);

      if (isAbortError(error)) {
        throw error;
      }

      if (!isNetworkError(error)) {
        throw error;
      }

      if (attempt === attempts - 1) {
        throw error;
      }
    }
  }

  throw new Error("Failed to fetch after retries");
}

function parseHash(): URLSearchParams {
  const hash = window.location.hash.startsWith("#")
    ? window.location.hash.slice(1)
    : window.location.hash;
  const query = hash.startsWith("?") ? hash.slice(1) : hash;
  return new URLSearchParams(query);
}

type OAuthResult = {
  status: string | null;
  error: string | null;
  errorDescription: string | null;
};

function useOAuthResult(): OAuthResult {
  const params = useMemo(() => parseHash(), []);
  return {
    status: params.get("status"),
    error: params.get("error"),
    errorDescription: params.get("error_description"),
  };
}

type CustomDropdownProps = {
  value: number | null;
  onChange: (value: string) => void;
  options: Project[];
  placeholder: string;
  id: string;
};

function CustomDropdown({
  value,
  onChange,
  options,
  placeholder,
  id,
}: CustomDropdownProps) {
  const [isOpen, setIsOpen] = useState(false);
  const dropdownRef = useRef<HTMLDivElement>(null);
  const triggerRef = useRef<HTMLButtonElement>(null);

  // Close dropdown when clicking outside
  useEffect(() => {
    function handleClickOutside(event: MouseEvent) {
      if (
        dropdownRef.current &&
        !dropdownRef.current.contains(event.target as Node)
      ) {
        setIsOpen(false);
      }
    }

    if (isOpen) {
      document.addEventListener("mousedown", handleClickOutside);
      return () => {
        document.removeEventListener("mousedown", handleClickOutside);
      };
    }
  }, [isOpen]);

  // Handle keyboard navigation
  useEffect(() => {
    function handleKeyDown(event: KeyboardEvent) {
      if (!isOpen) {
        if (
          event.key === "Enter" ||
          event.key === " " ||
          event.key === "ArrowDown"
        ) {
          event.preventDefault();
          setIsOpen(true);
        }
        return;
      }

      if (event.key === "Escape") {
        setIsOpen(false);
        triggerRef.current?.focus();
      }
    }

    if (triggerRef.current?.contains(document.activeElement)) {
      document.addEventListener("keydown", handleKeyDown);
      return () => {
        document.removeEventListener("keydown", handleKeyDown);
      };
    }
  }, [isOpen]);

  const selectedProject = options.find((p) => p.id === value);

  const handleToggle = () => {
    setIsOpen(!isOpen);
  };

  const handleOptionSelect = (project: Project) => {
    onChange(project.id.toString());
    setIsOpen(false);
    triggerRef.current?.focus();
  };

  return (
    <div className="custom-dropdown" ref={dropdownRef}>
      <button
        ref={triggerRef}
        id={id}
        type="button"
        className={`dropdown-trigger ${isOpen ? "open" : ""} ${
          !selectedProject ? "placeholder" : ""
        }`}
        onClick={handleToggle}
        aria-haspopup="listbox"
        aria-expanded={isOpen}
        aria-labelledby={`${id}-label`}
      >
        {selectedProject ? (
          <>
            <div className="option-primary">{selectedProject.name}</div>
            <div className="option-secondary">
              {selectedProject.account_name}
            </div>
          </>
        ) : (
          placeholder
        )}
      </button>

      {isOpen && (
        <div
          className="dropdown-options"
          role="listbox"
          aria-labelledby={`${id}-label`}
        >
          {options.map((project) => (
            <button
              key={`${project.account_id}-${project.id}`}
              type="button"
              className={`dropdown-option ${
                project.id === value ? "selected" : ""
              }`}
              onClick={() => handleOptionSelect(project)}
              role="option"
              aria-selected={project.id === value}
            >
              <div className="option-primary">{project.name}</div>
              <div className="option-secondary">{project.account_name}</div>
            </button>
          ))}
        </div>
      )}
    </div>
  );
}

export default function App() {
  const oauthResult = useOAuthResult();
  const [responseText, setResponseText] = useState<string | null>(null);
  const [projects, setProjects] = useState<Project[]>([]);
  const [projectsError, setProjectsError] = useState<string | null>(null);
  const [loadingProjects, setLoadingProjects] = useState(false);
  const [selectedProjectId, setSelectedProjectId] = useState<number | null>(
    null
  );
  const [dbtPlatformContext, setDbtPlatformContext] =
    useState<DbtPlatformContext | null>(null);
  const [continuing, setContinuing] = useState(false);
  const [shutdownComplete, setShutdownComplete] = useState(false);

  // Load available projects after OAuth success
  useEffect(() => {
    if (oauthResult.status !== "success") return;
    const abortController = new AbortController();
    let cancelled = false;

    const loadProjects = async () => {
      setLoadingProjects(true);
      setProjectsError(null);

      try {
        const response = await fetchWithRetry(
          "/projects",
          { signal: abortController.signal },
          { attempts: 3, delayMs: 400 }
        );

        if (!response.ok) {
          throw new Error(`Failed to load projects (${response.status})`);
        }

        const data: Project[] = await response.json();

        if (!cancelled) {
          setProjects(data);
        }
      } catch (err) {
        if (cancelled || isAbortError(err)) {
          return;
        }

        const msg = err instanceof Error ? err.message : String(err);
        setProjectsError(msg);
      } finally {
        if (!cancelled) {
          setLoadingProjects(false);
        }
      }
    };

    loadProjects();

    return () => {
      cancelled = true;
      abortController.abort();
    };
  }, [oauthResult.status]);

  // Fetch saved selected project on load after OAuth success
  useEffect(() => {
    if (oauthResult.status !== "success") return;
    const abortController = new AbortController();
    let cancelled = false;

    (async () => {
      try {
        const res = await fetchWithRetry(
          "/dbt_platform_context",
          { signal: abortController.signal },
          { attempts: 2, delayMs: 400 }
        );
        if (!res.ok || cancelled) return; // if no config yet or server error, skip silently
        const data: DbtPlatformContext = await res.json();
        if (!cancelled) {
          setDbtPlatformContext(data);
        }
      } catch (err) {
        if (isAbortError(err) || cancelled) {
          return;
        }
        // ignore other failures to keep UX consistent
      }
    })();

    return () => {
      cancelled = true;
      abortController.abort();
    };
  }, [oauthResult.status]);

  const onContinue = async () => {
    if (continuing) return;
    setContinuing(true);
    setResponseText(null);
    try {
      const res = await fetchWithRetry(
        "/shutdown",
        { method: "POST" },
        { attempts: 3, delayMs: 400 }
      );
      const text = await res.text();
      if (res.ok) {
        setShutdownComplete(true);
        window.close();
      } else {
        setResponseText(text);
      }
    } catch (err) {
      setResponseText(String(err));
    } finally {
      setContinuing(false);
    }
  };

  const onSelectProject = async (projectIdStr: string) => {
    setDbtPlatformContext(null);
    const projectId = Number(projectIdStr);
    setSelectedProjectId(Number.isNaN(projectId) ? null : projectId);
    const project = projects.find((p) => p.id === projectId);
    if (!project) return;
    try {
      const res = await fetchWithRetry(
        "/selected_project",
        {
          method: "POST",
          headers: { "Content-Type": "application/json" },
          body: JSON.stringify({
            account_id: project.account_id,
            project_id: project.id,
          }),
        },
        { attempts: 3, delayMs: 400 }
      );
      if (res.ok) {
        const data = await res.json();
        setDbtPlatformContext(data);
      } else {
        setResponseText(await res.text());
        setDbtPlatformContext(null);
      }
    } catch (err) {
      setResponseText(String(err));
      setDbtPlatformContext(null);
    }
  };

  return (
    <div className="app-container">
      <div className="logo-container">
        <img src={dbtLogoBLK} alt="dbt" className="logo logo-light" />
        <img src={dbtLogoWHT} alt="dbt" className="logo logo-dark" />
      </div>
      <div className="app-content">
        <header className="app-header">
          <h1>dbt Platform Setup</h1>
          <p>Configure your dbt Platform connection</p>
        </header>

        {oauthResult.status === "error" && (
          <section className="error-section">
            <div className="section-header">
              <h2>Authentication Error</h2>
              <p>There was a problem during authentication</p>
            </div>

            <div className="error-details">
              {oauthResult.error && (
                <div className="error-item">
                  <strong>Error Code:</strong>
                  <code className="error-code">{oauthResult.error}</code>
                </div>
              )}

              {oauthResult.errorDescription && (
                <div className="error-item">
                  <strong>Description:</strong>
                  <p className="error-description">
                    {decodeURIComponent(oauthResult.errorDescription)}
                  </p>
                </div>
              )}

              <div className="error-actions">
                <p>
                  Please close this window and try again. If the problem
                  persists, contact support.
                </p>
              </div>
            </div>
          </section>
        )}

        {oauthResult.status === "success" && !shutdownComplete && (
          <section className="project-selection-section">
            <div className="section-header">
              <h2>Select a Project</h2>
              <p>Choose the dbt project you want to work with</p>
            </div>

            <div className="form-content">
              {loadingProjects && (
                <div className="loading-state">
                  <div className="spinner"></div>
                  <span>Loading projects…</span>
                </div>
              )}

              {projectsError && (
                <div className="error-state">
                  <strong>Error loading projects</strong>
                  <p>{projectsError}</p>
                </div>
              )}

              {!loadingProjects && !projectsError && (
                <div className="form-group">
                  <label
                    htmlFor="project-select"
                    className="form-label"
                    id="project-select-label"
                  >
                    Available Projects
                  </label>
                  <CustomDropdown
                    id="project-select"
                    value={selectedProjectId}
                    onChange={onSelectProject}
                    options={projects}
                    placeholder="Choose a project"
                  />
                </div>
              )}
            </div>
          </section>
        )}

        {dbtPlatformContext && !shutdownComplete && (
          <section className="context-section">
            <div className="section-header">
              <h2>Current Configuration</h2>
              <p>Your dbt Platform context is ready</p>
            </div>

            <div className="context-details">
              <div className="context-item">
                <strong>User ID:</strong>{" "}
                {dbtPlatformContext.decoded_access_token?.decoded_claims.sub}
              </div>

              {dbtPlatformContext.dev_environment && (
                <div className="context-item">
                  <strong>Development Environment:</strong>
                  <div className="environment-details">
                    <span className="env-name">
                      {dbtPlatformContext.dev_environment.name}
                    </span>
                  </div>
                </div>
              )}

              {dbtPlatformContext.prod_environment && (
                <div className="context-item">
                  <strong>Production Environment:</strong>
                  <div className="environment-details">
                    <span className="env-name">
                      {dbtPlatformContext.prod_environment.name}
                    </span>
                  </div>
                </div>
              )}
            </div>
          </section>
        )}

        {dbtPlatformContext && !shutdownComplete && (
          <div className="button-container">
            <button
              onClick={onContinue}
              className="primary-button"
              disabled={selectedProjectId === null || continuing}
            >
              {continuing ? "Closing…" : "Continue"}
            </button>
          </div>
        )}

        {shutdownComplete && (
          <section className="completion-section">
            <div className="completion-card">
              <h2>All Set!</h2>
              <p>
                Your dbt Platform setup has finished. This window can now be
                closed.
              </p>
            </div>
          </section>
        )}

        {responseText && (
          <section className="response-section">
            <div className="section-header">
              <h3>Response</h3>
            </div>
            <pre className="response-text">{responseText}</pre>
          </section>
        )}
      </div>
    </div>
  );
}

```
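
`fetchWithRetry` above retries only network-level failures by default; transient HTTP errors are retried only when a `retryOnResponse` predicate says so. A hypothetical call site, assumed to live alongside the helper in this module so `fetchWithRetry` and the `Project` type are in scope:

```typescript
// Hypothetical call site (not part of App.tsx): retry GET /projects on 5xx
// responses as well as network failures, with a 5s per-attempt timeout.
async function loadProjectsWithRetry(): Promise<Project[]> {
  const response = await fetchWithRetry(
    "/projects",
    { method: "GET" },
    {
      attempts: 4,
      delayMs: 250,
      backoffFactor: 2, // waits ~250ms, 500ms, 1000ms between attempts
      timeoutMs: 5000,
      retryOnResponse: (res) => res.status >= 500,
    }
  );
  if (!response.ok) {
    throw new Error(`Request still failing after retries (${response.status})`);
  }
  return (await response.json()) as Project[];
}
```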

--------------------------------------------------------------------------------
/tests/unit/tracking/test_tracking.py:
--------------------------------------------------------------------------------

```python
import json
import uuid
from unittest.mock import patch

import pytest

from dbt_mcp.config.settings import AuthenticationMethod, DbtMcpSettings
from dbt_mcp.tools.tool_names import ToolName
from dbt_mcp.tools.toolsets import Toolset, proxied_tools
from dbt_mcp.tracking.tracking import DefaultUsageTracker, ToolCalledEvent
from tests.mocks.config import MockCredentialsProvider


class TestUsageTracker:
    @pytest.mark.asyncio
    async def test_emit_tool_called_event_disabled(self):
        # Create settings with tracking explicitly disabled
        # usage_tracking_enabled is a property, so we need to set do_not_track
        mock_settings = DbtMcpSettings.model_construct(
            do_not_track="true",
        )

        tracker = DefaultUsageTracker(
            credentials_provider=MockCredentialsProvider(mock_settings),
            session_id=uuid.uuid4(),
        )

        with patch("dbt_mcp.tracking.tracking.log_proto") as mock_log_proto:
            await tracker.emit_tool_called_event(
                tool_called_event=ToolCalledEvent(
                    tool_name="list_metrics",
                    arguments={"foo": "bar"},
                    start_time_ms=0,
                    end_time_ms=1,
                    error_message=None,
                ),
            )

        mock_log_proto.assert_not_called()

    @pytest.mark.asyncio
    async def test_emit_tool_called_event_enabled(self):
        # Create settings with tracking enabled
        # usage_tracking_enabled is a property - tracking is enabled by default
        # when do_not_track and send_anonymous_usage_data are not set
        mock_settings = DbtMcpSettings.model_construct(
            do_not_track=None,  # Not disabled
            send_anonymous_usage_data=None,  # Not disabled
            dbt_prod_env_id=1,
            dbt_dev_env_id=2,
            dbt_user_id=3,
            actual_host="test.dbt.com",
            actual_host_prefix="prefix",
        )

        mock_credentials_provider = MockCredentialsProvider(mock_settings)

        tracker = DefaultUsageTracker(
            credentials_provider=mock_credentials_provider,
            session_id=uuid.uuid4(),
        )

        with (
            patch("uuid.uuid4", return_value="event-1"),
            patch("dbt_mcp.tracking.tracking.log_proto") as mock_log_proto,
            patch(
                "dbt_mcp.tracking.tracking.DefaultUsageTracker._get_local_user_id",
                return_value="local-user",
            ),
        ):
            await tracker.emit_tool_called_event(
                tool_called_event=ToolCalledEvent(
                    tool_name="list_metrics",
                    arguments={"foo": "bar"},
                    start_time_ms=0,
                    end_time_ms=1,
                    error_message=None,
                ),
            )

        mock_log_proto.assert_called_once()
        tool_called = mock_log_proto.call_args.args[0]
        assert tool_called.tool_name == "list_metrics"
        assert json.loads(tool_called.arguments["foo"]) == "bar"
        assert tool_called.dbt_cloud_environment_id_dev == "2"
        assert tool_called.dbt_cloud_environment_id_prod == "1"
        assert tool_called.dbt_cloud_user_id == "3"
        assert tool_called.local_user_id == "local-user"

    @pytest.mark.asyncio
    async def test_get_local_user_id_success(self):
        """Test loading local_user_id from .user.yml file"""
        mock_settings = DbtMcpSettings.model_construct(
            dbt_profiles_dir="/fake/profiles",
        )
        tracker = DefaultUsageTracker(
            credentials_provider=MockCredentialsProvider(mock_settings),
            session_id=uuid.uuid4(),
        )

        user_data = {"id": "user-123"}
        with patch("dbt_mcp.tracking.tracking.try_read_yaml", return_value=user_data):
            result = tracker._get_local_user_id(mock_settings)
            assert result == "user-123"

    @pytest.mark.asyncio
    async def test_get_local_user_id_caching(self):
        """Test that local_user_id is cached after first load"""
        mock_settings = DbtMcpSettings.model_construct(
            dbt_profiles_dir="/fake/profiles",
        )
        tracker = DefaultUsageTracker(
            credentials_provider=MockCredentialsProvider(mock_settings),
            session_id=uuid.uuid4(),
        )

        user_data = {"id": "user-123"}
        with patch(
            "dbt_mcp.tracking.tracking.try_read_yaml", return_value=user_data
        ) as mock_read:
            # First call should load from file
            result1 = tracker._get_local_user_id(mock_settings)
            assert result1 == "user-123"
            assert mock_read.call_count == 1

            # Second call should use cached value
            result2 = tracker._get_local_user_id(mock_settings)
            assert result2 == "user-123"
            assert mock_read.call_count == 1  # Not called again

    @pytest.mark.asyncio
    async def test_get_local_user_id_fusion_format(self):
        """Test handling of dbt Fusion format for .user.yml"""
        mock_settings = DbtMcpSettings.model_construct(
            dbt_profiles_dir="/fake/profiles",
        )
        tracker = DefaultUsageTracker(
            credentials_provider=MockCredentialsProvider(mock_settings),
            session_id=uuid.uuid4(),
        )

        # dbt Fusion may return a string directly instead of a dict
        user_data = "user-fusion-456"
        with patch("dbt_mcp.tracking.tracking.try_read_yaml", return_value=user_data):
            result = tracker._get_local_user_id(mock_settings)
            assert result == "user-fusion-456"

    @pytest.mark.asyncio
    async def test_get_local_user_id_no_file(self):
        """Test behavior when .user.yml doesn't exist - should generate new UUID"""
        mock_settings = DbtMcpSettings.model_construct(
            dbt_profiles_dir="/fake/profiles",
        )
        tracker = DefaultUsageTracker(
            credentials_provider=MockCredentialsProvider(mock_settings),
            session_id=uuid.uuid4(),
        )

        with patch("dbt_mcp.tracking.tracking.try_read_yaml", return_value=None):
            result = tracker._get_local_user_id(mock_settings)
            # When file doesn't exist, a new UUID should be generated
            assert result is not None
            # Verify it's a valid UUID string
            uuid.UUID(result)  # This will raise ValueError if invalid

    @pytest.mark.asyncio
    async def test_get_settings_caching(self):
        """Test that settings are cached after first retrieval"""
        mock_settings = DbtMcpSettings.model_construct(
            dbt_prod_env_id=123,
        )
        mock_credentials_provider = MockCredentialsProvider(mock_settings)

        tracker = DefaultUsageTracker(
            credentials_provider=mock_credentials_provider,
            session_id=uuid.uuid4(),
        )

        # Mock the credentials provider to track call count
        original_get_credentials = mock_credentials_provider.get_credentials
        call_count = 0

        async def tracked_get_credentials():
            nonlocal call_count
            call_count += 1
            return await original_get_credentials()

        mock_credentials_provider.get_credentials = tracked_get_credentials

        # First call should fetch settings
        settings1 = await tracker._get_settings()
        assert settings1.dbt_prod_env_id == 123
        assert call_count == 1

        # Second call should use cached settings
        settings2 = await tracker._get_settings()
        assert settings2.dbt_prod_env_id == 123
        assert call_count == 1  # Not called again

    @pytest.mark.asyncio
    async def test_get_disabled_toolsets_none_disabled(self):
        """Test when all toolsets are enabled"""
        mock_settings = DbtMcpSettings.model_construct(
            disable_sql=False,
            disable_semantic_layer=False,
            disable_discovery=False,
            disable_dbt_cli=False,
            disable_admin_api=False,
            disable_dbt_codegen=False,
        )
        tracker = DefaultUsageTracker(
            credentials_provider=MockCredentialsProvider(mock_settings),
            session_id=uuid.uuid4(),
        )

        disabled = tracker._get_disabled_toolsets(mock_settings)
        assert disabled == []

    @pytest.mark.asyncio
    async def test_get_disabled_toolsets_some_disabled(self):
        """Test when some toolsets are disabled"""
        mock_settings = DbtMcpSettings.model_construct(
            disable_sql=True,
            disable_semantic_layer=True,
            disable_discovery=False,
            disable_dbt_cli=False,
            disable_admin_api=False,
            disable_dbt_codegen=False,
        )
        tracker = DefaultUsageTracker(
            credentials_provider=MockCredentialsProvider(mock_settings),
            session_id=uuid.uuid4(),
        )

        disabled = tracker._get_disabled_toolsets(mock_settings)
        assert set(disabled) == {Toolset.SQL, Toolset.SEMANTIC_LAYER}

    @pytest.mark.asyncio
    async def test_get_disabled_toolsets_all_disabled(self):
        """Test when all toolsets are disabled"""
        mock_settings = DbtMcpSettings.model_construct(
            disable_sql=True,
            disable_semantic_layer=True,
            disable_discovery=True,
            disable_dbt_cli=True,
            disable_admin_api=True,
            disable_dbt_codegen=True,
        )
        tracker = DefaultUsageTracker(
            credentials_provider=MockCredentialsProvider(mock_settings),
            session_id=uuid.uuid4(),
        )

        disabled = tracker._get_disabled_toolsets(mock_settings)
        assert set(disabled) == {
            Toolset.SQL,
            Toolset.SEMANTIC_LAYER,
            Toolset.DISCOVERY,
            Toolset.DBT_CLI,
            Toolset.ADMIN_API,
            Toolset.DBT_CODEGEN,
        }

    @pytest.mark.asyncio
    async def test_emit_tool_called_event_includes_authentication_method(self):
        """Test that authentication_method is included in the event"""
        mock_settings = DbtMcpSettings.model_construct(
            do_not_track=None,
            send_anonymous_usage_data=None,
            dbt_prod_env_id=1,
        )
        mock_credentials_provider = MockCredentialsProvider(mock_settings)
        mock_credentials_provider.authentication_method = AuthenticationMethod.ENV_VAR

        tracker = DefaultUsageTracker(
            credentials_provider=mock_credentials_provider,
            session_id=uuid.uuid4(),
        )

        with (
            patch("dbt_mcp.tracking.tracking.log_proto") as mock_log_proto,
            patch(
                "dbt_mcp.tracking.tracking.DefaultUsageTracker._get_local_user_id",
                return_value=None,
            ),
        ):
            await tracker.emit_tool_called_event(
                tool_called_event=ToolCalledEvent(
                    tool_name="test_tool",
                    arguments={},
                    start_time_ms=0,
                    end_time_ms=1,
                    error_message=None,
                ),
            )

        mock_log_proto.assert_called_once()
        tool_called = mock_log_proto.call_args.args[0]
        assert tool_called.authentication_method == "env_var"

    @pytest.mark.asyncio
    async def test_emit_tool_called_event_includes_disabled_toolsets(self):
        """Test that disabled_toolsets are included in the event"""
        mock_settings = DbtMcpSettings.model_construct(
            do_not_track=None,
            send_anonymous_usage_data=None,
            disable_sql=True,
            disable_semantic_layer=True,
            disable_discovery=False,
            disable_dbt_cli=False,
            disable_admin_api=False,
            disable_dbt_codegen=False,
        )
        mock_credentials_provider = MockCredentialsProvider(mock_settings)

        tracker = DefaultUsageTracker(
            credentials_provider=mock_credentials_provider,
            session_id=uuid.uuid4(),
        )

        with (
            patch("dbt_mcp.tracking.tracking.log_proto") as mock_log_proto,
            patch(
                "dbt_mcp.tracking.tracking.DefaultUsageTracker._get_local_user_id",
                return_value=None,
            ),
        ):
            await tracker.emit_tool_called_event(
                tool_called_event=ToolCalledEvent(
                    tool_name="test_tool",
                    arguments={},
                    start_time_ms=0,
                    end_time_ms=1,
                    error_message=None,
                ),
            )

        mock_log_proto.assert_called_once()
        tool_called = mock_log_proto.call_args.args[0]
        assert set(tool_called.disabled_toolsets) == {"sql", "semantic_layer"}

    @pytest.mark.asyncio
    async def test_emit_tool_called_event_includes_disabled_tools(self):
        """Test that disabled_tools are included in the event"""
        mock_settings = DbtMcpSettings.model_construct(
            do_not_track=None,
            send_anonymous_usage_data=None,
            disable_tools=[ToolName.BUILD, ToolName.RUN],
        )
        mock_credentials_provider = MockCredentialsProvider(mock_settings)

        tracker = DefaultUsageTracker(
            credentials_provider=mock_credentials_provider,
            session_id=uuid.uuid4(),
        )

        with (
            patch("dbt_mcp.tracking.tracking.log_proto") as mock_log_proto,
            patch(
                "dbt_mcp.tracking.tracking.DefaultUsageTracker._get_local_user_id",
                return_value=None,
            ),
        ):
            await tracker.emit_tool_called_event(
                tool_called_event=ToolCalledEvent(
                    tool_name="test_tool",
                    arguments={},
                    start_time_ms=0,
                    end_time_ms=1,
                    error_message=None,
                ),
            )

        mock_log_proto.assert_called_once()
        tool_called = mock_log_proto.call_args.args[0]
        assert set(tool_called.disabled_tools) == {"build", "run"}

    @pytest.mark.asyncio
    async def test_emit_tool_called_event_includes_session_id(self):
        """Test that session_id is included in the event context"""
        mock_settings = DbtMcpSettings.model_construct(
            do_not_track=None,
            send_anonymous_usage_data=None,
        )
        mock_credentials_provider = MockCredentialsProvider(mock_settings)

        session_id = uuid.uuid4()
        tracker = DefaultUsageTracker(
            credentials_provider=mock_credentials_provider,
            session_id=session_id,
        )

        with (
            patch("dbt_mcp.tracking.tracking.log_proto") as mock_log_proto,
            patch(
                "dbt_mcp.tracking.tracking.DefaultUsageTracker._get_local_user_id",
                return_value=None,
            ),
        ):
            await tracker.emit_tool_called_event(
                tool_called_event=ToolCalledEvent(
                    tool_name="test_tool",
                    arguments={},
                    start_time_ms=0,
                    end_time_ms=1,
                    error_message=None,
                ),
            )

        mock_log_proto.assert_called_once()
        tool_called = mock_log_proto.call_args.args[0]
        assert tool_called.ctx.session_id == str(session_id)

    @pytest.mark.asyncio
    async def test_emit_tool_called_event_includes_dbt_mcp_version(self):
        """Test that dbt_mcp_version is included in the event"""
        mock_settings = DbtMcpSettings.model_construct(
            do_not_track=None,
            send_anonymous_usage_data=None,
        )
        mock_credentials_provider = MockCredentialsProvider(mock_settings)

        tracker = DefaultUsageTracker(
            credentials_provider=mock_credentials_provider,
            session_id=uuid.uuid4(),
        )

        with (
            patch("dbt_mcp.tracking.tracking.log_proto") as mock_log_proto,
            patch(
                "dbt_mcp.tracking.tracking.DefaultUsageTracker._get_local_user_id",
                return_value=None,
            ),
        ):
            await tracker.emit_tool_called_event(
                tool_called_event=ToolCalledEvent(
                    tool_name="test_tool",
                    arguments={},
                    start_time_ms=0,
                    end_time_ms=1,
                    error_message=None,
                ),
            )

        mock_log_proto.assert_called_once()
        tool_called = mock_log_proto.call_args.args[0]
        # Just verify the field exists, don't assert specific version
        assert hasattr(tool_called, "dbt_mcp_version")
        assert isinstance(tool_called.dbt_mcp_version, str)

    @pytest.mark.asyncio
    async def test_emit_tool_called_event_proxied_tools_not_tracked(self):
        """Test that proxied tools are not tracked locally (tracked on backend)"""
        mock_settings = DbtMcpSettings.model_construct(
            do_not_track=None,
            send_anonymous_usage_data=None,
            dbt_prod_env_id=1,
        )
        mock_credentials_provider = MockCredentialsProvider(mock_settings)

        tracker = DefaultUsageTracker(
            credentials_provider=mock_credentials_provider,
            session_id=uuid.uuid4(),
        )

        with patch("dbt_mcp.tracking.tracking.log_proto") as mock_log_proto:
            # Test each proxied tool
            for proxied_tool in proxied_tools:
                await tracker.emit_tool_called_event(
                    tool_called_event=ToolCalledEvent(
                        tool_name=proxied_tool.value,
                        arguments={"query": "SELECT 1"},
                        start_time_ms=0,
                        end_time_ms=1,
                        error_message=None,
                    ),
                )

        # log_proto should never be called for proxied tools
        mock_log_proto.assert_not_called()

    @pytest.mark.asyncio
    async def test_emit_tool_called_event_non_proxied_tools_are_tracked(self):
        """Test that non-proxied tools are still tracked normally"""
        mock_settings = DbtMcpSettings.model_construct(
            do_not_track=None,
            send_anonymous_usage_data=None,
            dbt_prod_env_id=1,
        )
        mock_credentials_provider = MockCredentialsProvider(mock_settings)

        tracker = DefaultUsageTracker(
            credentials_provider=mock_credentials_provider,
            session_id=uuid.uuid4(),
        )

        with (
            patch("dbt_mcp.tracking.tracking.log_proto") as mock_log_proto,
            patch(
                "dbt_mcp.tracking.tracking.DefaultUsageTracker._get_local_user_id",
                return_value=None,
            ),
        ):
            # Use a non-proxied tool (e.g., list_metrics)
            await tracker.emit_tool_called_event(
                tool_called_event=ToolCalledEvent(
                    tool_name="list_metrics",
                    arguments={},
                    start_time_ms=0,
                    end_time_ms=1,
                    error_message=None,
                ),
            )

        # log_proto should be called for non-proxied tools
        mock_log_proto.assert_called_once()
        tool_called = mock_log_proto.call_args.args[0]
        assert tool_called.tool_name == "list_metrics"

```
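
The tests above construct `ToolCalledEvent` with fixed timestamps. Below is a rough usage sketch, assuming a `tracker` built like the ones in these tests and using only the constructor fields they exercise; the wrapper function and its names are hypothetical.

```python
# Rough sketch (assumed usage, not from the repository): time an async tool
# call and report it through a tracker via emit_tool_called_event.
import time

from dbt_mcp.tracking.tracking import ToolCalledEvent


async def run_and_track(tracker, tool, **arguments):
    start_ms = int(time.time() * 1000)
    error_message = None
    try:
        return await tool(**arguments)
    except Exception as e:  # report any failure to telemetry, then re-raise
        error_message = str(e)
        raise
    finally:
        await tracker.emit_tool_called_event(
            tool_called_event=ToolCalledEvent(
                tool_name=getattr(tool, "__name__", "unknown_tool"),
                arguments=arguments,
                start_time_ms=start_ms,
                end_time_ms=int(time.time() * 1000),
                error_message=error_message,
            ),
        )
```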