#
tokens: 36569/50000 6/305 files (page 4/4)
lines: off (toggle) GitHub
raw markdown copy
This is page 4 of 4. Use http://codebase.md/dbt-labs/dbt-mcp?page={x} to view the full context.

# Directory Structure

```
├── .changes
│   ├── header.tpl.md
│   ├── unreleased
│   │   ├── .gitkeep
│   │   ├── Bug Fix-20251028-143835.yaml
│   │   ├── Enhancement or New Feature-20251014-175047.yaml
│   │   └── Under the Hood-20251030-151902.yaml
│   ├── v0.1.3.md
│   ├── v0.10.0.md
│   ├── v0.10.1.md
│   ├── v0.10.2.md
│   ├── v0.10.3.md
│   ├── v0.2.0.md
│   ├── v0.2.1.md
│   ├── v0.2.10.md
│   ├── v0.2.11.md
│   ├── v0.2.12.md
│   ├── v0.2.13.md
│   ├── v0.2.14.md
│   ├── v0.2.15.md
│   ├── v0.2.16.md
│   ├── v0.2.17.md
│   ├── v0.2.18.md
│   ├── v0.2.19.md
│   ├── v0.2.2.md
│   ├── v0.2.20.md
│   ├── v0.2.3.md
│   ├── v0.2.4.md
│   ├── v0.2.5.md
│   ├── v0.2.6.md
│   ├── v0.2.7.md
│   ├── v0.2.8.md
│   ├── v0.2.9.md
│   ├── v0.3.0.md
│   ├── v0.4.0.md
│   ├── v0.4.1.md
│   ├── v0.4.2.md
│   ├── v0.5.0.md
│   ├── v0.6.0.md
│   ├── v0.6.1.md
│   ├── v0.6.2.md
│   ├── v0.7.0.md
│   ├── v0.8.0.md
│   ├── v0.8.1.md
│   ├── v0.8.2.md
│   ├── v0.8.3.md
│   ├── v0.8.4.md
│   ├── v0.9.0.md
│   ├── v0.9.1.md
│   └── v1.0.0.md
├── .changie.yaml
├── .env.example
├── .github
│   ├── actions
│   │   └── setup-python
│   │       └── action.yml
│   ├── CODEOWNERS
│   ├── ISSUE_TEMPLATE
│   │   ├── bug_report.yml
│   │   └── feature_request.yml
│   ├── pull_request_template.md
│   └── workflows
│       ├── changelog-check.yml
│       ├── codeowners-check.yml
│       ├── create-release-pr.yml
│       ├── release.yml
│       └── run-checks-pr.yaml
├── .gitignore
├── .pre-commit-config.yaml
├── .task
│   └── checksum
│       └── d2
├── .tool-versions
├── .vscode
│   ├── launch.json
│   └── settings.json
├── CHANGELOG.md
├── CONTRIBUTING.md
├── docs
│   ├── d2.png
│   └── diagram.d2
├── evals
│   └── semantic_layer
│       └── test_eval_semantic_layer.py
├── examples
│   ├── .DS_Store
│   ├── aws_strands_agent
│   │   ├── __init__.py
│   │   ├── .DS_Store
│   │   ├── dbt_data_scientist
│   │   │   ├── __init__.py
│   │   │   ├── .env.example
│   │   │   ├── agent.py
│   │   │   ├── prompts.py
│   │   │   ├── quick_mcp_test.py
│   │   │   ├── test_all_tools.py
│   │   │   └── tools
│   │   │       ├── __init__.py
│   │   │       ├── dbt_compile.py
│   │   │       ├── dbt_mcp.py
│   │   │       └── dbt_model_analyzer.py
│   │   ├── LICENSE
│   │   ├── README.md
│   │   └── requirements.txt
│   ├── google_adk_agent
│   │   ├── __init__.py
│   │   ├── main.py
│   │   ├── pyproject.toml
│   │   └── README.md
│   ├── langgraph_agent
│   │   ├── __init__.py
│   │   ├── .python-version
│   │   ├── main.py
│   │   ├── pyproject.toml
│   │   ├── README.md
│   │   └── uv.lock
│   ├── openai_agent
│   │   ├── __init__.py
│   │   ├── .gitignore
│   │   ├── .python-version
│   │   ├── main_streamable.py
│   │   ├── main.py
│   │   ├── pyproject.toml
│   │   ├── README.md
│   │   └── uv.lock
│   ├── openai_responses
│   │   ├── __init__.py
│   │   ├── .gitignore
│   │   ├── .python-version
│   │   ├── main.py
│   │   ├── pyproject.toml
│   │   ├── README.md
│   │   └── uv.lock
│   ├── pydantic_ai_agent
│   │   ├── __init__.py
│   │   ├── .gitignore
│   │   ├── .python-version
│   │   ├── main.py
│   │   ├── pyproject.toml
│   │   └── README.md
│   └── remote_mcp
│       ├── .python-version
│       ├── main.py
│       ├── pyproject.toml
│       ├── README.md
│       └── uv.lock
├── LICENSE
├── pyproject.toml
├── README.md
├── src
│   ├── client
│   │   ├── __init__.py
│   │   ├── main.py
│   │   └── tools.py
│   ├── dbt_mcp
│   │   ├── __init__.py
│   │   ├── .gitignore
│   │   ├── config
│   │   │   ├── config_providers.py
│   │   │   ├── config.py
│   │   │   ├── dbt_project.py
│   │   │   ├── dbt_yaml.py
│   │   │   ├── headers.py
│   │   │   ├── settings.py
│   │   │   └── transport.py
│   │   ├── dbt_admin
│   │   │   ├── __init__.py
│   │   │   ├── client.py
│   │   │   ├── constants.py
│   │   │   ├── run_results_errors
│   │   │   │   ├── __init__.py
│   │   │   │   ├── config.py
│   │   │   │   └── parser.py
│   │   │   └── tools.py
│   │   ├── dbt_cli
│   │   │   ├── binary_type.py
│   │   │   └── tools.py
│   │   ├── dbt_codegen
│   │   │   ├── __init__.py
│   │   │   └── tools.py
│   │   ├── discovery
│   │   │   ├── client.py
│   │   │   └── tools.py
│   │   ├── errors
│   │   │   ├── __init__.py
│   │   │   ├── admin_api.py
│   │   │   ├── base.py
│   │   │   ├── cli.py
│   │   │   ├── common.py
│   │   │   ├── discovery.py
│   │   │   ├── semantic_layer.py
│   │   │   └── sql.py
│   │   ├── gql
│   │   │   └── errors.py
│   │   ├── lsp
│   │   │   ├── __init__.py
│   │   │   ├── lsp_binary_manager.py
│   │   │   ├── lsp_client.py
│   │   │   ├── lsp_connection.py
│   │   │   └── tools.py
│   │   ├── main.py
│   │   ├── mcp
│   │   │   ├── create.py
│   │   │   └── server.py
│   │   ├── oauth
│   │   │   ├── client_id.py
│   │   │   ├── context_manager.py
│   │   │   ├── dbt_platform.py
│   │   │   ├── fastapi_app.py
│   │   │   ├── logging.py
│   │   │   ├── login.py
│   │   │   ├── refresh_strategy.py
│   │   │   ├── token_provider.py
│   │   │   └── token.py
│   │   ├── prompts
│   │   │   ├── __init__.py
│   │   │   ├── admin_api
│   │   │   │   ├── cancel_job_run.md
│   │   │   │   ├── get_job_details.md
│   │   │   │   ├── get_job_run_artifact.md
│   │   │   │   ├── get_job_run_details.md
│   │   │   │   ├── get_job_run_error.md
│   │   │   │   ├── list_job_run_artifacts.md
│   │   │   │   ├── list_jobs_runs.md
│   │   │   │   ├── list_jobs.md
│   │   │   │   ├── retry_job_run.md
│   │   │   │   └── trigger_job_run.md
│   │   │   ├── dbt_cli
│   │   │   │   ├── args
│   │   │   │   │   ├── full_refresh.md
│   │   │   │   │   ├── limit.md
│   │   │   │   │   ├── resource_type.md
│   │   │   │   │   ├── selectors.md
│   │   │   │   │   ├── sql_query.md
│   │   │   │   │   └── vars.md
│   │   │   │   ├── build.md
│   │   │   │   ├── compile.md
│   │   │   │   ├── docs.md
│   │   │   │   ├── list.md
│   │   │   │   ├── parse.md
│   │   │   │   ├── run.md
│   │   │   │   ├── show.md
│   │   │   │   └── test.md
│   │   │   ├── dbt_codegen
│   │   │   │   ├── args
│   │   │   │   │   ├── case_sensitive_cols.md
│   │   │   │   │   ├── database_name.md
│   │   │   │   │   ├── generate_columns.md
│   │   │   │   │   ├── include_data_types.md
│   │   │   │   │   ├── include_descriptions.md
│   │   │   │   │   ├── leading_commas.md
│   │   │   │   │   ├── materialized.md
│   │   │   │   │   ├── model_name.md
│   │   │   │   │   ├── model_names.md
│   │   │   │   │   ├── schema_name.md
│   │   │   │   │   ├── source_name.md
│   │   │   │   │   ├── table_name.md
│   │   │   │   │   ├── table_names.md
│   │   │   │   │   ├── tables.md
│   │   │   │   │   └── upstream_descriptions.md
│   │   │   │   ├── generate_model_yaml.md
│   │   │   │   ├── generate_source.md
│   │   │   │   └── generate_staging_model.md
│   │   │   ├── discovery
│   │   │   │   ├── get_all_models.md
│   │   │   │   ├── get_all_sources.md
│   │   │   │   ├── get_exposure_details.md
│   │   │   │   ├── get_exposures.md
│   │   │   │   ├── get_mart_models.md
│   │   │   │   ├── get_model_children.md
│   │   │   │   ├── get_model_details.md
│   │   │   │   ├── get_model_health.md
│   │   │   │   └── get_model_parents.md
│   │   │   ├── lsp
│   │   │   │   ├── args
│   │   │   │   │   ├── column_name.md
│   │   │   │   │   └── model_id.md
│   │   │   │   └── get_column_lineage.md
│   │   │   ├── prompts.py
│   │   │   └── semantic_layer
│   │   │       ├── get_dimensions.md
│   │   │       ├── get_entities.md
│   │   │       ├── get_metrics_compiled_sql.md
│   │   │       ├── list_metrics.md
│   │   │       └── query_metrics.md
│   │   ├── py.typed
│   │   ├── semantic_layer
│   │   │   ├── client.py
│   │   │   ├── gql
│   │   │   │   ├── gql_request.py
│   │   │   │   └── gql.py
│   │   │   ├── levenshtein.py
│   │   │   ├── tools.py
│   │   │   └── types.py
│   │   ├── sql
│   │   │   └── tools.py
│   │   ├── telemetry
│   │   │   └── logging.py
│   │   ├── tools
│   │   │   ├── annotations.py
│   │   │   ├── definitions.py
│   │   │   ├── policy.py
│   │   │   ├── register.py
│   │   │   ├── tool_names.py
│   │   │   └── toolsets.py
│   │   └── tracking
│   │       └── tracking.py
│   └── remote_mcp
│       ├── __init__.py
│       └── session.py
├── Taskfile.yml
├── tests
│   ├── __init__.py
│   ├── env_vars.py
│   ├── integration
│   │   ├── __init__.py
│   │   ├── dbt_codegen
│   │   │   ├── __init__.py
│   │   │   └── test_dbt_codegen.py
│   │   ├── discovery
│   │   │   └── test_discovery.py
│   │   ├── initialization
│   │   │   ├── __init__.py
│   │   │   └── test_initialization.py
│   │   ├── lsp
│   │   │   └── test_lsp_connection.py
│   │   ├── remote_mcp
│   │   │   └── test_remote_mcp.py
│   │   ├── remote_tools
│   │   │   └── test_remote_tools.py
│   │   ├── semantic_layer
│   │   │   └── test_semantic_layer.py
│   │   └── tracking
│   │       └── test_tracking.py
│   ├── mocks
│   │   └── config.py
│   └── unit
│       ├── __init__.py
│       ├── config
│       │   ├── __init__.py
│       │   ├── test_config.py
│       │   └── test_transport.py
│       ├── dbt_admin
│       │   ├── __init__.py
│       │   ├── test_client.py
│       │   ├── test_error_fetcher.py
│       │   └── test_tools.py
│       ├── dbt_cli
│       │   ├── __init__.py
│       │   ├── test_cli_integration.py
│       │   └── test_tools.py
│       ├── dbt_codegen
│       │   ├── __init__.py
│       │   └── test_tools.py
│       ├── discovery
│       │   ├── __init__.py
│       │   ├── conftest.py
│       │   ├── test_exposures_fetcher.py
│       │   └── test_sources_fetcher.py
│       ├── lsp
│       │   ├── __init__.py
│       │   ├── test_lsp_client.py
│       │   ├── test_lsp_connection.py
│       │   └── test_lsp_tools.py
│       ├── oauth
│       │   ├── test_credentials_provider.py
│       │   ├── test_fastapi_app_pagination.py
│       │   └── test_token.py
│       ├── tools
│       │   ├── test_disable_tools.py
│       │   ├── test_tool_names.py
│       │   ├── test_tool_policies.py
│       │   └── test_toolsets.py
│       └── tracking
│           └── test_tracking.py
├── ui
│   ├── .gitignore
│   ├── assets
│   │   ├── dbt_logo BLK.svg
│   │   └── dbt_logo WHT.svg
│   ├── eslint.config.js
│   ├── index.html
│   ├── package.json
│   ├── pnpm-lock.yaml
│   ├── pnpm-workspace.yaml
│   ├── README.md
│   ├── src
│   │   ├── App.css
│   │   ├── App.tsx
│   │   ├── global.d.ts
│   │   ├── index.css
│   │   ├── main.tsx
│   │   └── vite-env.d.ts
│   ├── tsconfig.app.json
│   ├── tsconfig.json
│   ├── tsconfig.node.json
│   └── vite.config.ts
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/tests/integration/lsp/test_lsp_connection.py:
--------------------------------------------------------------------------------

```python
"""Integration-style tests for LSP connection using real instances instead of mocks.

These tests use real sockets, asyncio primitives, and actual data flow
to provide more realistic test coverage compared to heavily mocked unit tests.
"""

import asyncio
import json
import socket

import pytest

from dbt_mcp.lsp.lsp_connection import (
    LSPConnection,
    LspEventName,
    JsonRpcMessage,
)


class TestRealSocketOperations:
    """Tests using real sockets to verify actual network communication."""

    def test_setup_socket_real(self, tmp_path):
        """Test socket setup with real socket binding."""
        binary_path = tmp_path / "lsp"
        binary_path.touch()

        conn = LSPConnection(str(binary_path), "/test")

        # Use real socket
        conn.setup_socket()

        try:
            # Verify real socket was created and bound
            assert conn._socket is not None
            assert isinstance(conn._socket, socket.socket)
            assert conn.port > 0  # OS assigned a port
            assert conn.host == "127.0.0.1"

            # Verify socket is actually listening
            sockname = conn._socket.getsockname()
            assert sockname[0] == "127.0.0.1"
            assert sockname[1] == conn.port

        finally:
            # Cleanup
            if conn._socket:
                conn._socket.close()

    def test_socket_reuse_address(self, tmp_path):
        """Test that SO_REUSEADDR is set on real socket."""
        binary_path = tmp_path / "lsp"
        binary_path.touch()

        conn = LSPConnection(str(binary_path), "/test")
        conn.setup_socket()

        try:
            # Verify SO_REUSEADDR is set (value varies by platform, just check it's non-zero)
            reuse = conn._socket.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
            assert reuse != 0

        finally:
            if conn._socket:
                conn._socket.close()

    @pytest.mark.asyncio
    async def test_socket_accept_with_real_client(self, tmp_path):
        """Test socket accept with real client connection."""
        binary_path = tmp_path / "lsp"
        binary_path.touch()

        conn = LSPConnection(str(binary_path), "/test", connection_timeout=2.0)
        conn.setup_socket()

        try:
            # Create real client socket that connects to the server
            client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

            async def client_connect():
                await asyncio.sleep(0.1)  # Let server start listening
                await asyncio.get_running_loop().run_in_executor(
                    None, client_socket.connect, (conn.host, conn.port)
                )

            async def server_accept():
                conn._socket.settimeout(conn.connection_timeout)
                connection, addr = await asyncio.get_running_loop().run_in_executor(
                    None, conn._socket.accept
                )
                return connection, addr

            # Run both concurrently
            client_task = asyncio.create_task(client_connect())
            server_result = await server_accept()

            await client_task

            connection, client_addr = server_result
            assert connection is not None
            assert client_addr[0] in ("127.0.0.1", "::1")  # IPv4 or IPv6 localhost

            # Cleanup
            connection.close()
            client_socket.close()

        finally:
            if conn._socket:
                conn._socket.close()


class TestRealAsyncioQueues:
    """Tests using real asyncio queues to verify message queueing."""

    def test_send_message_with_real_queue(self, tmp_path):
        """Test message sending with real asyncio queue."""
        binary_path = tmp_path / "lsp"
        binary_path.touch()

        conn = LSPConnection(str(binary_path), "/test")

        # The _outgoing_queue is already a real asyncio.Queue
        assert isinstance(conn._outgoing_queue, asyncio.Queue)
        assert conn._outgoing_queue.empty()

        # Send a message
        message = JsonRpcMessage(id=1, method="test", params={"key": "value"})
        conn._send_message(message)

        # Verify message was actually queued
        assert not conn._outgoing_queue.empty()
        data = conn._outgoing_queue.get_nowait()

        # Verify LSP protocol format
        assert isinstance(data, bytes)
        assert b"Content-Length:" in data
        assert b"\r\n\r\n" in data
        assert b'"jsonrpc"' in data
        assert b'"2.0"' in data
        assert b'"test"' in data

    def test_multiple_messages_queue_order(self, tmp_path):
        """Test that multiple messages maintain FIFO order in real queue."""
        binary_path = tmp_path / "lsp"
        binary_path.touch()

        conn = LSPConnection(str(binary_path), "/test")

        # Send multiple messages
        msg1 = JsonRpcMessage(id=1, method="first")
        msg2 = JsonRpcMessage(id=2, method="second")
        msg3 = JsonRpcMessage(id=3, method="third")

        conn._send_message(msg1)
        conn._send_message(msg2)
        conn._send_message(msg3)

        # Verify queue size
        assert conn._outgoing_queue.qsize() == 3

        # Verify FIFO order
        data1 = conn._outgoing_queue.get_nowait()
        data2 = conn._outgoing_queue.get_nowait()
        data3 = conn._outgoing_queue.get_nowait()

        assert b'"first"' in data1
        assert b'"second"' in data2
        assert b'"third"' in data3

        # Queue should be empty
        assert conn._outgoing_queue.empty()


class TestRealAsyncioFutures:
    """Tests using real asyncio futures to verify async behavior."""

    @pytest.mark.asyncio
    async def test_handle_response_with_real_future(self, tmp_path):
        """Test handling response with real asyncio future."""
        binary_path = tmp_path / "lsp"
        binary_path.touch()

        conn = LSPConnection(str(binary_path), "/test")

        # Create real future in current event loop
        future = asyncio.get_running_loop().create_future()
        conn.state.pending_requests[42] = future

        # Handle response in the same loop
        message = JsonRpcMessage(id=42, result={"success": True, "data": "test"})
        conn._handle_incoming_message(message)

        # Wait for future to be resolved (should be immediate via call_soon_threadsafe)
        result = await asyncio.wait_for(future, timeout=1.0)

        assert result == {"success": True, "data": "test"}
        assert 42 not in conn.state.pending_requests

    @pytest.mark.asyncio
    async def test_handle_error_with_real_future(self, tmp_path):
        """Test handling error response with real asyncio future."""
        binary_path = tmp_path / "lsp"
        binary_path.touch()

        conn = LSPConnection(str(binary_path), "/test")

        # Create real future
        future = asyncio.get_running_loop().create_future()
        conn.state.pending_requests[42] = future

        # Handle error response
        message = JsonRpcMessage(
            id=42, error={"code": -32601, "message": "Method not found"}
        )
        conn._handle_incoming_message(message)

        # Future should be rejected with exception
        with pytest.raises(RuntimeError, match="LSP error"):
            await asyncio.wait_for(future, timeout=1.0)

        assert 42 not in conn.state.pending_requests

    @pytest.mark.asyncio
    async def test_notification_futures_real(self, tmp_path):
        """Test waiting for notifications with real futures."""
        binary_path = tmp_path / "lsp"
        binary_path.touch()

        conn = LSPConnection(str(binary_path), "/test")

        # Register to wait for a notification
        future = conn.wait_for_notification(LspEventName.compileComplete)

        # Verify it's a real future
        assert isinstance(future, asyncio.Future)
        assert not future.done()

        # Simulate receiving the notification
        message = JsonRpcMessage(
            method="dbt/lspCompileComplete", params={"success": True, "errors": []}
        )
        conn._handle_incoming_message(message)

        # Wait for notification
        result = await asyncio.wait_for(future, timeout=1.0)

        assert result == {"success": True, "errors": []}
        assert conn.state.compiled is True


class TestRealSocketCommunication:
    """Tests using real socket pairs to verify end-to-end communication."""

    @pytest.mark.asyncio
    async def test_socket_pair_communication(self, tmp_path):
        """Test bidirectional communication using socketpair."""
        binary_path = tmp_path / "lsp"
        binary_path.touch()

        conn = LSPConnection(str(binary_path), "/test")

        # Create a real socket pair (connected sockets)
        server_socket, client_socket = socket.socketpair()
        conn._connection = server_socket

        try:
            # Send a message through the connection
            message = JsonRpcMessage(id=1, method="test", params={"foo": "bar"})
            conn._send_message(message)

            # Get the data from the queue
            data = conn._outgoing_queue.get_nowait()

            # Actually send it through the socket
            await asyncio.get_running_loop().run_in_executor(
                None, server_socket.sendall, data
            )

            # Read it back on the client side
            received_data = await asyncio.get_running_loop().run_in_executor(
                None, client_socket.recv, 4096
            )

            # Verify we got the complete LSP message
            assert b"Content-Length:" in received_data
            assert b"\r\n\r\n" in received_data
            assert b'"test"' in received_data
            assert b'"foo"' in received_data
            assert b'"bar"' in received_data

        finally:
            server_socket.close()
            client_socket.close()

    @pytest.mark.asyncio
    async def test_message_roundtrip_real(self, tmp_path):
        """Test complete message send and parse roundtrip."""
        binary_path = tmp_path / "lsp"
        binary_path.touch()

        conn = LSPConnection(str(binary_path), "/test")

        # Create socket pair
        server_socket, client_socket = socket.socketpair()
        conn._connection = server_socket

        try:
            # Original message
            original_message = JsonRpcMessage(
                id=123,
                method="textDocument/completion",
                params={
                    "textDocument": {"uri": "file:///test.sql"},
                    "position": {"line": 10, "character": 5},
                },
            )

            # Send through connection
            conn._send_message(original_message)
            data = conn._outgoing_queue.get_nowait()
            await asyncio.get_running_loop().run_in_executor(
                None, server_socket.sendall, data
            )

            # Receive on client side
            received_data = await asyncio.get_running_loop().run_in_executor(
                None, client_socket.recv, 4096
            )

            # Parse it back
            parsed_message, remaining = conn._parse_message(received_data)

            # Verify roundtrip integrity
            assert parsed_message is not None
            assert parsed_message.id == original_message.id
            assert parsed_message.method == original_message.method
            assert parsed_message.params == original_message.params
            assert remaining == b""

        finally:
            server_socket.close()
            client_socket.close()

    @pytest.mark.asyncio
    async def test_multiple_messages_streaming(self, tmp_path):
        """Test streaming multiple messages through real socket."""
        binary_path = tmp_path / "lsp"
        binary_path.touch()

        conn = LSPConnection(str(binary_path), "/test")

        # Create socket pair
        server_socket, client_socket = socket.socketpair()
        conn._connection = server_socket

        try:
            # Set non-blocking for client to avoid hangs
            client_socket.setblocking(False)

            # Send multiple messages
            messages = [
                JsonRpcMessage(id=1, method="initialize"),
                JsonRpcMessage(method="initialized", params={}),
                JsonRpcMessage(id=2, method="textDocument/didOpen"),
            ]

            for msg in messages:
                conn._send_message(msg)
                data = conn._outgoing_queue.get_nowait()
                await asyncio.get_running_loop().run_in_executor(
                    None, server_socket.sendall, data
                )

            # Receive all data on client side with timeout
            received_data = b""
            client_socket.setblocking(True)
            client_socket.settimeout(1.0)

            try:
                while True:
                    chunk = await asyncio.get_running_loop().run_in_executor(
                        None, client_socket.recv, 4096
                    )
                    if not chunk:
                        break
                    received_data += chunk

                    # Try to parse - if we have all 3 messages, we're done
                    temp_buffer = received_data
                    temp_count = 0
                    while True:
                        msg, temp_buffer = conn._parse_message(temp_buffer)
                        if msg is None:
                            break
                        temp_count += 1

                    if temp_count >= 3:
                        break
            except TimeoutError:
                pass  # Expected when all data is received

            # Parse all messages
            buffer = received_data
            parsed_messages = []

            while buffer:
                msg, buffer = conn._parse_message(buffer)
                if msg is None:
                    break
                parsed_messages.append(msg)

            # Verify all messages were received and parsed correctly
            assert len(parsed_messages) == 3
            assert parsed_messages[0].id == 1
            assert parsed_messages[0].method == "initialize"
            assert parsed_messages[1].method == "initialized"
            assert parsed_messages[2].id == 2

        finally:
            server_socket.close()
            client_socket.close()


class TestRealMessageParsing:
    """Tests parsing with real byte streams."""

    def test_parse_real_lsp_message(self, tmp_path):
        """Test parsing a real LSP protocol message."""
        binary_path = tmp_path / "lsp"
        binary_path.touch()

        conn = LSPConnection(str(binary_path), "/test")

        # Create a real LSP message exactly as it would be sent
        content = json.dumps(
            {
                "jsonrpc": "2.0",
                "id": 1,
                "result": {
                    "capabilities": {
                        "textDocumentSync": 2,
                        "completionProvider": {"triggerCharacters": ["."]},
                    }
                },
            }
        )
        content_bytes = content.encode("utf-8")
        header = f"Content-Length: {len(content_bytes)}\r\n\r\n"
        full_message = header.encode("utf-8") + content_bytes

        # Parse it
        message, remaining = conn._parse_message(full_message)

        assert message is not None
        assert message.id == 1
        assert "capabilities" in message.result
        assert message.result["capabilities"]["textDocumentSync"] == 2
        assert remaining == b""

    def test_parse_chunked_message_real(self, tmp_path):
        """Test parsing message that arrives in multiple chunks."""
        binary_path = tmp_path / "lsp"
        binary_path.touch()

        conn = LSPConnection(str(binary_path), "/test")

        # Create a message
        content = json.dumps({"jsonrpc": "2.0", "id": 1, "method": "test"})
        content_bytes = content.encode("utf-8")
        header = f"Content-Length: {len(content_bytes)}\r\n\r\n"
        full_message = header.encode("utf-8") + content_bytes

        # Split into chunks (simulate network chunking)
        chunk1 = full_message[:20]
        chunk2 = full_message[20:40]
        chunk3 = full_message[40:]

        # Parse first chunk - should be incomplete
        msg1, buffer = conn._parse_message(chunk1)
        assert msg1 is None
        assert buffer == chunk1

        # Add second chunk - still incomplete
        buffer += chunk2
        msg2, buffer = conn._parse_message(buffer)
        assert msg2 is None

        # Add final chunk - should complete
        buffer += chunk3
        msg3, buffer = conn._parse_message(buffer)
        assert msg3 is not None
        assert msg3.id == 1
        assert msg3.method == "test"
        assert buffer == b""


class TestRealConcurrentOperations:
    """Tests with real concurrent async operations."""

    @pytest.mark.asyncio
    async def test_concurrent_request_futures(self, tmp_path):
        """Test handling multiple concurrent requests with real futures."""
        binary_path = tmp_path / "lsp"
        binary_path.touch()

        conn = LSPConnection(str(binary_path), "/test")

        # Create multiple real futures for concurrent requests
        futures = {}
        for i in range(10):
            future = asyncio.get_running_loop().create_future()
            futures[i] = future
            conn.state.pending_requests[i] = future

        # Simulate responses arriving concurrently
        async def respond(request_id: int, delay: float):
            await asyncio.sleep(delay)
            message = JsonRpcMessage(id=request_id, result={"request_id": request_id})
            conn._handle_incoming_message(message)

        # Start all responses with random delays
        response_tasks = [asyncio.create_task(respond(i, i * 0.01)) for i in range(10)]

        # Wait for all futures to resolve
        results = await asyncio.gather(*[futures[i] for i in range(10)])

        # Verify all completed correctly
        assert len(results) == 10
        for i, result in enumerate(results):
            assert result["request_id"] == i

        # All requests should be removed
        assert len(conn.state.pending_requests) == 0

        # Cleanup
        await asyncio.gather(*response_tasks)

    @pytest.mark.asyncio
    async def test_concurrent_notifications_real(self, tmp_path):
        """Test multiple futures waiting for the same notification."""
        binary_path = tmp_path / "lsp"
        binary_path.touch()

        conn = LSPConnection(str(binary_path), "/test")

        # Create multiple waiters for the same event
        future1 = conn.wait_for_notification(LspEventName.compileComplete)
        future2 = conn.wait_for_notification(LspEventName.compileComplete)
        future3 = conn.wait_for_notification(LspEventName.compileComplete)

        # All should be real futures
        assert all(isinstance(f, asyncio.Future) for f in [future1, future2, future3])

        # Send the notification
        message = JsonRpcMessage(
            method="dbt/lspCompileComplete", params={"status": "success"}
        )
        conn._handle_incoming_message(message)

        # All futures should resolve
        results = await asyncio.wait_for(
            asyncio.gather(future1, future2, future3), timeout=1.0
        )

        assert all(r == {"status": "success"} for r in results)


class TestRealStateManagement:
    """Tests using real state objects."""

    def test_real_state_initialization(self, tmp_path):
        """Test that connection uses real LspConnectionState."""
        binary_path = tmp_path / "lsp"
        binary_path.touch()

        conn = LSPConnection(str(binary_path), "/test")

        # Verify state is a real instance
        from dbt_mcp.lsp.lsp_connection import LspConnectionState

        assert isinstance(conn.state, LspConnectionState)
        assert conn.state.initialized is False
        assert conn.state.compiled is False
        assert isinstance(conn.state.pending_requests, dict)
        assert isinstance(conn.state.pending_notifications, dict)

    def test_real_request_id_generation(self, tmp_path):
        """Test real request ID counter."""
        binary_path = tmp_path / "lsp"
        binary_path.touch()

        conn = LSPConnection(str(binary_path), "/test")

        # Get sequential IDs
        ids = [conn.state.get_next_request_id() for _ in range(100)]

        # Verify they're sequential (starting point may vary if other tests ran)
        # Just verify they are sequential and unique
        first_id = ids[0]
        assert ids[-1] == first_id + 99
        assert ids == list(range(first_id, first_id + 100))
        assert len(set(ids)) == 100  # All unique

    def test_real_state_updates(self, tmp_path):
        """Test that state updates work with real instances."""
        binary_path = tmp_path / "lsp"
        binary_path.touch()

        conn = LSPConnection(str(binary_path), "/test")

        # Update state
        conn.state.initialized = True
        conn.state.capabilities = {"test": True}
        conn.state.compiled = True

        # Verify updates persist
        assert conn.state.initialized is True
        assert conn.state.capabilities == {"test": True}
        assert conn.state.compiled is True

```

--------------------------------------------------------------------------------
/ui/src/App.css:
--------------------------------------------------------------------------------

```css
/* Reset and base styles */
* {
  box-sizing: border-box;
}

body {
  width: 100%;
  margin: 0;
  padding: 0;
  font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', 'Roboto', 'Oxygen',
    'Ubuntu', 'Cantarell', 'Fira Sans', 'Droid Sans', 'Helvetica Neue',
    sans-serif;
  line-height: 1.6;
  -webkit-font-smoothing: antialiased;
  -moz-osx-font-smoothing: grayscale;
  background-color: #fff;
  color: #1c1a19;
}

p {
  margin-bottom: 1rem;
}

@media (prefers-color-scheme: dark) {
  body {
    background-color: #1c1a19;
    color: #f6f6f6;
  }

  p {
    margin-bottom: 1rem;
  }
}

/* Logo */
.logo-container {
  position: fixed;
  top: 1rem;
  left: 1rem;
  z-index: 1000;
}

.logo {
  height: 2rem;
  width: auto;
  transition: opacity 0.2s ease-in-out;
}

.logo-light {
  display: block;
}

.logo-dark {
  display: none;
}

/* Main layout */
.app-container {
  min-height: 100vh;
  display: flex;
  justify-content: center;
  align-items: flex-start;
  padding: 2rem 1rem;
}

.app-content {
  width: 100%;
  max-width: 600px;
  display: flex;
  flex-direction: column;
  gap: 2rem;
}

/* Header */
.app-header {
  text-align: center;
  margin-bottom: 1rem;
}

.app-header h1 {
  margin: 0 0 0.5rem 0;
  font-size: 2.5rem;
  font-weight: 700;
  letter-spacing: -0.025em;
}

.app-header p {
  margin: 0;
  font-size: 1.125rem;
  opacity: 0.7;
}

/* Sections */
section {
  background: #fff;
  border-radius: 12px;
  border: 1px solid #ebe9e9;
  overflow: visible;
  box-shadow: 0 1px 3px 0 rgba(0, 0, 0, 0.1), 0 1px 2px 0 rgba(0, 0, 0, 0.06);
}

/* Specific overflow handling for sections with dropdowns */
.project-selection-section {
  overflow: visible;
}

.section-header {
  padding: 1.5rem 1.5rem 0 1.5rem;
  border-bottom: 1px solid #ebe9e9;
  margin-bottom: 1.5rem;
}

.section-header h2 {
  margin: 0 0 0.5rem 0;
  font-size: 1.5rem;
  font-weight: 600;
}

.section-header h3 {
  margin: 0 0 0.5rem 0;
  font-size: 1.25rem;
  font-weight: 600;
}

.section-header p {
  margin: 0 0 1.5rem 0;
  opacity: 0.7;
  font-size: 0.875rem;
}

/* Form content */
.form-content {
  padding: 0 1.5rem 1.5rem 1.5rem;
}

.form-group {
  margin-bottom: 1rem;
}

.form-label {
  display: block;
  margin-bottom: 0.5rem;
  font-weight: 500;
  font-size: 0.875rem;
}

.form-select {
  width: 100%;
  padding: 0.875rem 3rem 0.875rem 1rem;
  border: 1.5px solid #ebe9e9;
  border-radius: 12px;
  font-size: 1rem;
  font-weight: 500;
  background-color: #fff;
  background-image: url("data:image/svg+xml,%3csvg xmlns='http://www.w3.org/2000/svg' fill='none' viewBox='0 0 20 20'%3e%3cpath stroke='%231c1a19' stroke-linecap='round' stroke-linejoin='round' stroke-width='1.5' d='M6 8l4 4 4-4'/%3e%3c/svg%3e");
  background-position: right 0.75rem center;
  background-repeat: no-repeat;
  background-size: 1.25rem 1.25rem;
  cursor: pointer;
  transition: all 0.2s ease-in-out;
  appearance: none;
  -webkit-appearance: none;
  -moz-appearance: none;
}

.form-select:focus {
  outline: none;
  border-color: #3b82f6;
  background-color: white;
  box-shadow:
    0 0 0 3px rgba(59, 130, 246, 0.12),
    0 4px 6px -1px rgba(0, 0, 0, 0.1),
    0 2px 4px -1px rgba(0, 0, 0, 0.06);
  transform: translateY(-1px);
}

.form-select:hover:not(:focus) {
  border-color: #9ca3af;
  background-color: white;
  box-shadow:
    0 2px 4px -1px rgba(0, 0, 0, 0.1),
    0 1px 2px -1px rgba(0, 0, 0, 0.06);
}

.form-select:disabled {
  background-color: #f3f4f6;
  border-color: #e5e7eb;
  cursor: not-allowed;
  opacity: 0.7;
}

/* Custom dropdown */
.custom-dropdown {
  position: relative;
  width: 100%;
  z-index: 999999;
  /* Ensure proper stacking context */
  isolation: isolate;
}

.dropdown-trigger {
  width: 100%;
  padding: 0.875rem 3rem 0.875rem 1rem;
  border: 1.5px solid #ebe9e9;
  border-radius: 12px;
  font-size: 1rem;
  font-weight: 500;
  background-color: #fff;
  background-image: url("data:image/svg+xml,%3csvg xmlns='http://www.w3.org/2000/svg' fill='none' viewBox='0 0 20 20'%3e%3cpath stroke='%231c1a19' stroke-linecap='round' stroke-linejoin='round' stroke-width='1.5' d='M6 8l4 4 4-4'/%3e%3c/svg%3e");
  background-position: right 0.75rem center;
  background-repeat: no-repeat;
  background-size: 1.25rem 1.25rem;
  cursor: pointer;
  transition: all 0.2s ease-in-out;
  text-align: left;
  color: #1c1a19;
}

.dropdown-trigger:focus {
  outline: none;
  border-color: #3b82f6;
  background-color: white;
  box-shadow:
    0 0 0 3px rgba(59, 130, 246, 0.12),
    0 4px 6px -1px rgba(0, 0, 0, 0.1),
    0 2px 4px -1px rgba(0, 0, 0, 0.06);
  transform: translateY(-1px);
}

.dropdown-trigger:hover:not(:focus) {
  border-color: #9ca3af;
  background-color: white;
  box-shadow:
    0 2px 4px -1px rgba(0, 0, 0, 0.1),
    0 1px 2px -1px rgba(0, 0, 0, 0.06);
}

.dropdown-trigger.open {
  border-color: #3b82f6;
  background-color: white;
  background-image: url("data:image/svg+xml,%3csvg xmlns='http://www.w3.org/2000/svg' fill='none' viewBox='0 0 20 20'%3e%3cpath stroke='%233b82f6' stroke-linecap='round' stroke-linejoin='round' stroke-width='1.5' d='M14 12l-4-4-4 4'/%3e%3c/svg%3e");
  box-shadow:
    0 0 0 3px rgba(59, 130, 246, 0.12),
    0 4px 6px -1px rgba(0, 0, 0, 0.1),
    0 2px 4px -1px rgba(0, 0, 0, 0.06);
  border-bottom-left-radius: 4px;
  border-bottom-right-radius: 4px;
}

.dropdown-trigger.placeholder {
  color: #9ca3af;
  font-weight: 400;
}

.dropdown-options {
  position: absolute;
  top: 100%;
  left: 0;
  right: 0;
  background: white;
  border: 1.5px solid;
  border-top: none;
  border-bottom-left-radius: 12px;
  border-bottom-right-radius: 12px;
  box-shadow:
    0 0 0 3px rgba(59, 130, 246, 0.12),
    0 10px 15px -3px rgba(0, 0, 0, 0.1),
    0 4px 6px -2px rgba(0, 0, 0, 0.05);
  z-index: 999999;
  animation: dropdownSlideIn 0.15s ease-out;
  /* Ensure proper rendering and isolation */
  isolation: isolate;
  contain: layout style;
  /* Add scrolling for long lists */
  max-height: 300px;
  overflow-y: auto;
}

/* Removed dropdown-options-fixed - using simple absolute positioning */

@keyframes dropdownSlideIn {
  0% {
    opacity: 0;
    transform: translateY(-8px);
  }
  100% {
    opacity: 1;
    transform: translateY(0);
  }
}

.dropdown-option {
  padding: 0.875rem 1rem;
  cursor: pointer;
  transition: all 0.15s ease-in-out;
  border: none;
  background: none;
  width: 100%;
  text-align: left;
  font-size: 1rem;
  color: #374151;
  display: flex;
  flex-direction: column;
  gap: 0.125rem;
}

.dropdown-option:hover {
  background-color: #f8fafc;
  color: #1f2937;
}

.dropdown-option:focus {
  outline: none;
  background-color: #eff6ff;
  color: #1e40af;
}

.dropdown-option:active {
  background-color: #dbeafe;
}

.dropdown-option.selected {
  background-color: #f3f4f6;
  color: #374151;
}

.dropdown-option.selected:hover {
  background-color: #e5e7eb;
}

.option-primary {
  font-weight: 500;
  line-height: 1.4;
}

.option-secondary {
  font-size: 0.875rem;
  opacity: 0.7;
  font-weight: 400;
}

.dropdown-option.selected .option-secondary {
  opacity: 0.9;
}

/* Dropdown scrollbar styling */
.dropdown-options::-webkit-scrollbar {
  width: 8px;
}

.dropdown-options::-webkit-scrollbar-track {
  background: #f8fafc;
  border-radius: 4px;
}

.dropdown-options::-webkit-scrollbar-thumb {
  background: #cbd5e1;
  border-radius: 4px;
  border: 1px solid #f8fafc;
}

.dropdown-options::-webkit-scrollbar-thumb:hover {
  background: #94a3b8;
}

/* Loading state */
.loading-state {
  display: flex;
  align-items: center;
  gap: 0.75rem;
  padding: 1rem;
  background-color: #fff;
  border: 1px solid #ebe9e9;
  border-radius: 8px;
  margin: 1rem 1.5rem;
}

.spinner {
  width: 20px;
  height: 20px;
  border: 2px solid #ebe9e9;
  border-top: 2px solid #1c1a19;
  border-radius: 50%;
  animation: spin 1s linear infinite;
}

@keyframes spin {
  0% {
    transform: rotate(0deg);
  }

  100% {
    transform: rotate(360deg);
  }
}

/* Error state */
.error-state {
  padding: 1rem;
  background-color: #fef2f2;
  border: 1px solid #fecaca;
  border-radius: 8px;
  margin: 1rem 1.5rem;
}

.error-state strong {
  display: block;
  margin-bottom: 0.25rem;
  font-weight: 600;
}

.error-state p {
  margin: 0;
  font-size: 0.875rem;
  opacity: 0.8;
}

/* OAuth Error Section */
.error-section {
  background: #fff;
  border: 1px solid #fecaca;
}

.error-details {
  padding: 0 1.5rem 1.5rem 1.5rem;
  display: flex;
  flex-direction: column;
  gap: 1rem;
}

.error-item {
  display: flex;
  flex-direction: column;
  gap: 0.5rem;
}

.error-item strong {
  font-weight: 500;
  font-size: 0.875rem;
  color: #991b1b;
}

.error-code {
  display: inline-block;
  padding: 0.5rem 0.75rem;
  background-color: #fef2f2;
  border: 1px solid #fecaca;
  border-radius: 6px;
  font-family: 'SF Mono', Monaco, 'Cascadia Code', 'Roboto Mono', Consolas, 'Courier New', monospace;
  font-size: 0.875rem;
  color: #991b1b;
  font-weight: 500;
}

.error-description {
  margin: 0;
  padding: 0.75rem;
  background-color: #fef2f2;
  border: 1px solid #fecaca;
  border-radius: 6px;
  color: #991b1b;
  font-size: 0.875rem;
  line-height: 1.5;
}

.error-actions {
  margin-top: 0.5rem;
  padding: 1rem;
  background-color: #fffbeb;
  border: 1px solid #fde68a;
  border-radius: 6px;
}

.error-actions p {
  margin: 0;
  color: #92400e;
  font-size: 0.875rem;
  line-height: 1.5;
}

/* Context details */
.context-details {
  padding: 0 1.5rem 1.5rem 1.5rem;
  display: flex;
  flex-direction: column;
  gap: 1rem;
}

.context-item {
  display: flex;
  flex-direction: column;
  gap: 0.25rem;
}

.context-item strong {
  font-weight: 500;
  font-size: 0.875rem;
  opacity: 0.7;
}

.environment-details {
  display: flex;
  align-items: center;
  gap: 0.5rem;
}

.env-name {
  font-weight: 500;
}

.env-type {
  font-size: 0.875rem;
  opacity: 0.6;
}

/* Actions section */
.actions-section {
  padding: 1.5rem;
  text-align: center;
  background-color: #f9fafb;
}

/* Button container */
.button-container {
  display: flex;
  justify-content: center;
  align-items: center;
}

/* Button */
.primary-button {
  display: inline-flex;
  align-items: center;
  padding: 0.75rem 1.5rem;
  background-color: #1c1a19;
  color: #fff;
  border: 1px solid #1c1a19;
  border-radius: 8px;
  font-size: 1rem;
  font-weight: 500;
  cursor: pointer;
  transition:
    background-color 0.15s ease-in-out,
    transform 0.15s ease-in-out,
    opacity 0.15s ease-in-out;
}

.primary-button:hover {
  background-color: #2d2a28;
  border-color: #2d2a28;
  transform: translateY(-1px);
}

.primary-button:focus {
  outline: none;
  box-shadow: 0 0 0 3px rgba(28, 26, 25, 0.2);
  border-color: #2d2a28;
}

.primary-button:active {
  transform: translateY(0);
  background-color: #3d3a38;
}

.primary-button:disabled {
  background-color: #d1d5db;
  border-color: #d1d5db;
  color: #6b7280;
  cursor: not-allowed;
  transform: none;
  box-shadow: none;
  opacity: 0.65;
}

  .primary-button:disabled:hover,
.primary-button:disabled:focus,
.primary-button:disabled:active {
  background-color: #d1d5db;
  border-color: #d1d5db;
  color: #6b7280;
  transform: none;
  box-shadow: none;
}

/* Completion section */
.completion-section {
  padding: 0;
}

.completion-card {
  padding: 2rem 1.5rem;
  text-align: center;
}

.completion-card h2 {
  margin: 0 0 1rem 0;
  font-size: 1.75rem;
  font-weight: 600;
}

.completion-card p {
  margin: 0;
  font-size: 1rem;
  line-height: 1.6;
}

/* Response section */
.response-section {
  padding: 1.5rem;
}

.response-text {
  background-color: #fff;
  border: 1px solid #ebe9e9;
  border-radius: 8px;
  padding: 1rem;
  font-family: 'SF Mono', Monaco, 'Cascadia Code', 'Roboto Mono', Consolas, 'Courier New', monospace;
  font-size: 0.875rem;
  line-height: 1.5;
  white-space: pre-wrap;
  word-break: break-word;
  overflow-x: auto;
}

/* Responsive design */
@media (max-width: 768px) {
  .logo-container {
    top: 0.5rem;
    left: 0.5rem;
  }

  .logo {
    height: 1.5rem;
  }

  .app-container {
    padding: 1rem 0.5rem;
  }

  .app-content {
    max-width: 100%;
  }

  .app-header h1 {
    font-size: 2rem;
  }

  .section-header {
    padding: 1rem 1rem 0 1rem;
    margin-bottom: 1rem;
  }

  .form-content,
  .context-details,
  .actions-section,
  .response-section {
    padding-left: 1rem;
    padding-right: 1rem;
  }

  .loading-state,
  .error-state {
    margin-left: 1rem;
    margin-right: 1rem;
  }
}

@media (max-width: 480px) {
  .logo {
    height: 1.25rem;
  }

  .app-container {
    padding: 0.5rem 0.25rem;
  }

  .app-header h1 {
    font-size: 1.75rem;
  }

  .primary-button {
    width: 100%;
  }
}

/* Light mode styles */
@media (prefers-color-scheme: light) {
  body {
    background-color: #fff;
    color: #1c1a19;
  }

  /* Sections */
  section {
    background: #fff;
    border-color: #ebe9e9;
  }

  .section-header {
    border-bottom-color: #ebe9e9;
  }

  .section-header h2,
  .section-header h3 {
    color: #1c1a19;
  }

  .section-header p {
    color: #1c1a19;
  }

  /* Form elements */
  .form-label {
    color: #1c1a19;
  }

  .form-select {
    background-color: #fff;
    border-color: #ebe9e9;
    color: #1c1a19;
    background-image: url("data:image/svg+xml,%3csvg xmlns='http://www.w3.org/2000/svg' fill='none' viewBox='0 0 20 20'%3e%3cpath stroke='%231c1a19' stroke-linecap='round' stroke-linejoin='round' stroke-width='1.5' d='M6 8l4 4 4-4'/%3e%3c/svg%3e");
  }

  .form-select:focus {
    background-color: #fff;
    border-color: #ebe9e9;
  }

  .form-select:hover:not(:focus) {
    background-color: #fff;
    border-color: #ebe9e9;
  }

  .form-select:disabled {
    background-color: #f9f9f9;
    border-color: #ebe9e9;
  }

  /* Custom dropdown */
  .dropdown-trigger {
    background-color: #fff;
    border-color: #ebe9e9;
    color: #1c1a19;
    background-image: url("data:image/svg+xml,%3csvg xmlns='http://www.w3.org/2000/svg' fill='none' viewBox='0 0 20 20'%3e%3cpath stroke='%231c1a19' stroke-linecap='round' stroke-linejoin='round' stroke-width='1.5' d='M6 8l4 4 4-4'/%3e%3c/svg%3e");
  }

  .dropdown-trigger:focus {
    background-color: #fff;
    border-color: #ebe9e9;
  }

  .dropdown-trigger:hover:not(:focus) {
    background-color: #fff;
    border-color: #ebe9e9;
  }

  .dropdown-trigger.open {
    background-color: #fff;
    border-color: #ebe9e9;
    background-image: url("data:image/svg+xml,%3csvg xmlns='http://www.w3.org/2000/svg' fill='none' viewBox='0 0 20 20'%3e%3cpath stroke='%231c1a19' stroke-linecap='round' stroke-linejoin='round' stroke-width='1.5' d='M14 12l-4-4-4 4'/%3e%3c/svg%3e");
  }

  .dropdown-options {
    background: #fff;
    border-color: #ebe9e9;
  }

  .dropdown-option {
    color: #1c1a19;
  }

  .dropdown-option:hover {
    background-color: #f9f9f9;
    color: #1c1a19;
  }

  .dropdown-option:focus {
    background-color: #f9f9f9;
    color: #1c1a19;
  }

  .dropdown-option:active {
    background-color: #f3f3f3;
  }

  .dropdown-option.selected {
    background-color: #f9f9f9;
    color: #1c1a19;
  }

  .dropdown-option.selected:hover {
    background-color: #f3f3f3;
  }

  /* Loading state */
  .loading-state {
    background-color: #fff;
    border: 1px solid #ebe9e9;
    color: #1c1a19;
  }

  .spinner {
    border-color: #ebe9e9;
    border-top-color: #1c1a19;
  }

  /* Error state */
  .error-state {
    background-color: #fef2f2;
    border-color: #fecaca;
    color: #991b1b;
  }

  .error-state strong {
    color: #991b1b;
  }

  .error-state p {
    color: #991b1b;
  }

  /* Context details */
  .context-item strong {
    color: #1c1a19;
  }

  .env-name {
    color: #1c1a19;
  }

  .env-type {
    color: #1c1a19;
  }

  /* Actions section */
  .actions-section {
    background-color: #fff;
  }

  /* Response section */
  .response-text {
    background-color: #fff;
    border-color: #ebe9e9;
    color: #1c1a19;
  }

  /* App header */
  .app-header h1 {
    color: #1c1a19;
  }

  .app-header p {
    color: #1c1a19;
  }

  /* Button light mode */
  .primary-button {
    background-color: #1c1a19;
    color: #fff;
    border-color: #1c1a19;
  }

  .primary-button:hover {
    background-color: #2d2a28;
    border-color: #2d2a28;
  }

  .primary-button:focus {
    box-shadow: 0 0 0 3px rgba(28, 26, 25, 0.2);
    border-color: #2d2a28;
  }

  .primary-button:active {
    background-color: #3d3a38;
  }

  .primary-button:disabled {
    background-color: #d6d3d1;
    border-color: #e7e5e4;
    color: #78716c;
    opacity: 0.7;
  }

  .primary-button:disabled:hover,
  .primary-button:disabled:focus,
  .primary-button:disabled:active {
    background-color: #d6d3d1;
    border-color: #e7e5e4;
    color: #78716c;
  }
}

/* Dark mode styles */
@media (prefers-color-scheme: dark) {
  /* Logo theme switching */
  .logo-light {
    display: none;
  }

  .logo-dark {
    display: block;
  }

  /* Sections */
  section {
    background: #1c1a19;
    border-color: #4e4a49;
  }

  .section-header {
    border-bottom-color: #4e4a49;
  }

  .section-header h2,
  .section-header h3 {
    color: #f6f6f6;
  }

  .section-header p {
    color: #f6f6f6;
  }

  /* Form elements */
  .form-label {
    color: #f6f6f6;
  }

  .form-select {
    background-color: #1c1a19;
    border-color: #4e4a49;
    color: #f6f6f6;
    background-image: url("data:image/svg+xml,%3csvg xmlns='http://www.w3.org/2000/svg' fill='none' viewBox='0 0 20 20'%3e%3cpath stroke='%23f6f6f6' stroke-linecap='round' stroke-linejoin='round' stroke-width='1.5' d='M6 8l4 4 4-4'/%3e%3c/svg%3e");
  }

  .form-select:focus {
    background-color: #1c1a19;
    border-color: #4e4a49;
  }

  .form-select:hover:not(:focus) {
    background-color: #1c1a19;
    border-color: #4e4a49;
  }

  .form-select:disabled {
    background-color: #374151;
    border-color: #4e4a49;
  }

  /* Custom dropdown */
  .dropdown-trigger {
    background-color: #1c1a19;
    border-color: #4e4a49;
    color: #f6f6f6;
    background-image: url("data:image/svg+xml,%3csvg xmlns='http://www.w3.org/2000/svg' fill='none' viewBox='0 0 20 20'%3e%3cpath stroke='%23f6f6f6' stroke-linecap='round' stroke-linejoin='round' stroke-width='1.5' d='M6 8l4 4 4-4'/%3e%3c/svg%3e");
  }

  .dropdown-trigger:focus {
    background-color: #1c1a19;
    border-color: #4e4a49;
  }

  .dropdown-trigger:hover:not(:focus) {
    background-color: #1c1a19;
    border-color: #4e4a49;
  }

  .dropdown-trigger.open {
    background-color: #1c1a19;
    border-color: #4e4a49;
    background-image: url("data:image/svg+xml,%3csvg xmlns='http://www.w3.org/2000/svg' fill='none' viewBox='0 0 20 20'%3e%3cpath stroke='%23f6f6f6' stroke-linecap='round' stroke-linejoin='round' stroke-width='1.5' d='M14 12l-4-4-4 4'/%3e%3c/svg%3e");
  }

  .dropdown-trigger.placeholder {
    color: #9ca3af;
  }

  .dropdown-options {
    background: #1c1a19;
    border-color: #4e4a49;
  }

  .dropdown-option {
    color: #f6f6f6;
  }

  .dropdown-option:hover {
    background-color: #374151;
    color: #f6f6f6;
  }

  .dropdown-option:focus {
    background-color: #374151;
    color: #f6f6f6;
  }

  .dropdown-option:active {
    background-color: #4b5563;
  }

  .dropdown-option.selected {
    background-color: #4e4a49;
    color: #f6f6f6;
  }

  .dropdown-option.selected:hover {
    background-color: #6b7280;
  }

  /* Dropdown scrollbar styling for dark mode */
  .dropdown-options::-webkit-scrollbar-track {
    background: #374151;
  }

  .dropdown-options::-webkit-scrollbar-thumb {
    background: #6b7280;
    border: 1px solid #374151;
  }

  .dropdown-options::-webkit-scrollbar-thumb:hover {
    background: #9ca3af;
  }

  /* Loading state */
  .loading-state {
    background-color: #1c1a19;
    border: 1px solid #4e4a49;
    color: #f6f6f6;
  }

  .spinner {
    border-color: #4e4a49;
    border-top-color: #f6f6f6;
  }

  /* Error state */
  .error-state {
    background-color: #7f1d1d;
    border-color: #4e4a49;
    color: #f6f6f6;
  }

  .error-state strong {
    color: #f6f6f6;
  }

  .error-state p {
    color: #f6f6f6;
  }

  /* OAuth Error Section Dark Mode */
  .error-section {
    background: #1c1a19;
    border-color: #991b1b;
  }

  .error-item strong {
    color: #fca5a5;
  }

  .error-code {
    background-color: #450a0a;
    border-color: #7f1d1d;
    color: #fca5a5;
  }

  .error-description {
    background-color: #450a0a;
    border-color: #7f1d1d;
    color: #fca5a5;
  }

  .error-actions {
    background-color: #422006;
    border-color: #92400e;
  }

  .error-actions p {
    color: #fde68a;
  }

  /* Context details */
  .context-item strong {
    color: #f6f6f6;
  }

  .env-name {
    color: #f6f6f6;
  }

  .env-type {
    color: #f6f6f6;
  }

  /* Actions section */
  .actions-section {
    background-color: #374151;
  }

  /* Response section */
  .response-text {
    background-color: #374151;
    border-color: #4e4a49;
    color: #f6f6f6;
  }

  /* App header */
  .app-header h1 {
    color: #f6f6f6;
  }

  .app-header p {
    color: #f6f6f6;
  }

  /* Button dark mode */
  .primary-button {
    background-color: #fdfdfd;
    color: #374151;
    border-color: #4e4a49;
  }

  .primary-button:hover {
    background-color: #f3f4f6;
    border-color: #6b7280;
  }

  .primary-button:focus {
    box-shadow: 0 0 0 3px rgba(246, 246, 246, 0.1);
    border-color: #9ca3af;
  }

  .primary-button:active {
    background-color: #e5e7eb;
  }

  .primary-button:disabled {
    background-color: #2f2f30;
    border-color: #3f3f40;
    color: #8b949e;
    opacity: 0.55;
  }

  .primary-button:disabled:hover,
  .primary-button:disabled:focus,
  .primary-button:disabled:active {
    background-color: #2f2f30;
    border-color: #3f3f40;
    color: #8b949e;
  }
}

```

--------------------------------------------------------------------------------
/tests/unit/discovery/test_exposures_fetcher.py:
--------------------------------------------------------------------------------

```python
from unittest.mock import patch

import pytest

from dbt_mcp.discovery.client import ExposuresFetcher


@pytest.fixture
def exposures_fetcher(mock_api_client):
    return ExposuresFetcher(api_client=mock_api_client)


async def test_fetch_exposures_single_page(exposures_fetcher, mock_api_client):
    mock_response = {
        "data": {
            "environment": {
                "definition": {
                    "exposures": {
                        "pageInfo": {"hasNextPage": False, "endCursor": None},
                        "edges": [
                            {
                                "node": {
                                    "name": "test_exposure",
                                    "uniqueId": "exposure.test.test_exposure",
                                    "exposureType": "application",
                                    "maturity": "high",
                                    "ownerEmail": "[email protected]",
                                    "ownerName": "Test Owner",
                                    "url": "https://example.com",
                                    "meta": {},
                                    "freshnessStatus": "Unknown",
                                    "description": "Test exposure",
                                    "label": None,
                                    "parents": [
                                        {"uniqueId": "model.test.parent_model"}
                                    ],
                                }
                            }
                        ],
                    }
                }
            }
        }
    }

    mock_api_client.execute_query.return_value = mock_response

    with patch("dbt_mcp.discovery.client.raise_gql_error"):
        result = await exposures_fetcher.fetch_exposures()

    assert len(result) == 1
    assert result[0]["name"] == "test_exposure"
    assert result[0]["uniqueId"] == "exposure.test.test_exposure"
    assert result[0]["exposureType"] == "application"
    assert result[0]["maturity"] == "high"
    assert result[0]["ownerEmail"] == "[email protected]"
    assert result[0]["ownerName"] == "Test Owner"
    assert result[0]["url"] == "https://example.com"
    assert result[0]["meta"] == {}
    assert result[0]["freshnessStatus"] == "Unknown"
    assert result[0]["description"] == "Test exposure"
    assert result[0]["parents"] == [{"uniqueId": "model.test.parent_model"}]

    mock_api_client.execute_query.assert_called_once()
    args, kwargs = mock_api_client.execute_query.call_args
    assert args[1]["environmentId"] == 123
    assert args[1]["first"] == 100


async def test_fetch_exposures_multiple_pages(exposures_fetcher, mock_api_client):
    page1_response = {
        "data": {
            "environment": {
                "definition": {
                    "exposures": {
                        "pageInfo": {"hasNextPage": True, "endCursor": "cursor123"},
                        "edges": [
                            {
                                "node": {
                                    "name": "exposure1",
                                    "uniqueId": "exposure.test.exposure1",
                                    "exposureType": "application",
                                    "maturity": "high",
                                    "ownerEmail": "[email protected]",
                                    "ownerName": "Test Owner 1",
                                    "url": "https://example1.com",
                                    "meta": {},
                                    "freshnessStatus": "Unknown",
                                    "description": "Test exposure 1",
                                    "label": None,
                                    "parents": [],
                                }
                            }
                        ],
                    }
                }
            }
        }
    }

    page2_response = {
        "data": {
            "environment": {
                "definition": {
                    "exposures": {
                        "pageInfo": {"hasNextPage": False, "endCursor": "cursor456"},
                        "edges": [
                            {
                                "node": {
                                    "name": "exposure2",
                                    "uniqueId": "exposure.test.exposure2",
                                    "exposureType": "dashboard",
                                    "maturity": "medium",
                                    "ownerEmail": "[email protected]",
                                    "ownerName": "Test Owner 2",
                                    "url": "https://example2.com",
                                    "meta": {"key": "value"},
                                    "freshnessStatus": "Fresh",
                                    "description": "Test exposure 2",
                                    "label": "Label 2",
                                    "parents": [
                                        {"uniqueId": "model.test.parent_model2"}
                                    ],
                                }
                            }
                        ],
                    }
                }
            }
        }
    }

    mock_api_client.execute_query.side_effect = [page1_response, page2_response]

    with patch("dbt_mcp.discovery.client.raise_gql_error"):
        result = await exposures_fetcher.fetch_exposures()

    assert len(result) == 2
    assert result[0]["name"] == "exposure1"
    assert result[1]["name"] == "exposure2"
    assert result[1]["meta"] == {"key": "value"}
    assert result[1]["label"] == "Label 2"

    assert mock_api_client.execute_query.call_count == 2

    # Check first call (no cursor)
    first_call = mock_api_client.execute_query.call_args_list[0]
    assert first_call[0][1]["environmentId"] == 123
    assert first_call[0][1]["first"] == 100
    assert "after" not in first_call[0][1]

    # Check second call (with cursor)
    second_call = mock_api_client.execute_query.call_args_list[1]
    assert second_call[0][1]["environmentId"] == 123
    assert second_call[0][1]["first"] == 100
    assert second_call[0][1]["after"] == "cursor123"


async def test_fetch_exposures_empty_response(exposures_fetcher, mock_api_client):
    mock_response = {
        "data": {
            "environment": {
                "definition": {
                    "exposures": {
                        "pageInfo": {"hasNextPage": False, "endCursor": None},
                        "edges": [],
                    }
                }
            }
        }
    }

    mock_api_client.execute_query.return_value = mock_response

    with patch("dbt_mcp.discovery.client.raise_gql_error"):
        result = await exposures_fetcher.fetch_exposures()

    assert len(result) == 0
    assert isinstance(result, list)


async def test_fetch_exposures_handles_malformed_edges(
    exposures_fetcher, mock_api_client
):
    mock_response = {
        "data": {
            "environment": {
                "definition": {
                    "exposures": {
                        "pageInfo": {"hasNextPage": False, "endCursor": None},
                        "edges": [
                            {
                                "node": {
                                    "name": "valid_exposure",
                                    "uniqueId": "exposure.test.valid_exposure",
                                    "exposureType": "application",
                                    "maturity": "high",
                                    "ownerEmail": "[email protected]",
                                    "ownerName": "Test Owner",
                                    "url": "https://example.com",
                                    "meta": {},
                                    "freshnessStatus": "Unknown",
                                    "description": "Valid exposure",
                                    "label": None,
                                    "parents": [],
                                }
                            },
                            {"invalid": "edge"},  # Missing "node" key
                            {"node": "not_a_dict"},  # Node is not a dict
                            {
                                "node": {
                                    "name": "another_valid_exposure",
                                    "uniqueId": "exposure.test.another_valid_exposure",
                                    "exposureType": "dashboard",
                                    "maturity": "low",
                                    "ownerEmail": "[email protected]",
                                    "ownerName": "Test Owner 2",
                                    "url": "https://example2.com",
                                    "meta": {},
                                    "freshnessStatus": "Stale",
                                    "description": "Another valid exposure",
                                    "label": None,
                                    "parents": [],
                                }
                            },
                        ],
                    }
                }
            }
        }
    }

    mock_api_client.execute_query.return_value = mock_response

    with patch("dbt_mcp.discovery.client.raise_gql_error"):
        result = await exposures_fetcher.fetch_exposures()

    # Should only get the valid exposures (malformed edges should be filtered out)
    assert len(result) == 2
    assert result[0]["name"] == "valid_exposure"
    assert result[1]["name"] == "another_valid_exposure"


async def test_fetch_exposure_details_by_unique_ids_single(
    exposures_fetcher, mock_api_client
):
    mock_response = {
        "data": {
            "environment": {
                "definition": {
                    "exposures": {
                        "edges": [
                            {
                                "node": {
                                    "name": "customer_dashboard",
                                    "uniqueId": "exposure.analytics.customer_dashboard",
                                    "exposureType": "dashboard",
                                    "maturity": "high",
                                    "ownerEmail": "[email protected]",
                                    "ownerName": "Analytics Team",
                                    "url": "https://dashboard.example.com/customers",
                                    "meta": {"team": "analytics", "priority": "high"},
                                    "freshnessStatus": "Fresh",
                                    "description": "Customer analytics dashboard",
                                    "label": "Customer Dashboard",
                                    "parents": [
                                        {"uniqueId": "model.analytics.customers"},
                                        {
                                            "uniqueId": "model.analytics.customer_metrics"
                                        },
                                    ],
                                }
                            }
                        ]
                    }
                }
            }
        }
    }

    mock_api_client.execute_query.return_value = mock_response

    with patch("dbt_mcp.discovery.client.raise_gql_error"):
        result = await exposures_fetcher.fetch_exposure_details(
            unique_ids=["exposure.analytics.customer_dashboard"]
        )

    assert isinstance(result, list)
    assert len(result) == 1
    exposure = result[0]
    assert exposure["name"] == "customer_dashboard"
    assert exposure["uniqueId"] == "exposure.analytics.customer_dashboard"
    assert exposure["exposureType"] == "dashboard"
    assert exposure["maturity"] == "high"
    assert exposure["ownerEmail"] == "[email protected]"
    assert exposure["ownerName"] == "Analytics Team"
    assert exposure["url"] == "https://dashboard.example.com/customers"
    assert exposure["meta"] == {"team": "analytics", "priority": "high"}
    assert exposure["freshnessStatus"] == "Fresh"
    assert exposure["description"] == "Customer analytics dashboard"
    assert exposure["label"] == "Customer Dashboard"
    assert len(exposure["parents"]) == 2
    assert exposure["parents"][0]["uniqueId"] == "model.analytics.customers"
    assert exposure["parents"][1]["uniqueId"] == "model.analytics.customer_metrics"

    mock_api_client.execute_query.assert_called_once()
    args, kwargs = mock_api_client.execute_query.call_args
    assert args[1]["environmentId"] == 123
    assert args[1]["first"] == 1
    assert args[1]["filter"] == {"uniqueIds": ["exposure.analytics.customer_dashboard"]}


async def test_fetch_exposure_details_by_unique_ids_multiple(
    exposures_fetcher, mock_api_client
):
    mock_response = {
        "data": {
            "environment": {
                "definition": {
                    "exposures": {
                        "edges": [
                            {
                                "node": {
                                    "name": "customer_dashboard",
                                    "uniqueId": "exposure.analytics.customer_dashboard",
                                    "exposureType": "dashboard",
                                    "maturity": "high",
                                    "ownerEmail": "[email protected]",
                                    "ownerName": "Analytics Team",
                                    "url": "https://dashboard.example.com/customers",
                                    "meta": {"team": "analytics", "priority": "high"},
                                    "freshnessStatus": "Fresh",
                                    "description": "Customer analytics dashboard",
                                    "label": "Customer Dashboard",
                                    "parents": [],
                                }
                            },
                            {
                                "node": {
                                    "name": "sales_report",
                                    "uniqueId": "exposure.sales.sales_report",
                                    "exposureType": "analysis",
                                    "maturity": "medium",
                                    "ownerEmail": "[email protected]",
                                    "ownerName": "Sales Team",
                                    "url": None,
                                    "meta": {},
                                    "freshnessStatus": "Stale",
                                    "description": "Monthly sales analysis report",
                                    "label": None,
                                    "parents": [{"uniqueId": "model.sales.sales_data"}],
                                }
                            },
                        ]
                    }
                }
            }
        }
    }

    mock_api_client.execute_query.return_value = mock_response

    with patch("dbt_mcp.discovery.client.raise_gql_error"):
        result = await exposures_fetcher.fetch_exposure_details(
            unique_ids=[
                "exposure.analytics.customer_dashboard",
                "exposure.sales.sales_report",
            ]
        )

    assert isinstance(result, list)
    assert len(result) == 2

    # Check first exposure
    exposure1 = result[0]
    assert exposure1["name"] == "customer_dashboard"
    assert exposure1["uniqueId"] == "exposure.analytics.customer_dashboard"
    assert exposure1["exposureType"] == "dashboard"

    # Check second exposure
    exposure2 = result[1]
    assert exposure2["name"] == "sales_report"
    assert exposure2["uniqueId"] == "exposure.sales.sales_report"
    assert exposure2["exposureType"] == "analysis"

    mock_api_client.execute_query.assert_called_once()
    args, kwargs = mock_api_client.execute_query.call_args
    assert args[1]["environmentId"] == 123
    assert args[1]["first"] == 2
    assert args[1]["filter"] == {
        "uniqueIds": [
            "exposure.analytics.customer_dashboard",
            "exposure.sales.sales_report",
        ]
    }


async def test_fetch_exposure_details_by_name(exposures_fetcher, mock_api_client):
    # Mock the response for fetch_exposures (which gets called when filtering by name)
    mock_exposures_response = {
        "data": {
            "environment": {
                "definition": {
                    "exposures": {
                        "pageInfo": {"hasNextPage": False, "endCursor": None},
                        "edges": [
                            {
                                "node": {
                                    "name": "sales_report",
                                    "uniqueId": "exposure.sales.sales_report",
                                    "exposureType": "analysis",
                                    "maturity": "medium",
                                    "ownerEmail": "[email protected]",
                                    "ownerName": "Sales Team",
                                    "url": None,
                                    "meta": {},
                                    "freshnessStatus": "Stale",
                                    "description": "Monthly sales analysis report",
                                    "label": None,
                                    "parents": [{"uniqueId": "model.sales.sales_data"}],
                                }
                            },
                            {
                                "node": {
                                    "name": "other_exposure",
                                    "uniqueId": "exposure.other.other_exposure",
                                    "exposureType": "dashboard",
                                    "maturity": "high",
                                    "ownerEmail": "[email protected]",
                                    "ownerName": "Other Team",
                                    "url": None,
                                    "meta": {},
                                    "freshnessStatus": "Fresh",
                                    "description": "Other exposure",
                                    "label": None,
                                    "parents": [],
                                }
                            },
                        ],
                    }
                }
            }
        }
    }

    mock_api_client.execute_query.return_value = mock_exposures_response

    with patch("dbt_mcp.discovery.client.raise_gql_error"):
        result = await exposures_fetcher.fetch_exposure_details(
            exposure_name="sales_report"
        )

    assert isinstance(result, list)
    assert len(result) == 1
    exposure = result[0]
    assert exposure["name"] == "sales_report"
    assert exposure["uniqueId"] == "exposure.sales.sales_report"
    assert exposure["exposureType"] == "analysis"
    assert exposure["maturity"] == "medium"
    assert exposure["url"] is None
    assert exposure["meta"] == {}
    assert exposure["freshnessStatus"] == "Stale"
    assert exposure["label"] is None

    # Should have called the GET_EXPOSURES query (not GET_EXPOSURE_DETAILS)
    mock_api_client.execute_query.assert_called_once()
    args, kwargs = mock_api_client.execute_query.call_args
    assert args[1]["environmentId"] == 123
    assert args[1]["first"] == 100  # PAGE_SIZE for fetch_exposures


async def test_fetch_exposure_details_not_found(exposures_fetcher, mock_api_client):
    mock_response = {
        "data": {"environment": {"definition": {"exposures": {"edges": []}}}}
    }

    mock_api_client.execute_query.return_value = mock_response

    with patch("dbt_mcp.discovery.client.raise_gql_error"):
        result = await exposures_fetcher.fetch_exposure_details(
            unique_ids=["exposure.nonexistent.exposure"]
        )

    assert result == []


async def test_get_exposure_filters_unique_ids(exposures_fetcher):
    filters = exposures_fetcher._get_exposure_filters(
        unique_ids=["exposure.test.test_exposure"]
    )
    assert filters == {"uniqueIds": ["exposure.test.test_exposure"]}


async def test_get_exposure_filters_multiple_unique_ids(exposures_fetcher):
    filters = exposures_fetcher._get_exposure_filters(
        unique_ids=["exposure.test.test1", "exposure.test.test2"]
    )
    assert filters == {"uniqueIds": ["exposure.test.test1", "exposure.test.test2"]}


async def test_get_exposure_filters_name_raises_error(exposures_fetcher):
    from dbt_mcp.errors import InvalidParameterError

    with pytest.raises(
        InvalidParameterError, match="ExposureFilter only supports uniqueIds"
    ):
        exposures_fetcher._get_exposure_filters(exposure_name="test_exposure")


async def test_get_exposure_filters_no_params(exposures_fetcher):
    from dbt_mcp.errors import InvalidParameterError

    with pytest.raises(
        InvalidParameterError,
        match="unique_ids must be provided for exposure filtering",
    ):
        exposures_fetcher._get_exposure_filters()


async def test_fetch_exposure_details_by_name_not_found(
    exposures_fetcher, mock_api_client
):
    # Mock empty response for fetch_exposures
    mock_response = {
        "data": {
            "environment": {
                "definition": {
                    "exposures": {
                        "pageInfo": {"hasNextPage": False, "endCursor": None},
                        "edges": [],
                    }
                }
            }
        }
    }

    mock_api_client.execute_query.return_value = mock_response

    with patch("dbt_mcp.discovery.client.raise_gql_error"):
        result = await exposures_fetcher.fetch_exposure_details(
            exposure_name="nonexistent_exposure"
        )

    assert result == []

```

--------------------------------------------------------------------------------
/src/dbt_mcp/lsp/lsp_connection.py:
--------------------------------------------------------------------------------

```python
"""LSP Connection Manager for dbt Fusion LSP.

This module manages the lifecycle of LSP processes and handles JSON-RPC
communication according to the Language Server Protocol specification.
"""

import asyncio
from enum import Enum
import itertools
import json
import logging
import socket
import subprocess
from collections.abc import Iterator, Sequence
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
import uuid
from dataclasses import asdict

logger = logging.getLogger(__name__)


class LspEventName(str, Enum):
    """LSP event names."""

    compileComplete = "dbt/lspCompileComplete"
    logMessage = "window/logMessage"
    progress = "$/progress"
    workspaceDiagnostics = "workspace/diagnostics"
    fileDiagnostics = "textDocument/publishDiagnostics"


def event_name_from_string(string: str) -> LspEventName | None:
    """Create an LSP event name from a string."""
    try:
        return LspEventName(string)
    except ValueError:
        return None


@dataclass
class JsonRpcMessage:
    """Represents a JSON-RPC 2.0 message."""

    jsonrpc: str = "2.0"
    id: int | str | None = None
    method: str | None = None
    params: dict[str, Any] | list[Any] | None = None
    result: Any = None
    error: dict[str, Any] | None = None

    def to_dict(self, none_values: bool = False) -> dict[str, Any]:
        """Convert the message to a dictionary."""

        def dict_factory(x: list[tuple[str, Any]]) -> dict[str, Any]:
            return dict(x) if none_values else {k: v for k, v in x if v is not None}

        return asdict(self, dict_factory=dict_factory)


@dataclass
class LspConnectionState:
    """Tracks the state of an LSP connection."""

    initialized: bool = False
    shutting_down: bool = False
    capabilities: dict[str, Any] = field(default_factory=dict)
    pending_requests: dict[int | str, asyncio.Future] = field(default_factory=dict)
    pending_notifications: dict[LspEventName, list[asyncio.Future]] = field(
        default_factory=dict
    )
    compiled: bool = False
    # start at 20 to avoid collisions between ids of requests we are waiting for and the lsp server requests from us
    request_id_counter: Iterator[int] = field(
        default_factory=lambda: itertools.count(20)
    )

    def get_next_request_id(self) -> int:
        return next(self.request_id_counter)


class LSPConnection:
    """LSP process lifecycle and communication via socket.

    This class handles:
    - Starting and stopping LSP server processes
    - Socket-based JSON-RPC communication
    - Request/response correlation
    - Error handling and cleanup
    """

    def __init__(
        self,
        binary_path: str,
        cwd: str,
        args: Sequence[str] | None = None,
        connection_timeout: float = 10,
        default_request_timeout: float = 60,
    ):
        """Initialize the LSP connection manager.

        Args:
            binary_path: Path to the LSP server binary
            cwd: Working directory for the LSP process
            args: Optional command-line arguments for the LSP server
            connection_timeout: Timeout in seconds for establishing the initial socket
                              connection (default: 10). Used during server startup.
            default_request_timeout: Default timeout in seconds for LSP request operations
                                   (default: 60). Used when no timeout is specified for
                                   individual requests.
        """
        self.binary_path = Path(binary_path)
        self.args = list(args) if args else []
        self.cwd = cwd
        self.host = "127.0.0.1"
        self.port = 0

        self.process: asyncio.subprocess.Process | None = None
        self.state = LspConnectionState()

        # Socket components
        self._socket: socket.socket | None = None
        self._connection: socket.socket | None = None

        # Asyncio components for I/O
        self._reader_task: asyncio.Task | None = None
        self._writer_task: asyncio.Task | None = None
        self._stdout_reader_task: asyncio.Task | None = None
        self._stderr_reader_task: asyncio.Task | None = None
        self._stop_event = asyncio.Event()
        self._outgoing_queue: asyncio.Queue[bytes] = asyncio.Queue()

        # Timeouts
        self.connection_timeout = connection_timeout
        self.default_request_timeout = default_request_timeout

        logger.debug(f"LSP Connection initialized with binary: {self.binary_path}")

    async def start(self) -> None:
        """Start the LSP server process and socket communication tasks."""
        if self.process is not None:
            logger.warning("LSP process is already running")
            return

        try:
            self.setup_socket()

            await self.launch_lsp_process()

            # Wait for connection with timeout (run socket.accept in executor)
            if self._socket:
                self._socket.settimeout(self.connection_timeout)
                try:
                    (
                        self._connection,
                        client_addr,
                    ) = await asyncio.get_running_loop().run_in_executor(
                        None, self._socket.accept
                    )
                    if self._connection:
                        self._connection.settimeout(
                            None
                        )  # Set to blocking for read/write
                    logger.debug(f"LSP server connected from {client_addr}")
                except TimeoutError:
                    raise RuntimeError("Timeout waiting for LSP server to connect")

            # Start I/O tasks
            self._stop_event.clear()
            self._reader_task = asyncio.get_running_loop().create_task(
                self._read_loop()
            )
            self._writer_task = asyncio.get_running_loop().create_task(
                self._write_loop()
            )

        except Exception as e:
            logger.error(f"Failed to start LSP server: {e}")
            await self.stop()
            raise

    def setup_socket(self) -> None:
        """Set up the socket for LSP server communication.

        Creates a TCP socket, binds it to the configured host and port,
        and starts listening for incoming connections. If port is 0,
        the OS will auto-assign an available port.
        """
        # Create socket and bind
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self._socket.bind((self.host, self.port))
        self._socket.listen(1)

        # Get the actual port if auto-assigned
        _, actual_port = self._socket.getsockname()
        self.port = actual_port
        logger.debug(f"Socket listening on {self.host}:{self.port}")

    async def launch_lsp_process(self) -> None:
        """Launch the LSP server process.

        Starts the LSP server as a subprocess with socket communication enabled.
        The process is started with stdout and stderr capture for monitoring.
        The server will connect back to the socket set up by setup_socket().
        """
        # Prepare command with socket info
        cmd = [
            str(self.binary_path),
            "--socket",
            f"{self.port}",
            "--project-dir",
            self.cwd,
            *self.args,
        ]

        logger.debug(f"Starting LSP server: {' '.join(cmd)}")
        self.process = await asyncio.create_subprocess_exec(*cmd)

        logger.info(f"LSP server started with PID: {self.process.pid}")

    async def stop(self) -> None:
        """Stop the LSP server process and cleanup resources."""
        logger.info("Stopping LSP server...")

        # Signal tasks to stop
        self._stop_event.set()

        # Cancel I/O tasks
        if self._reader_task and not self._reader_task.done():
            self._reader_task.cancel()
            try:
                await self._reader_task
            except asyncio.CancelledError:
                pass

        if self._writer_task and not self._writer_task.done():
            self._writer_task.cancel()
            try:
                await self._writer_task
            except asyncio.CancelledError:
                pass

        # Cancel stdout/stderr reader tasks
        if self._stdout_reader_task and not self._stdout_reader_task.done():
            self._stdout_reader_task.cancel()
            try:
                await self._stdout_reader_task
            except asyncio.CancelledError:
                pass

        if self._stderr_reader_task and not self._stderr_reader_task.done():
            self._stderr_reader_task.cancel()
            try:
                await self._stderr_reader_task
            except asyncio.CancelledError:
                pass

        # Send shutdown request if initialized
        if self.process and not self.state.shutting_down:
            self.state.shutting_down = True
            try:
                self._send_shutdown_request()
                await asyncio.sleep(0.5)  # Give server time to process shutdown
            except Exception as e:
                logger.warning(f"Error sending shutdown request: {e}")

        # Close socket connection
        if self._connection:
            try:
                self._connection.close()
            except Exception as e:
                logger.error(f"Error closing socket connection: {e}")
            finally:
                self._connection = None

        # Close listening socket
        if self._socket:
            try:
                self._socket.close()
            except Exception as e:
                logger.error(f"Error closing socket: {e}")
            finally:
                self._socket = None

        # Terminate the process
        if self.process:
            try:
                self.process.terminate()
                try:
                    await self.process.wait()
                except subprocess.TimeoutExpired:
                    logger.warning("LSP process didn't terminate, killing...")
                    self.process.kill()
                    await self.process.wait()
            except Exception as e:
                logger.error(f"Error terminating LSP process: {e}")
            finally:
                self.process = None

        # Clear state
        self.state = LspConnectionState()

        logger.info("LSP server stopped")

    async def initialize(
        self, root_uri: str | None = None, timeout: float = 10
    ) -> None:
        """Initialize the LSP connection.

        Sends the initialize request to the LSP server and waits for the response.
        The server capabilities are stored in the connection state.

        Args:
            root_uri: The root URI of the workspace (optional)
            timeout: Timeout in seconds for the initialize request (default: 10)
        """
        if self.state.initialized:
            raise RuntimeError("LSP server is already initialized")

        params = {
            "processId": None,
            "rootUri": root_uri,
            "clientInfo": {
                "name": "dbt-mcp",
                "version": "1.0.0",
            },
            "capabilities": {},
            "initializationOptions": {
                "project-dir": "file:///",
                "command-prefix": str(uuid.uuid4()),
            },
        }

        # Send initialize request
        result = await self.send_request("initialize", params, timeout=timeout)

        # Store capabilities
        self.state.capabilities = result.get("capabilities", {})
        self.state.initialized = True

        # Send initialized notification
        self.send_notification("initialized", {})

        logger.info("LSP server initialized successfully")

    async def _read_loop(self) -> None:
        """Background task that reads messages from the LSP server via socket."""
        if not self._connection:
            logger.warning("LSP server socket is not available")
            return

        buffer = b""

        while not self._stop_event.is_set():
            try:
                # Read data from socket (run in executor to avoid blocking)
                self._connection.settimeout(0.1)  # Short timeout to check stop event
                try:
                    chunk = await asyncio.get_running_loop().run_in_executor(
                        None, self._connection.recv, 4096
                    )
                except TimeoutError:
                    continue

                if not chunk:
                    logger.warning("LSP server socket closed")
                    break

                buffer += chunk

                # Try to parse messages from buffer
                while True:
                    message, remaining = self._parse_message(buffer)
                    if message is None:
                        break

                    buffer = remaining

                    # Process the message
                    self._handle_incoming_message(message)

            except asyncio.CancelledError:
                # Task was cancelled, exit cleanly
                break
            except Exception as e:
                if not self._stop_event.is_set():
                    logger.error(f"Error in reader task: {e}")
                break

    async def _write_loop(self) -> None:
        """Background task that writes messages to the LSP server via socket."""
        if not self._connection:
            return

        while not self._stop_event.is_set():
            try:
                # Get message from queue (with timeout to check stop event)
                try:
                    data = await asyncio.wait_for(
                        self._outgoing_queue.get(), timeout=0.1
                    )
                except TimeoutError:
                    continue

                # Write to socket (run in executor to avoid blocking)
                await asyncio.get_running_loop().run_in_executor(
                    None, self._connection.sendall, data
                )

            except asyncio.CancelledError:
                # Task was cancelled, exit cleanly
                break
            except Exception as e:
                if not self._stop_event.is_set():
                    logger.error(f"Error in writer task: {e}")
                break

    def _parse_message(self, buffer: bytes) -> tuple[JsonRpcMessage | None, bytes]:
        """Parse a JSON-RPC message from the buffer.

        LSP uses HTTP-like headers followed by JSON content:
        Content-Length: <length>\\r\\n
        \\r\\n
        <json-content>
        """
        # Look for Content-Length header
        header_end = buffer.find(b"\r\n\r\n")
        if header_end == -1:
            return None, buffer

        # Parse headers
        headers = buffer[:header_end].decode("utf-8")
        content_length = None

        for line in headers.split("\r\n"):
            if line.startswith("Content-Length:"):
                try:
                    content_length = int(line.split(":")[1].strip())
                except (IndexError, ValueError):
                    logger.error(f"Invalid Content-Length header: {line}")
                    return None, buffer[header_end + 4 :]

        if content_length is None:
            logger.error("Missing Content-Length header")
            return None, buffer[header_end + 4 :]

        # Check if we have the full message
        content_start = header_end + 4
        content_end = content_start + content_length

        if len(buffer) < content_end:
            return None, buffer

        # Parse JSON content
        try:
            content = buffer[content_start:content_end].decode("utf-8")
            data = json.loads(content)
            message = JsonRpcMessage(**data)

            return message, buffer[content_end:]

        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            logger.error(f"Failed to parse message: {e}")
            return None, buffer[content_end:]

    def _handle_incoming_message(self, message: JsonRpcMessage) -> None:
        """Handle an incoming message from the LSP server."""

        # Handle responses to requests
        if message.id is not None:
            # Thread-safe: pop with default avoids race condition between check and pop
            future = self.state.pending_requests.pop(message.id, None)
            if future is not None:
                logger.debug(f"Received response for request {message.to_dict()}")

                # Use call_soon_threadsafe to safely resolve futures across event loop contexts
                # This prevents "Task got Future attached to a different loop" errors when
                # the future was created in one loop but is being resolved from another loop
                # Get the loop from the future itself to ensure we schedule on the correct loop
                future_loop = future.get_loop()
                if message.error:
                    future_loop.call_soon_threadsafe(
                        future.set_exception,
                        RuntimeError(f"LSP error: {message.error}"),
                    )
                else:
                    future_loop.call_soon_threadsafe(future.set_result, message.result)
                return
            else:
                # it's an unknown request, we respond with an empty result
                logger.debug(f"LSP request {message.to_dict()}")
                self._send_message(
                    JsonRpcMessage(id=message.id, result=None), none_values=True
                )

        if message.method is None:
            return

        # it's a known event type we want to explicitly handle
        if lsp_event_name := event_name_from_string(message.method):
            # Check if this is an event we're waiting for
            # Thread-safe: pop with default avoids race condition
            futures = self.state.pending_notifications.pop(lsp_event_name, None)
            if futures is not None:
                logger.debug(f"Received event {lsp_event_name} - {message.to_dict()}")
                # Use call_soon_threadsafe for notification futures as well
                for future in futures:
                    future_loop = future.get_loop()
                    future_loop.call_soon_threadsafe(future.set_result, message.params)

            match lsp_event_name:
                case LspEventName.compileComplete:
                    logger.info("Recorded compile complete event")
                    self.state.compiled = True
                case _:
                    logger.debug(f"LSP event {message.method}")
                    pass

        else:
            # it's an unknown notification, log it and move on
            logger.debug(f"LSP event {message.method}")

    async def send_request(
        self,
        method: str,
        params: dict[str, Any] | list[Any] | None = None,
        timeout: float | None = None,
    ) -> dict[str, Any]:
        """Send a request to the LSP server.

        Args:
            method: The JSON-RPC method name
            params: Optional parameters for the method
            timeout: Timeout in seconds for this request. If not specified, uses
                    default_request_timeout from the connection configuration.

        Returns:
            A dictionary containing the response result or error information
        """
        if not self.process:
            raise RuntimeError("LSP server is not running")

        # Create request message
        request_id = self.state.get_next_request_id()
        message = JsonRpcMessage(
            id=request_id,
            method=method,
            params=params,
        )

        # Create future for response using the current running loop
        # This prevents "Task got Future attached to a different loop" errors
        # when send_request is called from a different loop context than where
        # the connection was initialized
        future = asyncio.get_running_loop().create_future()
        self.state.pending_requests[request_id] = future

        # Send the message
        self._send_message(message)

        try:
            return await asyncio.wait_for(
                future, timeout=timeout or self.default_request_timeout
            )
        except Exception as e:
            return {"error": str(e)}

    def send_notification(
        self,
        method: str,
        params: dict[str, Any] | list[Any] | None = None,
    ) -> None:
        """Send a notification to the LSP server.

        Args:
            method: The JSON-RPC method name
            params: Optional parameters for the method
        """
        if not self.process:
            raise RuntimeError("LSP server is not running")

        # Create notification message (no ID)
        message = JsonRpcMessage(
            method=method,
            params=params,
        )

        # Send the message
        self._send_message(message)

    def wait_for_notification(
        self, event_name: LspEventName
    ) -> asyncio.Future[dict[str, Any]]:
        """Wait for a notification from the LSP server.

        Args:
            event_name: The LSP event name to wait for

        Returns:
            A Future that will be resolved with the notification params when received
        """
        future = asyncio.get_running_loop().create_future()
        self.state.pending_notifications.setdefault(event_name, []).append(future)

        return future

    def _send_message(self, message: JsonRpcMessage, none_values: bool = False) -> None:
        """Send a message to the LSP server."""
        # Serialize message
        content = json.dumps(message.to_dict(none_values=none_values))
        content_bytes = content.encode("utf-8")

        # Create LSP message with headers
        header = f"Content-Length: {len(content_bytes)}\r\n\r\n"
        header_bytes = header.encode("utf-8")

        data = header_bytes + content_bytes

        logger.debug(f"Sending message: {content}")

        # Queue for sending (put_nowait is safe from sync context)
        self._outgoing_queue.put_nowait(data)

    def _send_shutdown_request(self) -> None:
        """Send shutdown request to the LSP server."""
        try:
            # Send shutdown request
            message = JsonRpcMessage(
                id=self.state.get_next_request_id(),
                method="shutdown",
            )
            self._send_message(message)

            # Send exit notification
            exit_message = JsonRpcMessage(
                method="exit",
            )
            self._send_message(exit_message)

        except Exception as e:
            logger.error(f"Error sending shutdown: {e}")

    def is_running(self) -> bool:
        """Check if the LSP server is running."""
        return self.process is not None and self.process.returncode is None

```

--------------------------------------------------------------------------------
/src/dbt_mcp/discovery/client.py:
--------------------------------------------------------------------------------

```python
import textwrap
from typing import Literal, TypedDict

import requests

from dbt_mcp.config.config_providers import ConfigProvider, DiscoveryConfig
from dbt_mcp.errors import GraphQLError, InvalidParameterError
from dbt_mcp.gql.errors import raise_gql_error

PAGE_SIZE = 100
MAX_NODE_QUERY_LIMIT = 1000


class GraphQLQueries:
    GET_MODELS = textwrap.dedent("""
        query GetModels(
            $environmentId: BigInt!,
            $modelsFilter: ModelAppliedFilter,
            $after: String,
            $first: Int,
            $sort: AppliedModelSort
        ) {
            environment(id: $environmentId) {
                applied {
                    models(filter: $modelsFilter, after: $after, first: $first, sort: $sort) {
                        pageInfo {
                            endCursor
                        }
                        edges {
                            node {
                                name
                                uniqueId
                                description
                            }
                        }
                    }
                }
            }
        }
    """)

    GET_MODEL_HEALTH = textwrap.dedent("""
        query GetModelDetails(
            $environmentId: BigInt!,
            $modelsFilter: ModelAppliedFilter
            $first: Int,
        ) {
            environment(id: $environmentId) {
                applied {
                    models(filter: $modelsFilter, first: $first) {
                        edges {
                            node {
                                name
                                uniqueId
                                executionInfo {
                                    lastRunGeneratedAt
                                    lastRunStatus
                                    executeCompletedAt
                                    executeStartedAt
                                }
                                tests {
                                    name
                                    description
                                    columnName
                                    testType
                                    executionInfo {
                                        lastRunGeneratedAt
                                        lastRunStatus
                                        executeCompletedAt
                                        executeStartedAt
                                    }
                                }
                                ancestors(types: [Model, Source, Seed, Snapshot]) {
                                  ... on ModelAppliedStateNestedNode {
                                    name
                                    uniqueId
                                    resourceType
                                    materializedType
                                    modelexecutionInfo: executionInfo {
                                      lastRunStatus
                                      executeCompletedAt
                                      }
                                  }
                                  ... on SnapshotAppliedStateNestedNode {
                                    name
                                    uniqueId
                                    resourceType
                                    snapshotExecutionInfo: executionInfo {
                                      lastRunStatus
                                      executeCompletedAt
                                    }
                                  }
                                  ... on SeedAppliedStateNestedNode {
                                    name
                                    uniqueId
                                    resourceType
                                    seedExecutionInfo: executionInfo {
                                      lastRunStatus
                                      executeCompletedAt
                                    }
                                  }
                                  ... on SourceAppliedStateNestedNode {
                                    sourceName
                                    name
                                    resourceType
                                    freshness {
                                      maxLoadedAt
                                      maxLoadedAtTimeAgoInS
                                      freshnessStatus
                                    }
                                  }
                              }
                            }
                        }
                    }
                }
            }
        }
    """)

    GET_MODEL_DETAILS = textwrap.dedent("""
        query GetModelDetails(
            $environmentId: BigInt!,
            $modelsFilter: ModelAppliedFilter
            $first: Int,
        ) {
            environment(id: $environmentId) {
                applied {
                    models(filter: $modelsFilter, first: $first) {
                        edges {
                            node {
                                name
                                uniqueId
                                compiledCode
                                description
                                database
                                schema
                                alias
                                catalog {
                                    columns {
                                        description
                                        name
                                        type
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
    """)

    COMMON_FIELDS_PARENTS_CHILDREN = textwrap.dedent("""
        {
        ... on ExposureAppliedStateNestedNode {
            resourceType
            name
            description
        }
        ... on ExternalModelNode {
            resourceType
            description
            name
        }
        ... on MacroDefinitionNestedNode {
            resourceType
            name
            description
        }
        ... on MetricDefinitionNestedNode {
            resourceType
            name
            description
        }
        ... on ModelAppliedStateNestedNode {
            resourceType
            name
            description
        }
        ... on SavedQueryDefinitionNestedNode {
            resourceType
            name
            description
        }
        ... on SeedAppliedStateNestedNode {
            resourceType
            name
            description
        }
        ... on SemanticModelDefinitionNestedNode {
            resourceType
            name
            description
        }
        ... on SnapshotAppliedStateNestedNode {
            resourceType
            name
            description
        }
        ... on SourceAppliedStateNestedNode {
            resourceType
            sourceName
            uniqueId
            name
            description
        }
        ... on TestAppliedStateNestedNode {
            resourceType
            name
            description
        }
    """)

    GET_MODEL_PARENTS = (
        textwrap.dedent("""
        query GetModelParents(
            $environmentId: BigInt!,
            $modelsFilter: ModelAppliedFilter
            $first: Int,
        ) {
            environment(id: $environmentId) {
                applied {
                    models(filter: $modelsFilter, first: $first) {
                        pageInfo {
                            endCursor
                        }
                        edges {
                            node {
                                parents
    """)
        + COMMON_FIELDS_PARENTS_CHILDREN
        + textwrap.dedent("""
                                }
                            }
                        }
                    }
                }
            }
        }
    """)
    )

    GET_MODEL_CHILDREN = (
        textwrap.dedent("""
        query GetModelChildren(
            $environmentId: BigInt!,
            $modelsFilter: ModelAppliedFilter
            $first: Int,
        ) {
            environment(id: $environmentId) {
                applied {
                    models(filter: $modelsFilter, first: $first) {
                        pageInfo {
                            endCursor
                        }
                        edges {
                            node {
                                children
    """)
        + COMMON_FIELDS_PARENTS_CHILDREN
        + textwrap.dedent("""
                                }
                            }
                        }
                    }
                }
            }
        }
    """)
    )

    GET_SOURCES = textwrap.dedent("""
        query GetSources(
            $environmentId: BigInt!,
            $sourcesFilter: SourceAppliedFilter,
            $after: String,
            $first: Int
        ) {
            environment(id: $environmentId) {
                applied {
                    sources(filter: $sourcesFilter, after: $after, first: $first) {
                        pageInfo {
                            hasNextPage
                            endCursor
                        }
                        edges {
                            node {
                                name
                                uniqueId
                                identifier
                                description
                                sourceName
                                resourceType
                                database
                                schema
                                freshness {
                                    maxLoadedAt
                                    maxLoadedAtTimeAgoInS
                                    freshnessStatus
                                }
                            }
                        }
                    }
                }
            }
        }
    """)

    GET_EXPOSURES = textwrap.dedent("""
        query Exposures($environmentId: BigInt!, $first: Int, $after: String) {
            environment(id: $environmentId) {
                definition {
                    exposures(first: $first, after: $after) {
                        totalCount
                        pageInfo {
                            hasNextPage
                            endCursor
                        }
                        edges {
                            node {
                                name
                                uniqueId
                                url
                                description
                            }
                        }
                    }
                }
            }
        }
    """)

    GET_EXPOSURE_DETAILS = textwrap.dedent("""
        query ExposureDetails($environmentId: BigInt!, $filter: ExposureFilter, $first: Int) {
            environment(id: $environmentId) {
                definition {
                    exposures(first: $first, filter: $filter) {
                        edges {
                            node {
                                name
                                maturity
                                label
                                ownerEmail
                                ownerName
                                uniqueId
                                url
                                meta
                                freshnessStatus
                                exposureType
                                description
                                parents {
                                    uniqueId
                                }
                            }
                        }
                    }
                }
            }
        }
    """)


class MetadataAPIClient:
    def __init__(self, config_provider: ConfigProvider[DiscoveryConfig]):
        self.config_provider = config_provider

    async def execute_query(self, query: str, variables: dict) -> dict:
        config = await self.config_provider.get_config()
        url = config.url
        headers = config.headers_provider.get_headers()
        response = requests.post(
            url=url,
            json={"query": query, "variables": variables},
            headers=headers,
        )
        return response.json()


class ModelFilter(TypedDict, total=False):
    modelingLayer: Literal["marts"] | None


class SourceFilter(TypedDict, total=False):
    sourceNames: list[str]
    uniqueIds: list[str] | None


class ModelsFetcher:
    def __init__(self, api_client: MetadataAPIClient):
        self.api_client = api_client

    async def get_environment_id(self) -> int:
        config = await self.api_client.config_provider.get_config()
        return config.environment_id

    def _parse_response_to_json(self, result: dict) -> list[dict]:
        raise_gql_error(result)
        edges = result["data"]["environment"]["applied"]["models"]["edges"]
        parsed_edges: list[dict] = []
        if not edges:
            return parsed_edges
        if result.get("errors"):
            raise GraphQLError(f"GraphQL query failed: {result['errors']}")
        for edge in edges:
            if not isinstance(edge, dict) or "node" not in edge:
                continue
            node = edge["node"]
            if not isinstance(node, dict):
                continue
            parsed_edges.append(node)
        return parsed_edges

    def _get_model_filters(
        self, model_name: str | None = None, unique_id: str | None = None
    ) -> dict[str, list[str] | str]:
        if unique_id:
            return {"uniqueIds": [unique_id]}
        elif model_name:
            return {"identifier": model_name}
        else:
            raise InvalidParameterError(
                "Either model_name or unique_id must be provided"
            )

    async def fetch_models(self, model_filter: ModelFilter | None = None) -> list[dict]:
        has_next_page = True
        after_cursor: str = ""
        all_edges: list[dict] = []
        while has_next_page and len(all_edges) < MAX_NODE_QUERY_LIMIT:
            variables = {
                "environmentId": await self.get_environment_id(),
                "after": after_cursor,
                "first": PAGE_SIZE,
                "modelsFilter": model_filter or {},
                "sort": {"field": "queryUsageCount", "direction": "desc"},
            }

            result = await self.api_client.execute_query(
                GraphQLQueries.GET_MODELS, variables
            )
            all_edges.extend(self._parse_response_to_json(result))

            previous_after_cursor = after_cursor
            after_cursor = result["data"]["environment"]["applied"]["models"][
                "pageInfo"
            ]["endCursor"]
            if previous_after_cursor == after_cursor:
                has_next_page = False

        return all_edges

    async def fetch_model_details(
        self, model_name: str | None = None, unique_id: str | None = None
    ) -> dict:
        model_filters = self._get_model_filters(model_name, unique_id)
        variables = {
            "environmentId": await self.get_environment_id(),
            "modelsFilter": model_filters,
            "first": 1,
        }
        result = await self.api_client.execute_query(
            GraphQLQueries.GET_MODEL_DETAILS, variables
        )
        raise_gql_error(result)
        edges = result["data"]["environment"]["applied"]["models"]["edges"]
        if not edges:
            return {}
        return edges[0]["node"]

    async def fetch_model_parents(
        self, model_name: str | None = None, unique_id: str | None = None
    ) -> list[dict]:
        model_filters = self._get_model_filters(model_name, unique_id)
        variables = {
            "environmentId": await self.get_environment_id(),
            "modelsFilter": model_filters,
            "first": 1,
        }
        result = await self.api_client.execute_query(
            GraphQLQueries.GET_MODEL_PARENTS, variables
        )
        raise_gql_error(result)
        edges = result["data"]["environment"]["applied"]["models"]["edges"]
        if not edges:
            return []
        return edges[0]["node"]["parents"]

    async def fetch_model_children(
        self, model_name: str | None = None, unique_id: str | None = None
    ) -> list[dict]:
        model_filters = self._get_model_filters(model_name, unique_id)
        variables = {
            "environmentId": await self.get_environment_id(),
            "modelsFilter": model_filters,
            "first": 1,
        }
        result = await self.api_client.execute_query(
            GraphQLQueries.GET_MODEL_CHILDREN, variables
        )
        raise_gql_error(result)
        edges = result["data"]["environment"]["applied"]["models"]["edges"]
        if not edges:
            return []
        return edges[0]["node"]["children"]

    async def fetch_model_health(
        self, model_name: str | None = None, unique_id: str | None = None
    ) -> list[dict]:
        model_filters = self._get_model_filters(model_name, unique_id)
        variables = {
            "environmentId": await self.get_environment_id(),
            "modelsFilter": model_filters,
            "first": 1,
        }
        result = await self.api_client.execute_query(
            GraphQLQueries.GET_MODEL_HEALTH, variables
        )
        raise_gql_error(result)
        edges = result["data"]["environment"]["applied"]["models"]["edges"]
        if not edges:
            return []
        return edges[0]["node"]


class ExposuresFetcher:
    def __init__(self, api_client: MetadataAPIClient):
        self.api_client = api_client

    async def get_environment_id(self) -> int:
        config = await self.api_client.config_provider.get_config()
        return config.environment_id

    def _parse_response_to_json(self, result: dict) -> list[dict]:
        raise_gql_error(result)
        edges = result["data"]["environment"]["definition"]["exposures"]["edges"]
        parsed_edges: list[dict] = []
        if not edges:
            return parsed_edges
        if result.get("errors"):
            raise GraphQLError(f"GraphQL query failed: {result['errors']}")
        for edge in edges:
            if not isinstance(edge, dict) or "node" not in edge:
                continue
            node = edge["node"]
            if not isinstance(node, dict):
                continue
            parsed_edges.append(node)
        return parsed_edges

    async def fetch_exposures(self) -> list[dict]:
        has_next_page = True
        after_cursor: str | None = None
        all_edges: list[dict] = []

        while has_next_page:
            variables: dict[str, int | str] = {
                "environmentId": await self.get_environment_id(),
                "first": PAGE_SIZE,
            }
            if after_cursor:
                variables["after"] = after_cursor

            result = await self.api_client.execute_query(
                GraphQLQueries.GET_EXPOSURES, variables
            )
            new_edges = self._parse_response_to_json(result)
            all_edges.extend(new_edges)

            page_info = result["data"]["environment"]["definition"]["exposures"][
                "pageInfo"
            ]
            has_next_page = page_info.get("hasNextPage", False)
            after_cursor = page_info.get("endCursor")

        return all_edges

    def _get_exposure_filters(
        self, exposure_name: str | None = None, unique_ids: list[str] | None = None
    ) -> dict[str, list[str]]:
        if unique_ids:
            return {"uniqueIds": unique_ids}
        elif exposure_name:
            raise InvalidParameterError(
                "ExposureFilter only supports uniqueIds. Please use unique_ids parameter instead of exposure_name."
            )
        else:
            raise InvalidParameterError(
                "unique_ids must be provided for exposure filtering"
            )

    async def fetch_exposure_details(
        self, exposure_name: str | None = None, unique_ids: list[str] | None = None
    ) -> list[dict]:
        if exposure_name and not unique_ids:
            # Since ExposureFilter doesn't support filtering by name,
            # we need to fetch all exposures and find the one with matching name
            all_exposures = await self.fetch_exposures()
            for exposure in all_exposures:
                if exposure.get("name") == exposure_name:
                    return [exposure]
            return []
        elif unique_ids:
            exposure_filters = self._get_exposure_filters(unique_ids=unique_ids)
            variables = {
                "environmentId": await self.get_environment_id(),
                "filter": exposure_filters,
                "first": len(unique_ids),  # Request as many as we're filtering for
            }
            result = await self.api_client.execute_query(
                GraphQLQueries.GET_EXPOSURE_DETAILS, variables
            )
            raise_gql_error(result)
            edges = result["data"]["environment"]["definition"]["exposures"]["edges"]
            if not edges:
                return []
            return [edge["node"] for edge in edges]
        else:
            raise InvalidParameterError(
                "Either exposure_name or unique_ids must be provided"
            )


class SourcesFetcher:
    def __init__(self, api_client: MetadataAPIClient):
        self.api_client = api_client

    async def get_environment_id(self) -> int:
        config = await self.api_client.config_provider.get_config()
        return config.environment_id

    def _parse_response_to_json(self, result: dict) -> list[dict]:
        raise_gql_error(result)
        edges = result["data"]["environment"]["applied"]["sources"]["edges"]
        parsed_edges: list[dict] = []
        if not edges:
            return parsed_edges
        if result.get("errors"):
            raise GraphQLError(f"GraphQL query failed: {result['errors']}")
        for edge in edges:
            if not isinstance(edge, dict) or "node" not in edge:
                continue
            node = edge["node"]
            if not isinstance(node, dict):
                continue
            parsed_edges.append(node)
        return parsed_edges

    async def fetch_sources(
        self,
        source_names: list[str] | None = None,
        unique_ids: list[str] | None = None,
    ) -> list[dict]:
        source_filter: SourceFilter = {}
        if source_names is not None:
            source_filter["sourceNames"] = source_names
        if unique_ids is not None:
            source_filter["uniqueIds"] = unique_ids

        has_next_page = True
        after_cursor: str = ""
        all_edges: list[dict] = []

        while has_next_page and len(all_edges) < MAX_NODE_QUERY_LIMIT:
            variables = {
                "environmentId": await self.get_environment_id(),
                "after": after_cursor,
                "first": PAGE_SIZE,
                "sourcesFilter": source_filter,
            }

            result = await self.api_client.execute_query(
                GraphQLQueries.GET_SOURCES, variables
            )
            all_edges.extend(self._parse_response_to_json(result))

            page_info = result["data"]["environment"]["applied"]["sources"]["pageInfo"]
            has_next_page = page_info.get("hasNextPage", False)
            after_cursor = page_info.get("endCursor")

        return all_edges

```

--------------------------------------------------------------------------------
/tests/unit/lsp/test_lsp_connection.py:
--------------------------------------------------------------------------------

```python
"""Unit tests for the LSP connection module."""

import asyncio
import socket
import subprocess
from unittest.mock import AsyncMock, MagicMock, patch

import pytest

from dbt_mcp.lsp.lsp_connection import (
    LSPConnection,
    LspConnectionState,
    LspEventName,
    JsonRpcMessage,
    event_name_from_string,
)


class TestJsonRpcMessage:
    """Test JsonRpcMessage dataclass."""

    def test_to_dict_with_request(self):
        """Test converting a request message to dictionary."""
        msg = JsonRpcMessage(id=1, method="initialize", params={"processId": None})
        result = msg.to_dict()

        assert result == {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "initialize",
            "params": {"processId": None},
        }

    def test_to_dict_with_response(self):
        """Test converting a response message to dictionary."""
        msg = JsonRpcMessage(id=1, result={"capabilities": {}})
        result = msg.to_dict()

        assert result == {"jsonrpc": "2.0", "id": 1, "result": {"capabilities": {}}}

    def test_to_dict_with_error(self):
        """Test converting an error message to dictionary."""
        msg = JsonRpcMessage(
            id=1, error={"code": -32601, "message": "Method not found"}
        )
        result = msg.to_dict()

        assert result == {
            "jsonrpc": "2.0",
            "id": 1,
            "error": {"code": -32601, "message": "Method not found"},
        }

    def test_to_dict_notification(self):
        """Test converting a notification message to dictionary."""
        msg = JsonRpcMessage(
            method="window/logMessage", params={"type": 3, "message": "Server started"}
        )
        result = msg.to_dict()

        assert result == {
            "jsonrpc": "2.0",
            "method": "window/logMessage",
            "params": {"type": 3, "message": "Server started"},
        }

    def test_from_dict(self):
        """Test creating message from dictionary."""
        data = {
            "jsonrpc": "2.0",
            "id": 42,
            "method": "textDocument/completion",
            "params": {"textDocument": {"uri": "file:///test.sql"}},
        }
        msg = JsonRpcMessage(**data)

        assert msg.jsonrpc == "2.0"
        assert msg.id == 42
        assert msg.method == "textDocument/completion"
        assert msg.params == {"textDocument": {"uri": "file:///test.sql"}}


class TestLspEventName:
    """Test LspEventName enum and helpers."""

    def test_event_name_from_string_valid(self):
        """Test converting valid string to event name."""
        assert (
            event_name_from_string("dbt/lspCompileComplete")
            == LspEventName.compileComplete
        )
        assert event_name_from_string("window/logMessage") == LspEventName.logMessage
        assert event_name_from_string("$/progress") == LspEventName.progress

    def test_event_name_from_string_invalid(self):
        """Test converting invalid string returns None."""
        assert event_name_from_string("invalid/event") is None
        assert event_name_from_string("") is None


class TestLspConnectionState:
    """Test LspConnectionState dataclass."""

    def test_initial_state(self):
        """Test initial state values."""
        state = LspConnectionState()

        assert state.initialized is False
        assert state.shutting_down is False
        assert state.capabilities is not None
        assert len(state.capabilities) == 0
        assert state.pending_requests == {}
        assert state.pending_notifications == {}
        assert state.compiled is False

    def test_get_next_request_id(self):
        """Test request ID generation."""
        state = LspConnectionState()

        # Should start at 20 to avoid collisions
        id1 = state.get_next_request_id()
        id2 = state.get_next_request_id()
        id3 = state.get_next_request_id()

        assert id1 == 20
        assert id2 == 21
        assert id3 == 22


class TestLSPConnectionInitialization:
    """Test LSP connection initialization and validation."""

    def test_init_valid_binary(self, tmp_path):
        """Test initialization with valid binary path."""
        # Create a dummy binary file
        binary_path = tmp_path / "lsp-server"
        binary_path.touch()

        conn = LSPConnection(
            binary_path=str(binary_path),
            cwd="/test/dir",
            args=["--arg1", "--arg2"],
            connection_timeout=15,
            default_request_timeout=60,
        )

        assert conn.binary_path == binary_path
        assert conn.cwd == "/test/dir"
        assert conn.args == ["--arg1", "--arg2"]
        assert conn.host == "127.0.0.1"
        assert conn.port == 0
        assert conn.connection_timeout == 15
        assert conn.default_request_timeout == 60
        assert conn.process is None
        assert isinstance(conn.state, LspConnectionState)


class TestSocketSetup:
    """Test socket setup and lifecycle."""

    def test_setup_socket_success(self, tmp_path):
        """Test successful socket setup."""
        binary_path = tmp_path / "lsp"
        binary_path.touch()

        conn = LSPConnection(str(binary_path), "/test")

        with patch("socket.socket") as mock_socket_class:
            mock_socket = MagicMock()
            mock_socket.getsockname.return_value = ("127.0.0.1", 54321)
            mock_socket_class.return_value = mock_socket

            conn.setup_socket()

            # Verify socket setup
            mock_socket_class.assert_called_once_with(
                socket.AF_INET, socket.SOCK_STREAM
            )
            mock_socket.setsockopt.assert_called_once_with(
                socket.SOL_SOCKET, socket.SO_REUSEADDR, 1
            )
            mock_socket.bind.assert_called_once_with(("127.0.0.1", 0))
            mock_socket.listen.assert_called_once_with(1)

            assert conn.port == 54321
            assert conn._socket == mock_socket


class TestProcessLaunching:
    """Test LSP process launching and termination."""

    @pytest.mark.asyncio
    async def test_launch_lsp_process_success(self, tmp_path):
        """Test successful process launch."""
        binary_path = tmp_path / "lsp"
        binary_path.touch()

        conn = LSPConnection(str(binary_path), "/test/dir")
        conn.port = 12345

        with patch("asyncio.create_subprocess_exec") as mock_create_subprocess:
            mock_process = MagicMock()
            mock_process.pid = 9999
            mock_create_subprocess.return_value = mock_process

            await conn.launch_lsp_process()

            # Verify process was started with correct arguments
            mock_create_subprocess.assert_called_once_with(
                str(binary_path), "--socket", "12345", "--project-dir", "/test/dir"
            )

            assert conn.process == mock_process


class TestStartStop:
    """Test start/stop lifecycle."""

    @pytest.mark.asyncio
    async def test_start_success(self, tmp_path):
        """Test successful server start."""
        binary_path = tmp_path / "lsp"
        binary_path.touch()

        conn = LSPConnection(str(binary_path), "/test")

        # Mock socket setup
        mock_socket = MagicMock()
        mock_connection = MagicMock()
        mock_socket.getsockname.return_value = ("127.0.0.1", 54321)

        # Mock process
        mock_process = MagicMock()
        mock_process.pid = 9999

        with (
            patch("socket.socket", return_value=mock_socket),
            patch("asyncio.create_subprocess_exec", return_value=mock_process),
            patch.object(conn, "_read_loop", new_callable=AsyncMock),
            patch.object(conn, "_write_loop", new_callable=AsyncMock),
        ):
            # Mock socket accept
            async def mock_accept_wrapper():
                return mock_connection, ("127.0.0.1", 12345)

            with patch("asyncio.get_running_loop") as mock_loop:
                mock_loop.return_value.run_in_executor.return_value = (
                    mock_accept_wrapper()
                )
                mock_loop.return_value.create_task.side_effect = (
                    lambda coro: asyncio.create_task(coro)
                )

                await conn.start()

                assert conn.process == mock_process
                assert conn._connection == mock_connection
                assert conn._reader_task is not None
                assert conn._writer_task is not None

    @pytest.mark.asyncio
    async def test_start_already_running(self, tmp_path):
        """Test starting when already running."""
        binary_path = tmp_path / "lsp"
        binary_path.touch()

        conn = LSPConnection(str(binary_path), "/test")
        conn.process = MagicMock()  # Simulate already running

        with (
            patch("socket.socket"),
            patch("asyncio.create_subprocess_exec") as mock_create_subprocess,
        ):
            await conn.start()

            # Should not create a new process
            mock_create_subprocess.assert_not_called()

    @pytest.mark.asyncio
    async def test_start_timeout(self, tmp_path):
        """Test start timeout when server doesn't connect."""
        binary_path = tmp_path / "lsp"
        binary_path.touch()

        conn = LSPConnection(str(binary_path), "/test", connection_timeout=0.1)

        mock_socket = MagicMock()
        mock_socket.getsockname.return_value = ("127.0.0.1", 54321)
        mock_process = MagicMock()

        with (
            patch("socket.socket", return_value=mock_socket),
            patch("asyncio.create_subprocess_exec", return_value=mock_process),
        ):
            # Simulate timeout in socket.accept
            mock_socket.accept.side_effect = TimeoutError

            with patch("asyncio.get_running_loop") as mock_loop:
                mock_loop.return_value.run_in_executor.side_effect = TimeoutError

                with pytest.raises(
                    RuntimeError, match="Timeout waiting for LSP server to connect"
                ):
                    await conn.start()

    @pytest.mark.asyncio
    async def test_stop_complete_cleanup(self, tmp_path):
        """Test complete cleanup on stop."""
        binary_path = tmp_path / "lsp"
        binary_path.touch()

        conn = LSPConnection(str(binary_path), "/test")

        # Setup mocks for running state
        conn.process = MagicMock()
        conn.process.terminate = MagicMock()
        conn.process.wait = AsyncMock()
        conn.process.kill = MagicMock()

        conn._socket = MagicMock()
        conn._connection = MagicMock()

        # Create mock tasks with proper async behavior
        async def mock_task():
            pass

        conn._reader_task = asyncio.create_task(mock_task())
        conn._writer_task = asyncio.create_task(mock_task())

        # Let tasks complete
        await asyncio.sleep(0.01)

        # Store references before they are set to None
        mock_connection = conn._connection
        mock_socket = conn._socket
        mock_process = conn.process

        with patch.object(conn, "_send_shutdown_request") as mock_shutdown:
            await conn.stop()

            # Verify cleanup methods were called
            mock_shutdown.assert_called_once()
            mock_connection.close.assert_called_once()
            mock_socket.close.assert_called_once()
            mock_process.terminate.assert_called_once()

            # Verify everything was set to None
            assert conn.process is None
            assert conn._socket is None
            assert conn._connection is None

    @pytest.mark.asyncio
    async def test_stop_force_kill(self, tmp_path):
        """Test force kill when process doesn't terminate."""
        binary_path = tmp_path / "lsp"
        binary_path.touch()

        conn = LSPConnection(str(binary_path), "/test")

        # Setup mock process that doesn't terminate
        mock_process = MagicMock()
        mock_process.terminate = MagicMock()
        mock_process.wait = AsyncMock(
            side_effect=[subprocess.TimeoutExpired("cmd", 1), None]
        )
        mock_process.kill = MagicMock()
        conn.process = mock_process

        await conn.stop()

        # Verify force kill was called
        mock_process.terminate.assert_called_once()
        mock_process.kill.assert_called_once()


class TestInitializeMethod:
    """Test LSP initialization handshake."""

    @pytest.mark.asyncio
    async def test_initialize_success(self, tmp_path):
        """Test successful initialization."""
        binary_path = tmp_path / "lsp"
        binary_path.touch()

        conn = LSPConnection(str(binary_path), "/test")
        conn.process = MagicMock()  # Simulate running

        # Mock send_request to return capabilities
        mock_result = {
            "capabilities": {
                "textDocumentSync": 2,
                "completionProvider": {"triggerCharacters": [".", ":"]},
            }
        }

        with (
            patch.object(
                conn, "send_request", new_callable=AsyncMock
            ) as mock_send_request,
            patch.object(conn, "send_notification") as mock_send_notification,
        ):
            mock_send_request.return_value = mock_result

            await conn.initialize(root_uri="file:///workspace", timeout=5)

            # Verify initialize request was sent
            mock_send_request.assert_called_once()
            call_args = mock_send_request.call_args
            assert call_args[0][0] == "initialize"
            assert call_args[1]["timeout"] == 5

            params = call_args[0][1]
            assert params["rootUri"] == "file:///workspace"
            assert params["clientInfo"]["name"] == "dbt-mcp"

            # Verify initialized notification was sent
            mock_send_notification.assert_called_once_with("initialized", {})

            # Verify state was updated
            assert conn.state.initialized is True
            assert conn.state.capabilities == mock_result["capabilities"]

    @pytest.mark.asyncio
    async def test_initialize_already_initialized(self, tmp_path):
        """Test initialization when already initialized."""
        binary_path = tmp_path / "lsp"
        binary_path.touch()

        conn = LSPConnection(str(binary_path), "/test")
        conn.process = MagicMock()
        conn.state.initialized = True

        with pytest.raises(RuntimeError, match="LSP server is already initialized"):
            await conn.initialize()


class TestMessageParsing:
    """Test JSON-RPC message parsing."""

    def test_parse_message_complete(self, tmp_path):
        """Test parsing a complete message."""
        binary_path = tmp_path / "lsp"
        binary_path.touch()

        conn = LSPConnection(str(binary_path), "/test")

        # Create a valid LSP message
        content = '{"jsonrpc":"2.0","id":1,"result":{"test":true}}'
        header = f"Content-Length: {len(content)}\r\n\r\n"
        buffer = (header + content).encode("utf-8")

        message, remaining = conn._parse_message(buffer)

        assert message is not None
        assert message.id == 1
        assert message.result == {"test": True}
        assert remaining == b""

    def test_parse_message_incomplete_header(self, tmp_path):
        """Test parsing with incomplete header."""
        binary_path = tmp_path / "lsp"
        binary_path.touch()

        conn = LSPConnection(str(binary_path), "/test")

        buffer = b"Content-Length: 50\r\n"  # Missing \r\n\r\n

        message, remaining = conn._parse_message(buffer)

        assert message is None
        assert remaining == buffer

    def test_parse_message_incomplete_content(self, tmp_path):
        """Test parsing with incomplete content."""
        binary_path = tmp_path / "lsp"
        binary_path.touch()

        conn = LSPConnection(str(binary_path), "/test")

        content = '{"jsonrpc":"2.0","id":1,"result":{"test":true}}'
        header = f"Content-Length: {len(content)}\r\n\r\n"
        # Only include part of the content
        buffer = (header + content[:10]).encode("utf-8")

        message, remaining = conn._parse_message(buffer)

        assert message is None
        assert remaining == buffer

    def test_parse_message_invalid_json(self, tmp_path):
        """Test parsing with invalid JSON content."""
        binary_path = tmp_path / "lsp"
        binary_path.touch()

        conn = LSPConnection(str(binary_path), "/test")

        content = '{"invalid json'
        header = f"Content-Length: {len(content)}\r\n\r\n"
        buffer = (header + content).encode("utf-8")

        message, remaining = conn._parse_message(buffer)

        assert message is None
        assert remaining == b""  # Invalid message is discarded

    def test_parse_message_missing_content_length(self, tmp_path):
        """Test parsing with missing Content-Length header."""
        binary_path = tmp_path / "lsp"
        binary_path.touch()

        conn = LSPConnection(str(binary_path), "/test")

        buffer = b'Some-Header: value\r\n\r\n{"test":true}'

        message, remaining = conn._parse_message(buffer)

        assert message is None
        assert remaining == b'{"test":true}'  # Header consumed, content remains

    def test_parse_message_multiple_messages(self, tmp_path):
        """Test parsing multiple messages from buffer."""
        binary_path = tmp_path / "lsp"
        binary_path.touch()

        conn = LSPConnection(str(binary_path), "/test")

        # Create two messages
        content1 = '{"jsonrpc":"2.0","id":1,"result":true}'
        content2 = '{"jsonrpc":"2.0","id":2,"result":false}'
        header1 = f"Content-Length: {len(content1)}\r\n\r\n"
        header2 = f"Content-Length: {len(content2)}\r\n\r\n"
        buffer = (header1 + content1 + header2 + content2).encode("utf-8")

        # Parse first message
        message1, remaining1 = conn._parse_message(buffer)
        assert message1 is not None
        assert message1.id == 1
        assert message1.result is True

        # Parse second message
        message2, remaining2 = conn._parse_message(remaining1)
        assert message2 is not None
        assert message2.id == 2
        assert message2.result is False
        assert remaining2 == b""


class TestMessageHandling:
    """Test incoming message handling."""

    def test_handle_response_message(self, tmp_path):
        """Test handling response to a request."""
        binary_path = tmp_path / "lsp"
        binary_path.touch()

        conn = LSPConnection(str(binary_path), "/test")

        # Create a pending request
        future = asyncio.Future()
        conn.state.pending_requests[42] = future

        # Handle response message
        message = JsonRpcMessage(id=42, result={"success": True})

        with patch.object(future, "get_loop") as mock_get_loop:
            mock_loop = MagicMock()
            mock_get_loop.return_value = mock_loop

            conn._handle_incoming_message(message)

            # Verify future was resolved
            mock_loop.call_soon_threadsafe.assert_called_once()
            args = mock_loop.call_soon_threadsafe.call_args[0]
            assert args[0] == future.set_result
            assert args[1] == {"success": True}

            # Verify request was removed from pending
            assert 42 not in conn.state.pending_requests

    def test_handle_error_response(self, tmp_path):
        """Test handling error response."""
        binary_path = tmp_path / "lsp"
        binary_path.touch()

        conn = LSPConnection(str(binary_path), "/test")

        # Create a pending request
        future = asyncio.Future()
        conn.state.pending_requests[42] = future

        # Handle error response
        message = JsonRpcMessage(
            id=42, error={"code": -32601, "message": "Method not found"}
        )

        with patch.object(future, "get_loop") as mock_get_loop:
            mock_loop = MagicMock()
            mock_get_loop.return_value = mock_loop

            conn._handle_incoming_message(message)

            # Verify future was rejected
            mock_loop.call_soon_threadsafe.assert_called_once()
            args = mock_loop.call_soon_threadsafe.call_args[0]
            assert args[0] == future.set_exception

    def test_handle_unknown_response(self, tmp_path):
        """Test handling response for unknown request ID."""
        binary_path = tmp_path / "lsp"
        binary_path.touch()

        conn = LSPConnection(str(binary_path), "/test")

        # Handle response with unknown ID
        message = JsonRpcMessage(id=999, result={"test": True})

        with patch.object(conn, "_send_message") as mock_send:
            conn._handle_incoming_message(message)

            # Should send empty response back
            mock_send.assert_called_once()
            sent_msg = mock_send.call_args[0][0]
            assert isinstance(sent_msg, JsonRpcMessage)
            assert sent_msg.id == 999
            assert sent_msg.result is None

    def test_handle_notification(self, tmp_path):
        """Test handling notification messages."""
        binary_path = tmp_path / "lsp"
        binary_path.touch()

        conn = LSPConnection(str(binary_path), "/test")

        # Create futures waiting for compile complete event
        future1 = asyncio.Future()
        future2 = asyncio.Future()
        conn.state.pending_notifications[LspEventName.compileComplete] = [
            future1,
            future2,
        ]

        # Handle compile complete notification
        message = JsonRpcMessage(
            method="dbt/lspCompileComplete", params={"success": True}
        )

        with (
            patch.object(future1, "get_loop") as mock_get_loop1,
            patch.object(future2, "get_loop") as mock_get_loop2,
        ):
            mock_loop1 = MagicMock()
            mock_loop2 = MagicMock()
            mock_get_loop1.return_value = mock_loop1
            mock_get_loop2.return_value = mock_loop2

            conn._handle_incoming_message(message)

            # Verify futures were resolved
            mock_loop1.call_soon_threadsafe.assert_called_once_with(
                future1.set_result, {"success": True}
            )
            mock_loop2.call_soon_threadsafe.assert_called_once_with(
                future2.set_result, {"success": True}
            )

            # Verify compile state was set
            assert conn.state.compiled is True

    def test_handle_unknown_notification(self, tmp_path):
        """Test handling unknown notification."""
        binary_path = tmp_path / "lsp"
        binary_path.touch()

        conn = LSPConnection(str(binary_path), "/test")

        # Handle unknown notification
        message = JsonRpcMessage(method="unknown/notification", params={"data": "test"})

        # Should not raise, just log
        conn._handle_incoming_message(message)


class TestSendRequest:
    """Test sending requests to LSP server."""

    @pytest.mark.asyncio
    async def test_send_request_success(self, tmp_path):
        """Test successful request sending."""
        binary_path = tmp_path / "lsp"
        binary_path.touch()

        conn = LSPConnection(str(binary_path), "/test")
        conn.process = MagicMock()  # Simulate running

        with (
            patch.object(conn, "_send_message") as mock_send,
            patch("asyncio.wait_for", new_callable=AsyncMock) as mock_wait_for,
        ):
            mock_wait_for.return_value = {"result": "success"}

            result = await conn.send_request(
                "testMethod", {"param": "value"}, timeout=5
            )

            # Verify message was sent
            mock_send.assert_called_once()
            sent_msg = mock_send.call_args[0][0]
            assert isinstance(sent_msg, JsonRpcMessage)
            assert sent_msg.method == "testMethod"
            assert sent_msg.params == {"param": "value"}
            assert sent_msg.id is not None

            # Verify result
            assert result == {"result": "success"}

    @pytest.mark.asyncio
    async def test_send_request_not_running(self, tmp_path):
        """Test sending request when server not running."""
        binary_path = tmp_path / "lsp"
        binary_path.touch()

        conn = LSPConnection(str(binary_path), "/test")
        # process is None - not running

        with pytest.raises(RuntimeError, match="LSP server is not running"):
            await conn.send_request("testMethod")

    @pytest.mark.asyncio
    async def test_send_request_timeout(self, tmp_path):
        """Test request timeout."""
        binary_path = tmp_path / "lsp"
        binary_path.touch()

        conn = LSPConnection(str(binary_path), "/test", default_request_timeout=1)
        conn.process = MagicMock()

        with patch.object(conn, "_send_message"):
            # Create a future that never resolves
            future = asyncio.Future()
            conn.state.pending_requests[20] = future

            # Use real wait_for to test timeout
            result = await conn.send_request("testMethod", timeout=0.01)

            assert "error" in result


class TestSendNotification:
    """Test sending notifications to LSP server."""

    def test_send_notification_success(self, tmp_path):
        """Test successful notification sending."""
        binary_path = tmp_path / "lsp"
        binary_path.touch()

        conn = LSPConnection(str(binary_path), "/test")
        conn.process = MagicMock()

        with patch.object(conn, "_send_message") as mock_send:
            conn.send_notification(
                "window/showMessage", {"type": 3, "message": "Hello"}
            )

            # Verify message was sent
            mock_send.assert_called_once()
            sent_msg = mock_send.call_args[0][0]
            assert isinstance(sent_msg, JsonRpcMessage)
            assert sent_msg.method == "window/showMessage"
            assert sent_msg.params == {"type": 3, "message": "Hello"}
            assert sent_msg.id is None  # Notifications have no ID

    def test_send_notification_not_running(self, tmp_path):
        """Test sending notification when server not running."""
        binary_path = tmp_path / "lsp"
        binary_path.touch()

        conn = LSPConnection(str(binary_path), "/test")
        # process is None - not running

        with pytest.raises(RuntimeError, match="LSP server is not running"):
            conn.send_notification("testMethod")


class TestWaitForNotification:
    """Test waiting for notifications."""

    def test_wait_for_notification(self, tmp_path):
        """Test registering to wait for a notification."""
        binary_path = tmp_path / "lsp"
        binary_path.touch()

        conn = LSPConnection(str(binary_path), "/test")

        with patch("asyncio.get_running_loop") as mock_get_loop:
            mock_loop = MagicMock()
            mock_future = MagicMock()
            mock_loop.create_future.return_value = mock_future
            mock_get_loop.return_value = mock_loop

            result = conn.wait_for_notification(LspEventName.compileComplete)

            # Verify future was created and registered
            assert result == mock_future
            assert LspEventName.compileComplete in conn.state.pending_notifications
            assert (
                mock_future
                in conn.state.pending_notifications[LspEventName.compileComplete]
            )


class TestSendMessage:
    """Test low-level message sending."""

    def test_send_message_with_jsonrpc_message(self, tmp_path):
        """Test sending JsonRpcMessage."""
        binary_path = tmp_path / "lsp"
        binary_path.touch()

        conn = LSPConnection(str(binary_path), "/test")
        conn._outgoing_queue = MagicMock()

        message = JsonRpcMessage(id=1, method="test", params={"key": "value"})

        conn._send_message(message)

        # Verify message was queued
        conn._outgoing_queue.put_nowait.assert_called_once()
        data = conn._outgoing_queue.put_nowait.call_args[0][0]

        # Parse the data to verify format
        assert b"Content-Length:" in data
        assert b"\r\n\r\n" in data
        # JSON might have spaces after colons, check for both variants
        assert b'"jsonrpc"' in data and b'"2.0"' in data
        assert b'"method"' in data and b'"test"' in data


class TestShutdown:
    """Test shutdown sequence."""

    def test_send_shutdown_request(self, tmp_path):
        """Test sending shutdown and exit messages."""
        binary_path = tmp_path / "lsp"
        binary_path.touch()

        conn = LSPConnection(str(binary_path), "/test")

        with patch.object(conn, "_send_message") as mock_send:
            conn._send_shutdown_request()

            # Verify two messages were sent
            assert mock_send.call_count == 2

            # First should be shutdown request
            shutdown_msg = mock_send.call_args_list[0][0][0]
            assert isinstance(shutdown_msg, JsonRpcMessage)
            assert shutdown_msg.method == "shutdown"
            assert shutdown_msg.id is not None

            # Second should be exit notification
            exit_msg = mock_send.call_args_list[1][0][0]
            assert isinstance(exit_msg, JsonRpcMessage)
            assert exit_msg.method == "exit"
            assert exit_msg.id is None


class TestIsRunning:
    """Test is_running method."""

    def test_is_running_true(self, tmp_path):
        """Test when process is running."""
        binary_path = tmp_path / "lsp"
        binary_path.touch()

        conn = LSPConnection(str(binary_path), "/test")
        conn.process = MagicMock()
        conn.process.returncode = None

        assert conn.is_running() is True

    def test_is_running_false_no_process(self, tmp_path):
        """Test when no process."""
        binary_path = tmp_path / "lsp"
        binary_path.touch()

        conn = LSPConnection(str(binary_path), "/test")

        assert conn.is_running() is False

    def test_is_running_false_process_exited(self, tmp_path):
        """Test when process has exited."""
        binary_path = tmp_path / "lsp"
        binary_path.touch()

        conn = LSPConnection(str(binary_path), "/test")
        conn.process = MagicMock()
        conn.process.returncode = 0

        assert conn.is_running() is False


class TestReadWriteLoops:
    """Test async I/O loops."""

    @pytest.mark.asyncio
    async def test_read_loop_processes_messages(self, tmp_path):
        """Test read loop processes incoming messages."""
        binary_path = tmp_path / "lsp"
        binary_path.touch()

        conn = LSPConnection(str(binary_path), "/test")

        # Setup mock connection
        mock_connection = MagicMock()
        conn._connection = mock_connection

        # Create test data
        content = '{"jsonrpc":"2.0","id":1,"result":true}'
        header = f"Content-Length: {len(content)}\r\n\r\n"
        test_data = (header + content).encode("utf-8")

        # Mock recv to return data once then empty
        recv_calls = [test_data, b""]

        async def mock_recv_wrapper(size):
            if recv_calls:
                return recv_calls.pop(0)
            return b""

        with (
            patch("asyncio.get_running_loop") as mock_get_loop,
            patch.object(conn, "_handle_incoming_message") as mock_handle,
        ):
            mock_loop = MagicMock()
            mock_get_loop.return_value = mock_loop
            mock_loop.run_in_executor.side_effect = (
                lambda _, func, *args: mock_recv_wrapper(*args)
            )

            # Run read loop (will exit when recv returns empty)
            await conn._read_loop()

            # Verify message was handled
            mock_handle.assert_called_once()
            handled_msg = mock_handle.call_args[0][0]
            assert handled_msg.id == 1
            assert handled_msg.result is True

    @pytest.mark.asyncio
    async def test_write_loop_sends_messages(self, tmp_path):
        """Test write loop sends queued messages."""
        binary_path = tmp_path / "lsp"
        binary_path.touch()

        conn = LSPConnection(str(binary_path), "/test")

        # Setup mock connection
        mock_connection = MagicMock()
        conn._connection = mock_connection

        # Queue test data
        test_data = b"test message data"
        conn._outgoing_queue.put_nowait(test_data)

        # Set stop event after first iteration
        async def stop_after_one():
            await asyncio.sleep(0.01)
            conn._stop_event.set()

        with patch("asyncio.get_running_loop") as mock_get_loop:
            mock_loop = MagicMock()
            mock_get_loop.return_value = mock_loop
            mock_loop.run_in_executor.return_value = asyncio.sleep(0)

            # Run both coroutines
            await asyncio.gather(
                conn._write_loop(), stop_after_one(), return_exceptions=True
            )

            # Verify data was sent
            mock_loop.run_in_executor.assert_called()
            call_args = mock_loop.run_in_executor.call_args_list[-1]
            assert call_args[0][1] == mock_connection.sendall
            assert call_args[0][2] == test_data


class TestEdgeCases:
    """Test edge cases and error conditions."""

    @pytest.mark.asyncio
    async def test_concurrent_requests(self, tmp_path):
        """Test handling concurrent requests."""
        binary_path = tmp_path / "lsp"
        binary_path.touch()

        conn = LSPConnection(str(binary_path), "/test")
        conn.process = MagicMock()

        # Track sent messages
        sent_messages = []

        def track_message(msg):
            sent_messages.append(msg)

        with patch.object(conn, "_send_message", side_effect=track_message):
            # Create futures for multiple requests
            future1 = asyncio.create_task(
                conn.send_request("method1", JsonRpcMessage(id=1))
            )
            future2 = asyncio.create_task(
                conn.send_request("method2", JsonRpcMessage(id=2))
            )
            future3 = asyncio.create_task(
                conn.send_request("method3", JsonRpcMessage(id=3))
            )

            # Let tasks start
            await asyncio.sleep(0.01)

            # Verify all messages were sent with unique IDs
            assert len(sent_messages) == 3
            ids = [msg.id for msg in sent_messages]
            assert len(set(ids)) == 3  # All IDs are unique

            # Simulate responses
            for msg in sent_messages:
                if msg.id in conn.state.pending_requests:
                    future = conn.state.pending_requests[msg.id]
                    future.set_result({"response": msg.id})

            # Wait for all requests
            results = await asyncio.gather(future1, future2, future3)

            # Verify each got correct response
            assert all("response" in r for r in results)

    @pytest.mark.asyncio
    async def test_stop_with_pending_requests(self, tmp_path):
        """Test stopping with pending requests."""
        binary_path = tmp_path / "lsp"
        binary_path.touch()

        conn = LSPConnection(str(binary_path), "/test")
        conn.process = MagicMock()
        conn.process.terminate = MagicMock()
        conn.process.wait = AsyncMock()

        # Add pending requests
        future1 = asyncio.Future()
        future2 = asyncio.Future()
        conn.state.pending_requests[1] = future1
        conn.state.pending_requests[2] = future2

        await conn.stop()

        # Verify state was cleared
        assert len(conn.state.pending_requests) == 0

    def test_message_with_unicode(self, tmp_path):
        """Test handling messages with unicode content."""
        binary_path = tmp_path / "lsp"
        binary_path.touch()

        conn = LSPConnection(str(binary_path), "/test")

        # Create message with unicode
        content = '{"jsonrpc":"2.0","method":"test","params":{"text":"Hello 世界 🚀"}}'
        header = f"Content-Length: {len(content.encode('utf-8'))}\r\n\r\n"
        buffer = header.encode("utf-8") + content.encode("utf-8")

        message, remaining = conn._parse_message(buffer)

        assert message is not None
        assert message.method == "test"
        assert message.params["text"] == "Hello 世界 🚀"
        assert remaining == b""

```
Page 4/4FirstPrevNextLast