This is page 4 of 4. Use http://codebase.md/dbt-labs/dbt-mcp?page={x} to view the full context.
# Directory Structure
```
├── .changes
│ ├── header.tpl.md
│ ├── unreleased
│ │ ├── .gitkeep
│ │ ├── Bug Fix-20251028-143835.yaml
│ │ ├── Enhancement or New Feature-20251014-175047.yaml
│ │ └── Under the Hood-20251030-151902.yaml
│ ├── v0.1.3.md
│ ├── v0.10.0.md
│ ├── v0.10.1.md
│ ├── v0.10.2.md
│ ├── v0.10.3.md
│ ├── v0.2.0.md
│ ├── v0.2.1.md
│ ├── v0.2.10.md
│ ├── v0.2.11.md
│ ├── v0.2.12.md
│ ├── v0.2.13.md
│ ├── v0.2.14.md
│ ├── v0.2.15.md
│ ├── v0.2.16.md
│ ├── v0.2.17.md
│ ├── v0.2.18.md
│ ├── v0.2.19.md
│ ├── v0.2.2.md
│ ├── v0.2.20.md
│ ├── v0.2.3.md
│ ├── v0.2.4.md
│ ├── v0.2.5.md
│ ├── v0.2.6.md
│ ├── v0.2.7.md
│ ├── v0.2.8.md
│ ├── v0.2.9.md
│ ├── v0.3.0.md
│ ├── v0.4.0.md
│ ├── v0.4.1.md
│ ├── v0.4.2.md
│ ├── v0.5.0.md
│ ├── v0.6.0.md
│ ├── v0.6.1.md
│ ├── v0.6.2.md
│ ├── v0.7.0.md
│ ├── v0.8.0.md
│ ├── v0.8.1.md
│ ├── v0.8.2.md
│ ├── v0.8.3.md
│ ├── v0.8.4.md
│ ├── v0.9.0.md
│ ├── v0.9.1.md
│ └── v1.0.0.md
├── .changie.yaml
├── .env.example
├── .github
│ ├── actions
│ │ └── setup-python
│ │ └── action.yml
│ ├── CODEOWNERS
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.yml
│ │ └── feature_request.yml
│ ├── pull_request_template.md
│ └── workflows
│ ├── changelog-check.yml
│ ├── codeowners-check.yml
│ ├── create-release-pr.yml
│ ├── release.yml
│ └── run-checks-pr.yaml
├── .gitignore
├── .pre-commit-config.yaml
├── .task
│ └── checksum
│ └── d2
├── .tool-versions
├── .vscode
│ ├── launch.json
│ └── settings.json
├── CHANGELOG.md
├── CONTRIBUTING.md
├── docs
│ ├── d2.png
│ └── diagram.d2
├── evals
│ └── semantic_layer
│ └── test_eval_semantic_layer.py
├── examples
│ ├── .DS_Store
│ ├── aws_strands_agent
│ │ ├── __init__.py
│ │ ├── .DS_Store
│ │ ├── dbt_data_scientist
│ │ │ ├── __init__.py
│ │ │ ├── .env.example
│ │ │ ├── agent.py
│ │ │ ├── prompts.py
│ │ │ ├── quick_mcp_test.py
│ │ │ ├── test_all_tools.py
│ │ │ └── tools
│ │ │ ├── __init__.py
│ │ │ ├── dbt_compile.py
│ │ │ ├── dbt_mcp.py
│ │ │ └── dbt_model_analyzer.py
│ │ ├── LICENSE
│ │ ├── README.md
│ │ └── requirements.txt
│ ├── google_adk_agent
│ │ ├── __init__.py
│ │ ├── main.py
│ │ ├── pyproject.toml
│ │ └── README.md
│ ├── langgraph_agent
│ │ ├── __init__.py
│ │ ├── .python-version
│ │ ├── main.py
│ │ ├── pyproject.toml
│ │ ├── README.md
│ │ └── uv.lock
│ ├── openai_agent
│ │ ├── __init__.py
│ │ ├── .gitignore
│ │ ├── .python-version
│ │ ├── main_streamable.py
│ │ ├── main.py
│ │ ├── pyproject.toml
│ │ ├── README.md
│ │ └── uv.lock
│ ├── openai_responses
│ │ ├── __init__.py
│ │ ├── .gitignore
│ │ ├── .python-version
│ │ ├── main.py
│ │ ├── pyproject.toml
│ │ ├── README.md
│ │ └── uv.lock
│ ├── pydantic_ai_agent
│ │ ├── __init__.py
│ │ ├── .gitignore
│ │ ├── .python-version
│ │ ├── main.py
│ │ ├── pyproject.toml
│ │ └── README.md
│ └── remote_mcp
│ ├── .python-version
│ ├── main.py
│ ├── pyproject.toml
│ ├── README.md
│ └── uv.lock
├── LICENSE
├── pyproject.toml
├── README.md
├── src
│ ├── client
│ │ ├── __init__.py
│ │ ├── main.py
│ │ └── tools.py
│ ├── dbt_mcp
│ │ ├── __init__.py
│ │ ├── .gitignore
│ │ ├── config
│ │ │ ├── config_providers.py
│ │ │ ├── config.py
│ │ │ ├── dbt_project.py
│ │ │ ├── dbt_yaml.py
│ │ │ ├── headers.py
│ │ │ ├── settings.py
│ │ │ └── transport.py
│ │ ├── dbt_admin
│ │ │ ├── __init__.py
│ │ │ ├── client.py
│ │ │ ├── constants.py
│ │ │ ├── run_results_errors
│ │ │ │ ├── __init__.py
│ │ │ │ ├── config.py
│ │ │ │ └── parser.py
│ │ │ └── tools.py
│ │ ├── dbt_cli
│ │ │ ├── binary_type.py
│ │ │ └── tools.py
│ │ ├── dbt_codegen
│ │ │ ├── __init__.py
│ │ │ └── tools.py
│ │ ├── discovery
│ │ │ ├── client.py
│ │ │ └── tools.py
│ │ ├── errors
│ │ │ ├── __init__.py
│ │ │ ├── admin_api.py
│ │ │ ├── base.py
│ │ │ ├── cli.py
│ │ │ ├── common.py
│ │ │ ├── discovery.py
│ │ │ ├── semantic_layer.py
│ │ │ └── sql.py
│ │ ├── gql
│ │ │ └── errors.py
│ │ ├── lsp
│ │ │ ├── __init__.py
│ │ │ ├── lsp_binary_manager.py
│ │ │ ├── lsp_client.py
│ │ │ ├── lsp_connection.py
│ │ │ └── tools.py
│ │ ├── main.py
│ │ ├── mcp
│ │ │ ├── create.py
│ │ │ └── server.py
│ │ ├── oauth
│ │ │ ├── client_id.py
│ │ │ ├── context_manager.py
│ │ │ ├── dbt_platform.py
│ │ │ ├── fastapi_app.py
│ │ │ ├── logging.py
│ │ │ ├── login.py
│ │ │ ├── refresh_strategy.py
│ │ │ ├── token_provider.py
│ │ │ └── token.py
│ │ ├── prompts
│ │ │ ├── __init__.py
│ │ │ ├── admin_api
│ │ │ │ ├── cancel_job_run.md
│ │ │ │ ├── get_job_details.md
│ │ │ │ ├── get_job_run_artifact.md
│ │ │ │ ├── get_job_run_details.md
│ │ │ │ ├── get_job_run_error.md
│ │ │ │ ├── list_job_run_artifacts.md
│ │ │ │ ├── list_jobs_runs.md
│ │ │ │ ├── list_jobs.md
│ │ │ │ ├── retry_job_run.md
│ │ │ │ └── trigger_job_run.md
│ │ │ ├── dbt_cli
│ │ │ │ ├── args
│ │ │ │ │ ├── full_refresh.md
│ │ │ │ │ ├── limit.md
│ │ │ │ │ ├── resource_type.md
│ │ │ │ │ ├── selectors.md
│ │ │ │ │ ├── sql_query.md
│ │ │ │ │ └── vars.md
│ │ │ │ ├── build.md
│ │ │ │ ├── compile.md
│ │ │ │ ├── docs.md
│ │ │ │ ├── list.md
│ │ │ │ ├── parse.md
│ │ │ │ ├── run.md
│ │ │ │ ├── show.md
│ │ │ │ └── test.md
│ │ │ ├── dbt_codegen
│ │ │ │ ├── args
│ │ │ │ │ ├── case_sensitive_cols.md
│ │ │ │ │ ├── database_name.md
│ │ │ │ │ ├── generate_columns.md
│ │ │ │ │ ├── include_data_types.md
│ │ │ │ │ ├── include_descriptions.md
│ │ │ │ │ ├── leading_commas.md
│ │ │ │ │ ├── materialized.md
│ │ │ │ │ ├── model_name.md
│ │ │ │ │ ├── model_names.md
│ │ │ │ │ ├── schema_name.md
│ │ │ │ │ ├── source_name.md
│ │ │ │ │ ├── table_name.md
│ │ │ │ │ ├── table_names.md
│ │ │ │ │ ├── tables.md
│ │ │ │ │ └── upstream_descriptions.md
│ │ │ │ ├── generate_model_yaml.md
│ │ │ │ ├── generate_source.md
│ │ │ │ └── generate_staging_model.md
│ │ │ ├── discovery
│ │ │ │ ├── get_all_models.md
│ │ │ │ ├── get_all_sources.md
│ │ │ │ ├── get_exposure_details.md
│ │ │ │ ├── get_exposures.md
│ │ │ │ ├── get_mart_models.md
│ │ │ │ ├── get_model_children.md
│ │ │ │ ├── get_model_details.md
│ │ │ │ ├── get_model_health.md
│ │ │ │ └── get_model_parents.md
│ │ │ ├── lsp
│ │ │ │ ├── args
│ │ │ │ │ ├── column_name.md
│ │ │ │ │ └── model_id.md
│ │ │ │ └── get_column_lineage.md
│ │ │ ├── prompts.py
│ │ │ └── semantic_layer
│ │ │ ├── get_dimensions.md
│ │ │ ├── get_entities.md
│ │ │ ├── get_metrics_compiled_sql.md
│ │ │ ├── list_metrics.md
│ │ │ └── query_metrics.md
│ │ ├── py.typed
│ │ ├── semantic_layer
│ │ │ ├── client.py
│ │ │ ├── gql
│ │ │ │ ├── gql_request.py
│ │ │ │ └── gql.py
│ │ │ ├── levenshtein.py
│ │ │ ├── tools.py
│ │ │ └── types.py
│ │ ├── sql
│ │ │ └── tools.py
│ │ ├── telemetry
│ │ │ └── logging.py
│ │ ├── tools
│ │ │ ├── annotations.py
│ │ │ ├── definitions.py
│ │ │ ├── policy.py
│ │ │ ├── register.py
│ │ │ ├── tool_names.py
│ │ │ └── toolsets.py
│ │ └── tracking
│ │ └── tracking.py
│ └── remote_mcp
│ ├── __init__.py
│ └── session.py
├── Taskfile.yml
├── tests
│ ├── __init__.py
│ ├── env_vars.py
│ ├── integration
│ │ ├── __init__.py
│ │ ├── dbt_codegen
│ │ │ ├── __init__.py
│ │ │ └── test_dbt_codegen.py
│ │ ├── discovery
│ │ │ └── test_discovery.py
│ │ ├── initialization
│ │ │ ├── __init__.py
│ │ │ └── test_initialization.py
│ │ ├── lsp
│ │ │ └── test_lsp_connection.py
│ │ ├── remote_mcp
│ │ │ └── test_remote_mcp.py
│ │ ├── remote_tools
│ │ │ └── test_remote_tools.py
│ │ ├── semantic_layer
│ │ │ └── test_semantic_layer.py
│ │ └── tracking
│ │ └── test_tracking.py
│ ├── mocks
│ │ └── config.py
│ └── unit
│ ├── __init__.py
│ ├── config
│ │ ├── __init__.py
│ │ ├── test_config.py
│ │ └── test_transport.py
│ ├── dbt_admin
│ │ ├── __init__.py
│ │ ├── test_client.py
│ │ ├── test_error_fetcher.py
│ │ └── test_tools.py
│ ├── dbt_cli
│ │ ├── __init__.py
│ │ ├── test_cli_integration.py
│ │ └── test_tools.py
│ ├── dbt_codegen
│ │ ├── __init__.py
│ │ └── test_tools.py
│ ├── discovery
│ │ ├── __init__.py
│ │ ├── conftest.py
│ │ ├── test_exposures_fetcher.py
│ │ └── test_sources_fetcher.py
│ ├── lsp
│ │ ├── __init__.py
│ │ ├── test_lsp_client.py
│ │ ├── test_lsp_connection.py
│ │ └── test_lsp_tools.py
│ ├── oauth
│ │ ├── test_credentials_provider.py
│ │ ├── test_fastapi_app_pagination.py
│ │ └── test_token.py
│ ├── tools
│ │ ├── test_disable_tools.py
│ │ ├── test_tool_names.py
│ │ ├── test_tool_policies.py
│ │ └── test_toolsets.py
│ └── tracking
│ └── test_tracking.py
├── ui
│ ├── .gitignore
│ ├── assets
│ │ ├── dbt_logo BLK.svg
│ │ └── dbt_logo WHT.svg
│ ├── eslint.config.js
│ ├── index.html
│ ├── package.json
│ ├── pnpm-lock.yaml
│ ├── pnpm-workspace.yaml
│ ├── README.md
│ ├── src
│ │ ├── App.css
│ │ ├── App.tsx
│ │ ├── global.d.ts
│ │ ├── index.css
│ │ ├── main.tsx
│ │ └── vite-env.d.ts
│ ├── tsconfig.app.json
│ ├── tsconfig.json
│ ├── tsconfig.node.json
│ └── vite.config.ts
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/tests/integration/lsp/test_lsp_connection.py:
--------------------------------------------------------------------------------
```python
"""Integration-style tests for LSP connection using real instances instead of mocks.
These tests use real sockets, asyncio primitives, and actual data flow
to provide more realistic test coverage compared to heavily mocked unit tests.
"""
import asyncio
import json
import socket
import pytest
from dbt_mcp.lsp.lsp_connection import (
LSPConnection,
LspEventName,
JsonRpcMessage,
)
class TestRealSocketOperations:
"""Tests using real sockets to verify actual network communication."""
def test_setup_socket_real(self, tmp_path):
"""Test socket setup with real socket binding."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = LSPConnection(str(binary_path), "/test")
# Use real socket
conn.setup_socket()
try:
# Verify real socket was created and bound
assert conn._socket is not None
assert isinstance(conn._socket, socket.socket)
assert conn.port > 0 # OS assigned a port
assert conn.host == "127.0.0.1"
# Verify socket is actually listening
sockname = conn._socket.getsockname()
assert sockname[0] == "127.0.0.1"
assert sockname[1] == conn.port
finally:
# Cleanup
if conn._socket:
conn._socket.close()
def test_socket_reuse_address(self, tmp_path):
"""Test that SO_REUSEADDR is set on real socket."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = LSPConnection(str(binary_path), "/test")
conn.setup_socket()
try:
# Verify SO_REUSEADDR is set (value varies by platform, just check it's non-zero)
reuse = conn._socket.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
assert reuse != 0
finally:
if conn._socket:
conn._socket.close()
@pytest.mark.asyncio
async def test_socket_accept_with_real_client(self, tmp_path):
"""Test socket accept with real client connection."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = LSPConnection(str(binary_path), "/test", connection_timeout=2.0)
conn.setup_socket()
try:
# Create real client socket that connects to the server
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
async def client_connect():
await asyncio.sleep(0.1) # Let server start listening
await asyncio.get_running_loop().run_in_executor(
None, client_socket.connect, (conn.host, conn.port)
)
async def server_accept():
conn._socket.settimeout(conn.connection_timeout)
connection, addr = await asyncio.get_running_loop().run_in_executor(
None, conn._socket.accept
)
return connection, addr
# Run both concurrently
client_task = asyncio.create_task(client_connect())
server_result = await server_accept()
await client_task
connection, client_addr = server_result
assert connection is not None
assert client_addr[0] in ("127.0.0.1", "::1") # IPv4 or IPv6 localhost
# Cleanup
connection.close()
client_socket.close()
finally:
if conn._socket:
conn._socket.close()
class TestRealAsyncioQueues:
"""Tests using real asyncio queues to verify message queueing."""
def test_send_message_with_real_queue(self, tmp_path):
"""Test message sending with real asyncio queue."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = LSPConnection(str(binary_path), "/test")
# The _outgoing_queue is already a real asyncio.Queue
assert isinstance(conn._outgoing_queue, asyncio.Queue)
assert conn._outgoing_queue.empty()
# Send a message
message = JsonRpcMessage(id=1, method="test", params={"key": "value"})
conn._send_message(message)
# Verify message was actually queued
assert not conn._outgoing_queue.empty()
data = conn._outgoing_queue.get_nowait()
# Verify LSP protocol format
assert isinstance(data, bytes)
assert b"Content-Length:" in data
assert b"\r\n\r\n" in data
assert b'"jsonrpc"' in data
assert b'"2.0"' in data
assert b'"test"' in data
def test_multiple_messages_queue_order(self, tmp_path):
"""Test that multiple messages maintain FIFO order in real queue."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = LSPConnection(str(binary_path), "/test")
# Send multiple messages
msg1 = JsonRpcMessage(id=1, method="first")
msg2 = JsonRpcMessage(id=2, method="second")
msg3 = JsonRpcMessage(id=3, method="third")
conn._send_message(msg1)
conn._send_message(msg2)
conn._send_message(msg3)
# Verify queue size
assert conn._outgoing_queue.qsize() == 3
# Verify FIFO order
data1 = conn._outgoing_queue.get_nowait()
data2 = conn._outgoing_queue.get_nowait()
data3 = conn._outgoing_queue.get_nowait()
assert b'"first"' in data1
assert b'"second"' in data2
assert b'"third"' in data3
# Queue should be empty
assert conn._outgoing_queue.empty()
class TestRealAsyncioFutures:
"""Tests using real asyncio futures to verify async behavior."""
@pytest.mark.asyncio
async def test_handle_response_with_real_future(self, tmp_path):
"""Test handling response with real asyncio future."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = LSPConnection(str(binary_path), "/test")
# Create real future in current event loop
future = asyncio.get_running_loop().create_future()
conn.state.pending_requests[42] = future
# Handle response in the same loop
message = JsonRpcMessage(id=42, result={"success": True, "data": "test"})
conn._handle_incoming_message(message)
# Wait for future to be resolved (should be immediate via call_soon_threadsafe)
result = await asyncio.wait_for(future, timeout=1.0)
assert result == {"success": True, "data": "test"}
assert 42 not in conn.state.pending_requests
@pytest.mark.asyncio
async def test_handle_error_with_real_future(self, tmp_path):
"""Test handling error response with real asyncio future."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = LSPConnection(str(binary_path), "/test")
# Create real future
future = asyncio.get_running_loop().create_future()
conn.state.pending_requests[42] = future
# Handle error response
message = JsonRpcMessage(
id=42, error={"code": -32601, "message": "Method not found"}
)
conn._handle_incoming_message(message)
# Future should be rejected with exception
with pytest.raises(RuntimeError, match="LSP error"):
await asyncio.wait_for(future, timeout=1.0)
assert 42 not in conn.state.pending_requests
@pytest.mark.asyncio
async def test_notification_futures_real(self, tmp_path):
"""Test waiting for notifications with real futures."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = LSPConnection(str(binary_path), "/test")
# Register to wait for a notification
future = conn.wait_for_notification(LspEventName.compileComplete)
# Verify it's a real future
assert isinstance(future, asyncio.Future)
assert not future.done()
# Simulate receiving the notification
message = JsonRpcMessage(
method="dbt/lspCompileComplete", params={"success": True, "errors": []}
)
conn._handle_incoming_message(message)
# Wait for notification
result = await asyncio.wait_for(future, timeout=1.0)
assert result == {"success": True, "errors": []}
assert conn.state.compiled is True
class TestRealSocketCommunication:
"""Tests using real socket pairs to verify end-to-end communication."""
@pytest.mark.asyncio
async def test_socket_pair_communication(self, tmp_path):
"""Test bidirectional communication using socketpair."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = LSPConnection(str(binary_path), "/test")
# Create a real socket pair (connected sockets)
server_socket, client_socket = socket.socketpair()
conn._connection = server_socket
try:
# Send a message through the connection
message = JsonRpcMessage(id=1, method="test", params={"foo": "bar"})
conn._send_message(message)
# Get the data from the queue
data = conn._outgoing_queue.get_nowait()
# Actually send it through the socket
await asyncio.get_running_loop().run_in_executor(
None, server_socket.sendall, data
)
# Read it back on the client side
received_data = await asyncio.get_running_loop().run_in_executor(
None, client_socket.recv, 4096
)
# Verify we got the complete LSP message
assert b"Content-Length:" in received_data
assert b"\r\n\r\n" in received_data
assert b'"test"' in received_data
assert b'"foo"' in received_data
assert b'"bar"' in received_data
finally:
server_socket.close()
client_socket.close()
@pytest.mark.asyncio
async def test_message_roundtrip_real(self, tmp_path):
"""Test complete message send and parse roundtrip."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = LSPConnection(str(binary_path), "/test")
# Create socket pair
server_socket, client_socket = socket.socketpair()
conn._connection = server_socket
try:
# Original message
original_message = JsonRpcMessage(
id=123,
method="textDocument/completion",
params={
"textDocument": {"uri": "file:///test.sql"},
"position": {"line": 10, "character": 5},
},
)
# Send through connection
conn._send_message(original_message)
data = conn._outgoing_queue.get_nowait()
await asyncio.get_running_loop().run_in_executor(
None, server_socket.sendall, data
)
# Receive on client side
received_data = await asyncio.get_running_loop().run_in_executor(
None, client_socket.recv, 4096
)
# Parse it back
parsed_message, remaining = conn._parse_message(received_data)
# Verify roundtrip integrity
assert parsed_message is not None
assert parsed_message.id == original_message.id
assert parsed_message.method == original_message.method
assert parsed_message.params == original_message.params
assert remaining == b""
finally:
server_socket.close()
client_socket.close()
@pytest.mark.asyncio
async def test_multiple_messages_streaming(self, tmp_path):
"""Test streaming multiple messages through real socket."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = LSPConnection(str(binary_path), "/test")
# Create socket pair
server_socket, client_socket = socket.socketpair()
conn._connection = server_socket
try:
# Set non-blocking for client to avoid hangs
client_socket.setblocking(False)
# Send multiple messages
messages = [
JsonRpcMessage(id=1, method="initialize"),
JsonRpcMessage(method="initialized", params={}),
JsonRpcMessage(id=2, method="textDocument/didOpen"),
]
for msg in messages:
conn._send_message(msg)
data = conn._outgoing_queue.get_nowait()
await asyncio.get_running_loop().run_in_executor(
None, server_socket.sendall, data
)
# Receive all data on client side with timeout
received_data = b""
client_socket.setblocking(True)
client_socket.settimeout(1.0)
try:
while True:
chunk = await asyncio.get_running_loop().run_in_executor(
None, client_socket.recv, 4096
)
if not chunk:
break
received_data += chunk
# Try to parse - if we have all 3 messages, we're done
temp_buffer = received_data
temp_count = 0
while True:
msg, temp_buffer = conn._parse_message(temp_buffer)
if msg is None:
break
temp_count += 1
if temp_count >= 3:
break
except TimeoutError:
pass # Expected when all data is received
# Parse all messages
buffer = received_data
parsed_messages = []
while buffer:
msg, buffer = conn._parse_message(buffer)
if msg is None:
break
parsed_messages.append(msg)
# Verify all messages were received and parsed correctly
assert len(parsed_messages) == 3
assert parsed_messages[0].id == 1
assert parsed_messages[0].method == "initialize"
assert parsed_messages[1].method == "initialized"
assert parsed_messages[2].id == 2
finally:
server_socket.close()
client_socket.close()
class TestRealMessageParsing:
"""Tests parsing with real byte streams."""
def test_parse_real_lsp_message(self, tmp_path):
"""Test parsing a real LSP protocol message."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = LSPConnection(str(binary_path), "/test")
# Create a real LSP message exactly as it would be sent
content = json.dumps(
{
"jsonrpc": "2.0",
"id": 1,
"result": {
"capabilities": {
"textDocumentSync": 2,
"completionProvider": {"triggerCharacters": ["."]},
}
},
}
)
content_bytes = content.encode("utf-8")
header = f"Content-Length: {len(content_bytes)}\r\n\r\n"
full_message = header.encode("utf-8") + content_bytes
# Parse it
message, remaining = conn._parse_message(full_message)
assert message is not None
assert message.id == 1
assert "capabilities" in message.result
assert message.result["capabilities"]["textDocumentSync"] == 2
assert remaining == b""
def test_parse_chunked_message_real(self, tmp_path):
"""Test parsing message that arrives in multiple chunks."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = LSPConnection(str(binary_path), "/test")
# Create a message
content = json.dumps({"jsonrpc": "2.0", "id": 1, "method": "test"})
content_bytes = content.encode("utf-8")
header = f"Content-Length: {len(content_bytes)}\r\n\r\n"
full_message = header.encode("utf-8") + content_bytes
# Split into chunks (simulate network chunking)
chunk1 = full_message[:20]
chunk2 = full_message[20:40]
chunk3 = full_message[40:]
# Parse first chunk - should be incomplete
msg1, buffer = conn._parse_message(chunk1)
assert msg1 is None
assert buffer == chunk1
# Add second chunk - still incomplete
buffer += chunk2
msg2, buffer = conn._parse_message(buffer)
assert msg2 is None
# Add final chunk - should complete
buffer += chunk3
msg3, buffer = conn._parse_message(buffer)
assert msg3 is not None
assert msg3.id == 1
assert msg3.method == "test"
assert buffer == b""
class TestRealConcurrentOperations:
"""Tests with real concurrent async operations."""
@pytest.mark.asyncio
async def test_concurrent_request_futures(self, tmp_path):
"""Test handling multiple concurrent requests with real futures."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = LSPConnection(str(binary_path), "/test")
# Create multiple real futures for concurrent requests
futures = {}
for i in range(10):
future = asyncio.get_running_loop().create_future()
futures[i] = future
conn.state.pending_requests[i] = future
# Simulate responses arriving concurrently
async def respond(request_id: int, delay: float):
await asyncio.sleep(delay)
message = JsonRpcMessage(id=request_id, result={"request_id": request_id})
conn._handle_incoming_message(message)
# Start all responses with random delays
response_tasks = [asyncio.create_task(respond(i, i * 0.01)) for i in range(10)]
# Wait for all futures to resolve
results = await asyncio.gather(*[futures[i] for i in range(10)])
# Verify all completed correctly
assert len(results) == 10
for i, result in enumerate(results):
assert result["request_id"] == i
# All requests should be removed
assert len(conn.state.pending_requests) == 0
# Cleanup
await asyncio.gather(*response_tasks)
@pytest.mark.asyncio
async def test_concurrent_notifications_real(self, tmp_path):
"""Test multiple futures waiting for the same notification."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = LSPConnection(str(binary_path), "/test")
# Create multiple waiters for the same event
future1 = conn.wait_for_notification(LspEventName.compileComplete)
future2 = conn.wait_for_notification(LspEventName.compileComplete)
future3 = conn.wait_for_notification(LspEventName.compileComplete)
# All should be real futures
assert all(isinstance(f, asyncio.Future) for f in [future1, future2, future3])
# Send the notification
message = JsonRpcMessage(
method="dbt/lspCompileComplete", params={"status": "success"}
)
conn._handle_incoming_message(message)
# All futures should resolve
results = await asyncio.wait_for(
asyncio.gather(future1, future2, future3), timeout=1.0
)
assert all(r == {"status": "success"} for r in results)
class TestRealStateManagement:
"""Tests using real state objects."""
def test_real_state_initialization(self, tmp_path):
"""Test that connection uses real LspConnectionState."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = LSPConnection(str(binary_path), "/test")
# Verify state is a real instance
from dbt_mcp.lsp.lsp_connection import LspConnectionState
assert isinstance(conn.state, LspConnectionState)
assert conn.state.initialized is False
assert conn.state.compiled is False
assert isinstance(conn.state.pending_requests, dict)
assert isinstance(conn.state.pending_notifications, dict)
def test_real_request_id_generation(self, tmp_path):
"""Test real request ID counter."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = LSPConnection(str(binary_path), "/test")
# Get sequential IDs
ids = [conn.state.get_next_request_id() for _ in range(100)]
# Verify they're sequential (starting point may vary if other tests ran)
# Just verify they are sequential and unique
first_id = ids[0]
assert ids[-1] == first_id + 99
assert ids == list(range(first_id, first_id + 100))
assert len(set(ids)) == 100 # All unique
def test_real_state_updates(self, tmp_path):
"""Test that state updates work with real instances."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = LSPConnection(str(binary_path), "/test")
# Update state
conn.state.initialized = True
conn.state.capabilities = {"test": True}
conn.state.compiled = True
# Verify updates persist
assert conn.state.initialized is True
assert conn.state.capabilities == {"test": True}
assert conn.state.compiled is True
```
--------------------------------------------------------------------------------
/ui/src/App.css:
--------------------------------------------------------------------------------
```css
/* Reset and base styles */
* {
box-sizing: border-box;
}
body {
width: 100%;
margin: 0;
padding: 0;
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', 'Roboto', 'Oxygen',
'Ubuntu', 'Cantarell', 'Fira Sans', 'Droid Sans', 'Helvetica Neue',
sans-serif;
line-height: 1.6;
-webkit-font-smoothing: antialiased;
-moz-osx-font-smoothing: grayscale;
background-color: #fff;
color: #1c1a19;
}
p {
margin-bottom: 1rem;
}
@media (prefers-color-scheme: dark) {
body {
background-color: #1c1a19;
color: #f6f6f6;
}
p {
margin-bottom: 1rem;
}
}
/* Logo */
.logo-container {
position: fixed;
top: 1rem;
left: 1rem;
z-index: 1000;
}
.logo {
height: 2rem;
width: auto;
transition: opacity 0.2s ease-in-out;
}
.logo-light {
display: block;
}
.logo-dark {
display: none;
}
/* Main layout */
.app-container {
min-height: 100vh;
display: flex;
justify-content: center;
align-items: flex-start;
padding: 2rem 1rem;
}
.app-content {
width: 100%;
max-width: 600px;
display: flex;
flex-direction: column;
gap: 2rem;
}
/* Header */
.app-header {
text-align: center;
margin-bottom: 1rem;
}
.app-header h1 {
margin: 0 0 0.5rem 0;
font-size: 2.5rem;
font-weight: 700;
letter-spacing: -0.025em;
}
.app-header p {
margin: 0;
font-size: 1.125rem;
opacity: 0.7;
}
/* Sections */
section {
background: #fff;
border-radius: 12px;
border: 1px solid #ebe9e9;
overflow: visible;
box-shadow: 0 1px 3px 0 rgba(0, 0, 0, 0.1), 0 1px 2px 0 rgba(0, 0, 0, 0.06);
}
/* Specific overflow handling for sections with dropdowns */
.project-selection-section {
overflow: visible;
}
.section-header {
padding: 1.5rem 1.5rem 0 1.5rem;
border-bottom: 1px solid #ebe9e9;
margin-bottom: 1.5rem;
}
.section-header h2 {
margin: 0 0 0.5rem 0;
font-size: 1.5rem;
font-weight: 600;
}
.section-header h3 {
margin: 0 0 0.5rem 0;
font-size: 1.25rem;
font-weight: 600;
}
.section-header p {
margin: 0 0 1.5rem 0;
opacity: 0.7;
font-size: 0.875rem;
}
/* Form content */
.form-content {
padding: 0 1.5rem 1.5rem 1.5rem;
}
.form-group {
margin-bottom: 1rem;
}
.form-label {
display: block;
margin-bottom: 0.5rem;
font-weight: 500;
font-size: 0.875rem;
}
.form-select {
width: 100%;
padding: 0.875rem 3rem 0.875rem 1rem;
border: 1.5px solid #ebe9e9;
border-radius: 12px;
font-size: 1rem;
font-weight: 500;
background-color: #fff;
background-image: url("data:image/svg+xml,%3csvg xmlns='http://www.w3.org/2000/svg' fill='none' viewBox='0 0 20 20'%3e%3cpath stroke='%231c1a19' stroke-linecap='round' stroke-linejoin='round' stroke-width='1.5' d='M6 8l4 4 4-4'/%3e%3c/svg%3e");
background-position: right 0.75rem center;
background-repeat: no-repeat;
background-size: 1.25rem 1.25rem;
cursor: pointer;
transition: all 0.2s ease-in-out;
appearance: none;
-webkit-appearance: none;
-moz-appearance: none;
}
.form-select:focus {
outline: none;
border-color: #3b82f6;
background-color: white;
box-shadow:
0 0 0 3px rgba(59, 130, 246, 0.12),
0 4px 6px -1px rgba(0, 0, 0, 0.1),
0 2px 4px -1px rgba(0, 0, 0, 0.06);
transform: translateY(-1px);
}
.form-select:hover:not(:focus) {
border-color: #9ca3af;
background-color: white;
box-shadow:
0 2px 4px -1px rgba(0, 0, 0, 0.1),
0 1px 2px -1px rgba(0, 0, 0, 0.06);
}
.form-select:disabled {
background-color: #f3f4f6;
border-color: #e5e7eb;
cursor: not-allowed;
opacity: 0.7;
}
/* Custom dropdown */
.custom-dropdown {
position: relative;
width: 100%;
z-index: 999999;
/* Ensure proper stacking context */
isolation: isolate;
}
.dropdown-trigger {
width: 100%;
padding: 0.875rem 3rem 0.875rem 1rem;
border: 1.5px solid #ebe9e9;
border-radius: 12px;
font-size: 1rem;
font-weight: 500;
background-color: #fff;
background-image: url("data:image/svg+xml,%3csvg xmlns='http://www.w3.org/2000/svg' fill='none' viewBox='0 0 20 20'%3e%3cpath stroke='%231c1a19' stroke-linecap='round' stroke-linejoin='round' stroke-width='1.5' d='M6 8l4 4 4-4'/%3e%3c/svg%3e");
background-position: right 0.75rem center;
background-repeat: no-repeat;
background-size: 1.25rem 1.25rem;
cursor: pointer;
transition: all 0.2s ease-in-out;
text-align: left;
color: #1c1a19;
}
.dropdown-trigger:focus {
outline: none;
border-color: #3b82f6;
background-color: white;
box-shadow:
0 0 0 3px rgba(59, 130, 246, 0.12),
0 4px 6px -1px rgba(0, 0, 0, 0.1),
0 2px 4px -1px rgba(0, 0, 0, 0.06);
transform: translateY(-1px);
}
.dropdown-trigger:hover:not(:focus) {
border-color: #9ca3af;
background-color: white;
box-shadow:
0 2px 4px -1px rgba(0, 0, 0, 0.1),
0 1px 2px -1px rgba(0, 0, 0, 0.06);
}
.dropdown-trigger.open {
border-color: #3b82f6;
background-color: white;
background-image: url("data:image/svg+xml,%3csvg xmlns='http://www.w3.org/2000/svg' fill='none' viewBox='0 0 20 20'%3e%3cpath stroke='%233b82f6' stroke-linecap='round' stroke-linejoin='round' stroke-width='1.5' d='M14 12l-4-4-4 4'/%3e%3c/svg%3e");
box-shadow:
0 0 0 3px rgba(59, 130, 246, 0.12),
0 4px 6px -1px rgba(0, 0, 0, 0.1),
0 2px 4px -1px rgba(0, 0, 0, 0.06);
border-bottom-left-radius: 4px;
border-bottom-right-radius: 4px;
}
.dropdown-trigger.placeholder {
color: #9ca3af;
font-weight: 400;
}
.dropdown-options {
position: absolute;
top: 100%;
left: 0;
right: 0;
background: white;
border: 1.5px solid;
border-top: none;
border-bottom-left-radius: 12px;
border-bottom-right-radius: 12px;
box-shadow:
0 0 0 3px rgba(59, 130, 246, 0.12),
0 10px 15px -3px rgba(0, 0, 0, 0.1),
0 4px 6px -2px rgba(0, 0, 0, 0.05);
z-index: 999999;
animation: dropdownSlideIn 0.15s ease-out;
/* Ensure proper rendering and isolation */
isolation: isolate;
contain: layout style;
/* Add scrolling for long lists */
max-height: 300px;
overflow-y: auto;
}
/* Removed dropdown-options-fixed - using simple absolute positioning */
@keyframes dropdownSlideIn {
0% {
opacity: 0;
transform: translateY(-8px);
}
100% {
opacity: 1;
transform: translateY(0);
}
}
.dropdown-option {
padding: 0.875rem 1rem;
cursor: pointer;
transition: all 0.15s ease-in-out;
border: none;
background: none;
width: 100%;
text-align: left;
font-size: 1rem;
color: #374151;
display: flex;
flex-direction: column;
gap: 0.125rem;
}
.dropdown-option:hover {
background-color: #f8fafc;
color: #1f2937;
}
.dropdown-option:focus {
outline: none;
background-color: #eff6ff;
color: #1e40af;
}
.dropdown-option:active {
background-color: #dbeafe;
}
.dropdown-option.selected {
background-color: #f3f4f6;
color: #374151;
}
.dropdown-option.selected:hover {
background-color: #e5e7eb;
}
.option-primary {
font-weight: 500;
line-height: 1.4;
}
.option-secondary {
font-size: 0.875rem;
opacity: 0.7;
font-weight: 400;
}
.dropdown-option.selected .option-secondary {
opacity: 0.9;
}
/* Dropdown scrollbar styling */
.dropdown-options::-webkit-scrollbar {
width: 8px;
}
.dropdown-options::-webkit-scrollbar-track {
background: #f8fafc;
border-radius: 4px;
}
.dropdown-options::-webkit-scrollbar-thumb {
background: #cbd5e1;
border-radius: 4px;
border: 1px solid #f8fafc;
}
.dropdown-options::-webkit-scrollbar-thumb:hover {
background: #94a3b8;
}
/* Loading state */
.loading-state {
display: flex;
align-items: center;
gap: 0.75rem;
padding: 1rem;
background-color: #fff;
border: 1px solid #ebe9e9;
border-radius: 8px;
margin: 1rem 1.5rem;
}
.spinner {
width: 20px;
height: 20px;
border: 2px solid #ebe9e9;
border-top: 2px solid #1c1a19;
border-radius: 50%;
animation: spin 1s linear infinite;
}
@keyframes spin {
0% {
transform: rotate(0deg);
}
100% {
transform: rotate(360deg);
}
}
/* Error state */
.error-state {
padding: 1rem;
background-color: #fef2f2;
border: 1px solid #fecaca;
border-radius: 8px;
margin: 1rem 1.5rem;
}
.error-state strong {
display: block;
margin-bottom: 0.25rem;
font-weight: 600;
}
.error-state p {
margin: 0;
font-size: 0.875rem;
opacity: 0.8;
}
/* OAuth Error Section */
.error-section {
background: #fff;
border: 1px solid #fecaca;
}
.error-details {
padding: 0 1.5rem 1.5rem 1.5rem;
display: flex;
flex-direction: column;
gap: 1rem;
}
.error-item {
display: flex;
flex-direction: column;
gap: 0.5rem;
}
.error-item strong {
font-weight: 500;
font-size: 0.875rem;
color: #991b1b;
}
.error-code {
display: inline-block;
padding: 0.5rem 0.75rem;
background-color: #fef2f2;
border: 1px solid #fecaca;
border-radius: 6px;
font-family: 'SF Mono', Monaco, 'Cascadia Code', 'Roboto Mono', Consolas, 'Courier New', monospace;
font-size: 0.875rem;
color: #991b1b;
font-weight: 500;
}
.error-description {
margin: 0;
padding: 0.75rem;
background-color: #fef2f2;
border: 1px solid #fecaca;
border-radius: 6px;
color: #991b1b;
font-size: 0.875rem;
line-height: 1.5;
}
.error-actions {
margin-top: 0.5rem;
padding: 1rem;
background-color: #fffbeb;
border: 1px solid #fde68a;
border-radius: 6px;
}
.error-actions p {
margin: 0;
color: #92400e;
font-size: 0.875rem;
line-height: 1.5;
}
/* Context details */
.context-details {
padding: 0 1.5rem 1.5rem 1.5rem;
display: flex;
flex-direction: column;
gap: 1rem;
}
.context-item {
display: flex;
flex-direction: column;
gap: 0.25rem;
}
.context-item strong {
font-weight: 500;
font-size: 0.875rem;
opacity: 0.7;
}
.environment-details {
display: flex;
align-items: center;
gap: 0.5rem;
}
.env-name {
font-weight: 500;
}
.env-type {
font-size: 0.875rem;
opacity: 0.6;
}
/* Actions section */
.actions-section {
padding: 1.5rem;
text-align: center;
background-color: #f9fafb;
}
/* Button container */
.button-container {
display: flex;
justify-content: center;
align-items: center;
}
/* Button */
.primary-button {
display: inline-flex;
align-items: center;
padding: 0.75rem 1.5rem;
background-color: #1c1a19;
color: #fff;
border: 1px solid #1c1a19;
border-radius: 8px;
font-size: 1rem;
font-weight: 500;
cursor: pointer;
transition:
background-color 0.15s ease-in-out,
transform 0.15s ease-in-out,
opacity 0.15s ease-in-out;
}
.primary-button:hover {
background-color: #2d2a28;
border-color: #2d2a28;
transform: translateY(-1px);
}
.primary-button:focus {
outline: none;
box-shadow: 0 0 0 3px rgba(28, 26, 25, 0.2);
border-color: #2d2a28;
}
.primary-button:active {
transform: translateY(0);
background-color: #3d3a38;
}
.primary-button:disabled {
background-color: #d1d5db;
border-color: #d1d5db;
color: #6b7280;
cursor: not-allowed;
transform: none;
box-shadow: none;
opacity: 0.65;
}
.primary-button:disabled:hover,
.primary-button:disabled:focus,
.primary-button:disabled:active {
background-color: #d1d5db;
border-color: #d1d5db;
color: #6b7280;
transform: none;
box-shadow: none;
}
/* Completion section */
.completion-section {
padding: 0;
}
.completion-card {
padding: 2rem 1.5rem;
text-align: center;
}
.completion-card h2 {
margin: 0 0 1rem 0;
font-size: 1.75rem;
font-weight: 600;
}
.completion-card p {
margin: 0;
font-size: 1rem;
line-height: 1.6;
}
/* Response section */
.response-section {
padding: 1.5rem;
}
.response-text {
background-color: #fff;
border: 1px solid #ebe9e9;
border-radius: 8px;
padding: 1rem;
font-family: 'SF Mono', Monaco, 'Cascadia Code', 'Roboto Mono', Consolas, 'Courier New', monospace;
font-size: 0.875rem;
line-height: 1.5;
white-space: pre-wrap;
word-break: break-word;
overflow-x: auto;
}
/* Responsive design */
@media (max-width: 768px) {
.logo-container {
top: 0.5rem;
left: 0.5rem;
}
.logo {
height: 1.5rem;
}
.app-container {
padding: 1rem 0.5rem;
}
.app-content {
max-width: 100%;
}
.app-header h1 {
font-size: 2rem;
}
.section-header {
padding: 1rem 1rem 0 1rem;
margin-bottom: 1rem;
}
.form-content,
.context-details,
.actions-section,
.response-section {
padding-left: 1rem;
padding-right: 1rem;
}
.loading-state,
.error-state {
margin-left: 1rem;
margin-right: 1rem;
}
}
@media (max-width: 480px) {
.logo {
height: 1.25rem;
}
.app-container {
padding: 0.5rem 0.25rem;
}
.app-header h1 {
font-size: 1.75rem;
}
.primary-button {
width: 100%;
}
}
/* Light mode styles */
@media (prefers-color-scheme: light) {
body {
background-color: #fff;
color: #1c1a19;
}
/* Sections */
section {
background: #fff;
border-color: #ebe9e9;
}
.section-header {
border-bottom-color: #ebe9e9;
}
.section-header h2,
.section-header h3 {
color: #1c1a19;
}
.section-header p {
color: #1c1a19;
}
/* Form elements */
.form-label {
color: #1c1a19;
}
.form-select {
background-color: #fff;
border-color: #ebe9e9;
color: #1c1a19;
background-image: url("data:image/svg+xml,%3csvg xmlns='http://www.w3.org/2000/svg' fill='none' viewBox='0 0 20 20'%3e%3cpath stroke='%231c1a19' stroke-linecap='round' stroke-linejoin='round' stroke-width='1.5' d='M6 8l4 4 4-4'/%3e%3c/svg%3e");
}
.form-select:focus {
background-color: #fff;
border-color: #ebe9e9;
}
.form-select:hover:not(:focus) {
background-color: #fff;
border-color: #ebe9e9;
}
.form-select:disabled {
background-color: #f9f9f9;
border-color: #ebe9e9;
}
/* Custom dropdown */
.dropdown-trigger {
background-color: #fff;
border-color: #ebe9e9;
color: #1c1a19;
background-image: url("data:image/svg+xml,%3csvg xmlns='http://www.w3.org/2000/svg' fill='none' viewBox='0 0 20 20'%3e%3cpath stroke='%231c1a19' stroke-linecap='round' stroke-linejoin='round' stroke-width='1.5' d='M6 8l4 4 4-4'/%3e%3c/svg%3e");
}
.dropdown-trigger:focus {
background-color: #fff;
border-color: #ebe9e9;
}
.dropdown-trigger:hover:not(:focus) {
background-color: #fff;
border-color: #ebe9e9;
}
.dropdown-trigger.open {
background-color: #fff;
border-color: #ebe9e9;
background-image: url("data:image/svg+xml,%3csvg xmlns='http://www.w3.org/2000/svg' fill='none' viewBox='0 0 20 20'%3e%3cpath stroke='%231c1a19' stroke-linecap='round' stroke-linejoin='round' stroke-width='1.5' d='M14 12l-4-4-4 4'/%3e%3c/svg%3e");
}
.dropdown-options {
background: #fff;
border-color: #ebe9e9;
}
.dropdown-option {
color: #1c1a19;
}
.dropdown-option:hover {
background-color: #f9f9f9;
color: #1c1a19;
}
.dropdown-option:focus {
background-color: #f9f9f9;
color: #1c1a19;
}
.dropdown-option:active {
background-color: #f3f3f3;
}
.dropdown-option.selected {
background-color: #f9f9f9;
color: #1c1a19;
}
.dropdown-option.selected:hover {
background-color: #f3f3f3;
}
/* Loading state */
.loading-state {
background-color: #fff;
border: 1px solid #ebe9e9;
color: #1c1a19;
}
.spinner {
border-color: #ebe9e9;
border-top-color: #1c1a19;
}
/* Error state */
.error-state {
background-color: #fef2f2;
border-color: #fecaca;
color: #991b1b;
}
.error-state strong {
color: #991b1b;
}
.error-state p {
color: #991b1b;
}
/* Context details */
.context-item strong {
color: #1c1a19;
}
.env-name {
color: #1c1a19;
}
.env-type {
color: #1c1a19;
}
/* Actions section */
.actions-section {
background-color: #fff;
}
/* Response section */
.response-text {
background-color: #fff;
border-color: #ebe9e9;
color: #1c1a19;
}
/* App header */
.app-header h1 {
color: #1c1a19;
}
.app-header p {
color: #1c1a19;
}
/* Button light mode */
.primary-button {
background-color: #1c1a19;
color: #fff;
border-color: #1c1a19;
}
.primary-button:hover {
background-color: #2d2a28;
border-color: #2d2a28;
}
.primary-button:focus {
box-shadow: 0 0 0 3px rgba(28, 26, 25, 0.2);
border-color: #2d2a28;
}
.primary-button:active {
background-color: #3d3a38;
}
.primary-button:disabled {
background-color: #d6d3d1;
border-color: #e7e5e4;
color: #78716c;
opacity: 0.7;
}
.primary-button:disabled:hover,
.primary-button:disabled:focus,
.primary-button:disabled:active {
background-color: #d6d3d1;
border-color: #e7e5e4;
color: #78716c;
}
}
/* Dark mode styles */
@media (prefers-color-scheme: dark) {
/* Logo theme switching */
.logo-light {
display: none;
}
.logo-dark {
display: block;
}
/* Sections */
section {
background: #1c1a19;
border-color: #4e4a49;
}
.section-header {
border-bottom-color: #4e4a49;
}
.section-header h2,
.section-header h3 {
color: #f6f6f6;
}
.section-header p {
color: #f6f6f6;
}
/* Form elements */
.form-label {
color: #f6f6f6;
}
.form-select {
background-color: #1c1a19;
border-color: #4e4a49;
color: #f6f6f6;
background-image: url("data:image/svg+xml,%3csvg xmlns='http://www.w3.org/2000/svg' fill='none' viewBox='0 0 20 20'%3e%3cpath stroke='%23f6f6f6' stroke-linecap='round' stroke-linejoin='round' stroke-width='1.5' d='M6 8l4 4 4-4'/%3e%3c/svg%3e");
}
.form-select:focus {
background-color: #1c1a19;
border-color: #4e4a49;
}
.form-select:hover:not(:focus) {
background-color: #1c1a19;
border-color: #4e4a49;
}
.form-select:disabled {
background-color: #374151;
border-color: #4e4a49;
}
/* Custom dropdown */
.dropdown-trigger {
background-color: #1c1a19;
border-color: #4e4a49;
color: #f6f6f6;
background-image: url("data:image/svg+xml,%3csvg xmlns='http://www.w3.org/2000/svg' fill='none' viewBox='0 0 20 20'%3e%3cpath stroke='%23f6f6f6' stroke-linecap='round' stroke-linejoin='round' stroke-width='1.5' d='M6 8l4 4 4-4'/%3e%3c/svg%3e");
}
.dropdown-trigger:focus {
background-color: #1c1a19;
border-color: #4e4a49;
}
.dropdown-trigger:hover:not(:focus) {
background-color: #1c1a19;
border-color: #4e4a49;
}
.dropdown-trigger.open {
background-color: #1c1a19;
border-color: #4e4a49;
background-image: url("data:image/svg+xml,%3csvg xmlns='http://www.w3.org/2000/svg' fill='none' viewBox='0 0 20 20'%3e%3cpath stroke='%23f6f6f6' stroke-linecap='round' stroke-linejoin='round' stroke-width='1.5' d='M14 12l-4-4-4 4'/%3e%3c/svg%3e");
}
.dropdown-trigger.placeholder {
color: #9ca3af;
}
.dropdown-options {
background: #1c1a19;
border-color: #4e4a49;
}
.dropdown-option {
color: #f6f6f6;
}
.dropdown-option:hover {
background-color: #374151;
color: #f6f6f6;
}
.dropdown-option:focus {
background-color: #374151;
color: #f6f6f6;
}
.dropdown-option:active {
background-color: #4b5563;
}
.dropdown-option.selected {
background-color: #4e4a49;
color: #f6f6f6;
}
.dropdown-option.selected:hover {
background-color: #6b7280;
}
/* Dropdown scrollbar styling for dark mode */
.dropdown-options::-webkit-scrollbar-track {
background: #374151;
}
.dropdown-options::-webkit-scrollbar-thumb {
background: #6b7280;
border: 1px solid #374151;
}
.dropdown-options::-webkit-scrollbar-thumb:hover {
background: #9ca3af;
}
/* Loading state */
.loading-state {
background-color: #1c1a19;
border: 1px solid #4e4a49;
color: #f6f6f6;
}
.spinner {
border-color: #4e4a49;
border-top-color: #f6f6f6;
}
/* Error state */
.error-state {
background-color: #7f1d1d;
border-color: #4e4a49;
color: #f6f6f6;
}
.error-state strong {
color: #f6f6f6;
}
.error-state p {
color: #f6f6f6;
}
/* OAuth Error Section Dark Mode */
.error-section {
background: #1c1a19;
border-color: #991b1b;
}
.error-item strong {
color: #fca5a5;
}
.error-code {
background-color: #450a0a;
border-color: #7f1d1d;
color: #fca5a5;
}
.error-description {
background-color: #450a0a;
border-color: #7f1d1d;
color: #fca5a5;
}
.error-actions {
background-color: #422006;
border-color: #92400e;
}
.error-actions p {
color: #fde68a;
}
/* Context details */
.context-item strong {
color: #f6f6f6;
}
.env-name {
color: #f6f6f6;
}
.env-type {
color: #f6f6f6;
}
/* Actions section */
.actions-section {
background-color: #374151;
}
/* Response section */
.response-text {
background-color: #374151;
border-color: #4e4a49;
color: #f6f6f6;
}
/* App header */
.app-header h1 {
color: #f6f6f6;
}
.app-header p {
color: #f6f6f6;
}
/* Button dark mode */
.primary-button {
background-color: #fdfdfd;
color: #374151;
border-color: #4e4a49;
}
.primary-button:hover {
background-color: #f3f4f6;
border-color: #6b7280;
}
.primary-button:focus {
box-shadow: 0 0 0 3px rgba(246, 246, 246, 0.1);
border-color: #9ca3af;
}
.primary-button:active {
background-color: #e5e7eb;
}
.primary-button:disabled {
background-color: #2f2f30;
border-color: #3f3f40;
color: #8b949e;
opacity: 0.55;
}
.primary-button:disabled:hover,
.primary-button:disabled:focus,
.primary-button:disabled:active {
background-color: #2f2f30;
border-color: #3f3f40;
color: #8b949e;
}
}
```
--------------------------------------------------------------------------------
/tests/unit/discovery/test_exposures_fetcher.py:
--------------------------------------------------------------------------------
```python
from unittest.mock import patch
import pytest
from dbt_mcp.discovery.client import ExposuresFetcher
@pytest.fixture
def exposures_fetcher(mock_api_client):
return ExposuresFetcher(api_client=mock_api_client)
async def test_fetch_exposures_single_page(exposures_fetcher, mock_api_client):
mock_response = {
"data": {
"environment": {
"definition": {
"exposures": {
"pageInfo": {"hasNextPage": False, "endCursor": None},
"edges": [
{
"node": {
"name": "test_exposure",
"uniqueId": "exposure.test.test_exposure",
"exposureType": "application",
"maturity": "high",
"ownerEmail": "[email protected]",
"ownerName": "Test Owner",
"url": "https://example.com",
"meta": {},
"freshnessStatus": "Unknown",
"description": "Test exposure",
"label": None,
"parents": [
{"uniqueId": "model.test.parent_model"}
],
}
}
],
}
}
}
}
}
mock_api_client.execute_query.return_value = mock_response
with patch("dbt_mcp.discovery.client.raise_gql_error"):
result = await exposures_fetcher.fetch_exposures()
assert len(result) == 1
assert result[0]["name"] == "test_exposure"
assert result[0]["uniqueId"] == "exposure.test.test_exposure"
assert result[0]["exposureType"] == "application"
assert result[0]["maturity"] == "high"
assert result[0]["ownerEmail"] == "[email protected]"
assert result[0]["ownerName"] == "Test Owner"
assert result[0]["url"] == "https://example.com"
assert result[0]["meta"] == {}
assert result[0]["freshnessStatus"] == "Unknown"
assert result[0]["description"] == "Test exposure"
assert result[0]["parents"] == [{"uniqueId": "model.test.parent_model"}]
mock_api_client.execute_query.assert_called_once()
args, kwargs = mock_api_client.execute_query.call_args
assert args[1]["environmentId"] == 123
assert args[1]["first"] == 100
async def test_fetch_exposures_multiple_pages(exposures_fetcher, mock_api_client):
page1_response = {
"data": {
"environment": {
"definition": {
"exposures": {
"pageInfo": {"hasNextPage": True, "endCursor": "cursor123"},
"edges": [
{
"node": {
"name": "exposure1",
"uniqueId": "exposure.test.exposure1",
"exposureType": "application",
"maturity": "high",
"ownerEmail": "[email protected]",
"ownerName": "Test Owner 1",
"url": "https://example1.com",
"meta": {},
"freshnessStatus": "Unknown",
"description": "Test exposure 1",
"label": None,
"parents": [],
}
}
],
}
}
}
}
}
page2_response = {
"data": {
"environment": {
"definition": {
"exposures": {
"pageInfo": {"hasNextPage": False, "endCursor": "cursor456"},
"edges": [
{
"node": {
"name": "exposure2",
"uniqueId": "exposure.test.exposure2",
"exposureType": "dashboard",
"maturity": "medium",
"ownerEmail": "[email protected]",
"ownerName": "Test Owner 2",
"url": "https://example2.com",
"meta": {"key": "value"},
"freshnessStatus": "Fresh",
"description": "Test exposure 2",
"label": "Label 2",
"parents": [
{"uniqueId": "model.test.parent_model2"}
],
}
}
],
}
}
}
}
}
mock_api_client.execute_query.side_effect = [page1_response, page2_response]
with patch("dbt_mcp.discovery.client.raise_gql_error"):
result = await exposures_fetcher.fetch_exposures()
assert len(result) == 2
assert result[0]["name"] == "exposure1"
assert result[1]["name"] == "exposure2"
assert result[1]["meta"] == {"key": "value"}
assert result[1]["label"] == "Label 2"
assert mock_api_client.execute_query.call_count == 2
# Check first call (no cursor)
first_call = mock_api_client.execute_query.call_args_list[0]
assert first_call[0][1]["environmentId"] == 123
assert first_call[0][1]["first"] == 100
assert "after" not in first_call[0][1]
# Check second call (with cursor)
second_call = mock_api_client.execute_query.call_args_list[1]
assert second_call[0][1]["environmentId"] == 123
assert second_call[0][1]["first"] == 100
assert second_call[0][1]["after"] == "cursor123"
async def test_fetch_exposures_empty_response(exposures_fetcher, mock_api_client):
mock_response = {
"data": {
"environment": {
"definition": {
"exposures": {
"pageInfo": {"hasNextPage": False, "endCursor": None},
"edges": [],
}
}
}
}
}
mock_api_client.execute_query.return_value = mock_response
with patch("dbt_mcp.discovery.client.raise_gql_error"):
result = await exposures_fetcher.fetch_exposures()
assert len(result) == 0
assert isinstance(result, list)
async def test_fetch_exposures_handles_malformed_edges(
exposures_fetcher, mock_api_client
):
mock_response = {
"data": {
"environment": {
"definition": {
"exposures": {
"pageInfo": {"hasNextPage": False, "endCursor": None},
"edges": [
{
"node": {
"name": "valid_exposure",
"uniqueId": "exposure.test.valid_exposure",
"exposureType": "application",
"maturity": "high",
"ownerEmail": "[email protected]",
"ownerName": "Test Owner",
"url": "https://example.com",
"meta": {},
"freshnessStatus": "Unknown",
"description": "Valid exposure",
"label": None,
"parents": [],
}
},
{"invalid": "edge"}, # Missing "node" key
{"node": "not_a_dict"}, # Node is not a dict
{
"node": {
"name": "another_valid_exposure",
"uniqueId": "exposure.test.another_valid_exposure",
"exposureType": "dashboard",
"maturity": "low",
"ownerEmail": "[email protected]",
"ownerName": "Test Owner 2",
"url": "https://example2.com",
"meta": {},
"freshnessStatus": "Stale",
"description": "Another valid exposure",
"label": None,
"parents": [],
}
},
],
}
}
}
}
}
mock_api_client.execute_query.return_value = mock_response
with patch("dbt_mcp.discovery.client.raise_gql_error"):
result = await exposures_fetcher.fetch_exposures()
# Should only get the valid exposures (malformed edges should be filtered out)
assert len(result) == 2
assert result[0]["name"] == "valid_exposure"
assert result[1]["name"] == "another_valid_exposure"
async def test_fetch_exposure_details_by_unique_ids_single(
exposures_fetcher, mock_api_client
):
mock_response = {
"data": {
"environment": {
"definition": {
"exposures": {
"edges": [
{
"node": {
"name": "customer_dashboard",
"uniqueId": "exposure.analytics.customer_dashboard",
"exposureType": "dashboard",
"maturity": "high",
"ownerEmail": "[email protected]",
"ownerName": "Analytics Team",
"url": "https://dashboard.example.com/customers",
"meta": {"team": "analytics", "priority": "high"},
"freshnessStatus": "Fresh",
"description": "Customer analytics dashboard",
"label": "Customer Dashboard",
"parents": [
{"uniqueId": "model.analytics.customers"},
{
"uniqueId": "model.analytics.customer_metrics"
},
],
}
}
]
}
}
}
}
}
mock_api_client.execute_query.return_value = mock_response
with patch("dbt_mcp.discovery.client.raise_gql_error"):
result = await exposures_fetcher.fetch_exposure_details(
unique_ids=["exposure.analytics.customer_dashboard"]
)
assert isinstance(result, list)
assert len(result) == 1
exposure = result[0]
assert exposure["name"] == "customer_dashboard"
assert exposure["uniqueId"] == "exposure.analytics.customer_dashboard"
assert exposure["exposureType"] == "dashboard"
assert exposure["maturity"] == "high"
assert exposure["ownerEmail"] == "[email protected]"
assert exposure["ownerName"] == "Analytics Team"
assert exposure["url"] == "https://dashboard.example.com/customers"
assert exposure["meta"] == {"team": "analytics", "priority": "high"}
assert exposure["freshnessStatus"] == "Fresh"
assert exposure["description"] == "Customer analytics dashboard"
assert exposure["label"] == "Customer Dashboard"
assert len(exposure["parents"]) == 2
assert exposure["parents"][0]["uniqueId"] == "model.analytics.customers"
assert exposure["parents"][1]["uniqueId"] == "model.analytics.customer_metrics"
mock_api_client.execute_query.assert_called_once()
args, kwargs = mock_api_client.execute_query.call_args
assert args[1]["environmentId"] == 123
assert args[1]["first"] == 1
assert args[1]["filter"] == {"uniqueIds": ["exposure.analytics.customer_dashboard"]}
async def test_fetch_exposure_details_by_unique_ids_multiple(
exposures_fetcher, mock_api_client
):
mock_response = {
"data": {
"environment": {
"definition": {
"exposures": {
"edges": [
{
"node": {
"name": "customer_dashboard",
"uniqueId": "exposure.analytics.customer_dashboard",
"exposureType": "dashboard",
"maturity": "high",
"ownerEmail": "[email protected]",
"ownerName": "Analytics Team",
"url": "https://dashboard.example.com/customers",
"meta": {"team": "analytics", "priority": "high"},
"freshnessStatus": "Fresh",
"description": "Customer analytics dashboard",
"label": "Customer Dashboard",
"parents": [],
}
},
{
"node": {
"name": "sales_report",
"uniqueId": "exposure.sales.sales_report",
"exposureType": "analysis",
"maturity": "medium",
"ownerEmail": "[email protected]",
"ownerName": "Sales Team",
"url": None,
"meta": {},
"freshnessStatus": "Stale",
"description": "Monthly sales analysis report",
"label": None,
"parents": [{"uniqueId": "model.sales.sales_data"}],
}
},
]
}
}
}
}
}
mock_api_client.execute_query.return_value = mock_response
with patch("dbt_mcp.discovery.client.raise_gql_error"):
result = await exposures_fetcher.fetch_exposure_details(
unique_ids=[
"exposure.analytics.customer_dashboard",
"exposure.sales.sales_report",
]
)
assert isinstance(result, list)
assert len(result) == 2
# Check first exposure
exposure1 = result[0]
assert exposure1["name"] == "customer_dashboard"
assert exposure1["uniqueId"] == "exposure.analytics.customer_dashboard"
assert exposure1["exposureType"] == "dashboard"
# Check second exposure
exposure2 = result[1]
assert exposure2["name"] == "sales_report"
assert exposure2["uniqueId"] == "exposure.sales.sales_report"
assert exposure2["exposureType"] == "analysis"
mock_api_client.execute_query.assert_called_once()
args, kwargs = mock_api_client.execute_query.call_args
assert args[1]["environmentId"] == 123
assert args[1]["first"] == 2
assert args[1]["filter"] == {
"uniqueIds": [
"exposure.analytics.customer_dashboard",
"exposure.sales.sales_report",
]
}
async def test_fetch_exposure_details_by_name(exposures_fetcher, mock_api_client):
# Mock the response for fetch_exposures (which gets called when filtering by name)
mock_exposures_response = {
"data": {
"environment": {
"definition": {
"exposures": {
"pageInfo": {"hasNextPage": False, "endCursor": None},
"edges": [
{
"node": {
"name": "sales_report",
"uniqueId": "exposure.sales.sales_report",
"exposureType": "analysis",
"maturity": "medium",
"ownerEmail": "[email protected]",
"ownerName": "Sales Team",
"url": None,
"meta": {},
"freshnessStatus": "Stale",
"description": "Monthly sales analysis report",
"label": None,
"parents": [{"uniqueId": "model.sales.sales_data"}],
}
},
{
"node": {
"name": "other_exposure",
"uniqueId": "exposure.other.other_exposure",
"exposureType": "dashboard",
"maturity": "high",
"ownerEmail": "[email protected]",
"ownerName": "Other Team",
"url": None,
"meta": {},
"freshnessStatus": "Fresh",
"description": "Other exposure",
"label": None,
"parents": [],
}
},
],
}
}
}
}
}
mock_api_client.execute_query.return_value = mock_exposures_response
with patch("dbt_mcp.discovery.client.raise_gql_error"):
result = await exposures_fetcher.fetch_exposure_details(
exposure_name="sales_report"
)
assert isinstance(result, list)
assert len(result) == 1
exposure = result[0]
assert exposure["name"] == "sales_report"
assert exposure["uniqueId"] == "exposure.sales.sales_report"
assert exposure["exposureType"] == "analysis"
assert exposure["maturity"] == "medium"
assert exposure["url"] is None
assert exposure["meta"] == {}
assert exposure["freshnessStatus"] == "Stale"
assert exposure["label"] is None
# Should have called the GET_EXPOSURES query (not GET_EXPOSURE_DETAILS)
mock_api_client.execute_query.assert_called_once()
args, kwargs = mock_api_client.execute_query.call_args
assert args[1]["environmentId"] == 123
assert args[1]["first"] == 100 # PAGE_SIZE for fetch_exposures
async def test_fetch_exposure_details_not_found(exposures_fetcher, mock_api_client):
mock_response = {
"data": {"environment": {"definition": {"exposures": {"edges": []}}}}
}
mock_api_client.execute_query.return_value = mock_response
with patch("dbt_mcp.discovery.client.raise_gql_error"):
result = await exposures_fetcher.fetch_exposure_details(
unique_ids=["exposure.nonexistent.exposure"]
)
assert result == []
async def test_get_exposure_filters_unique_ids(exposures_fetcher):
filters = exposures_fetcher._get_exposure_filters(
unique_ids=["exposure.test.test_exposure"]
)
assert filters == {"uniqueIds": ["exposure.test.test_exposure"]}
async def test_get_exposure_filters_multiple_unique_ids(exposures_fetcher):
filters = exposures_fetcher._get_exposure_filters(
unique_ids=["exposure.test.test1", "exposure.test.test2"]
)
assert filters == {"uniqueIds": ["exposure.test.test1", "exposure.test.test2"]}
async def test_get_exposure_filters_name_raises_error(exposures_fetcher):
from dbt_mcp.errors import InvalidParameterError
with pytest.raises(
InvalidParameterError, match="ExposureFilter only supports uniqueIds"
):
exposures_fetcher._get_exposure_filters(exposure_name="test_exposure")
async def test_get_exposure_filters_no_params(exposures_fetcher):
from dbt_mcp.errors import InvalidParameterError
with pytest.raises(
InvalidParameterError,
match="unique_ids must be provided for exposure filtering",
):
exposures_fetcher._get_exposure_filters()
async def test_fetch_exposure_details_by_name_not_found(
exposures_fetcher, mock_api_client
):
# Mock empty response for fetch_exposures
mock_response = {
"data": {
"environment": {
"definition": {
"exposures": {
"pageInfo": {"hasNextPage": False, "endCursor": None},
"edges": [],
}
}
}
}
}
mock_api_client.execute_query.return_value = mock_response
with patch("dbt_mcp.discovery.client.raise_gql_error"):
result = await exposures_fetcher.fetch_exposure_details(
exposure_name="nonexistent_exposure"
)
assert result == []
```
--------------------------------------------------------------------------------
/src/dbt_mcp/lsp/lsp_connection.py:
--------------------------------------------------------------------------------
```python
"""LSP Connection Manager for dbt Fusion LSP.
This module manages the lifecycle of LSP processes and handles JSON-RPC
communication according to the Language Server Protocol specification.
"""
import asyncio
from enum import Enum
import itertools
import json
import logging
import socket
import subprocess
from collections.abc import Iterator, Sequence
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
import uuid
from dataclasses import asdict
logger = logging.getLogger(__name__)
class LspEventName(str, Enum):
"""LSP event names."""
compileComplete = "dbt/lspCompileComplete"
logMessage = "window/logMessage"
progress = "$/progress"
workspaceDiagnostics = "workspace/diagnostics"
fileDiagnostics = "textDocument/publishDiagnostics"
def event_name_from_string(string: str) -> LspEventName | None:
"""Create an LSP event name from a string."""
try:
return LspEventName(string)
except ValueError:
return None
@dataclass
class JsonRpcMessage:
"""Represents a JSON-RPC 2.0 message."""
jsonrpc: str = "2.0"
id: int | str | None = None
method: str | None = None
params: dict[str, Any] | list[Any] | None = None
result: Any = None
error: dict[str, Any] | None = None
def to_dict(self, none_values: bool = False) -> dict[str, Any]:
"""Convert the message to a dictionary."""
def dict_factory(x: list[tuple[str, Any]]) -> dict[str, Any]:
return dict(x) if none_values else {k: v for k, v in x if v is not None}
return asdict(self, dict_factory=dict_factory)
@dataclass
class LspConnectionState:
"""Tracks the state of an LSP connection."""
initialized: bool = False
shutting_down: bool = False
capabilities: dict[str, Any] = field(default_factory=dict)
pending_requests: dict[int | str, asyncio.Future] = field(default_factory=dict)
pending_notifications: dict[LspEventName, list[asyncio.Future]] = field(
default_factory=dict
)
compiled: bool = False
# start at 20 to avoid collisions between ids of requests we are waiting for and the lsp server requests from us
request_id_counter: Iterator[int] = field(
default_factory=lambda: itertools.count(20)
)
def get_next_request_id(self) -> int:
return next(self.request_id_counter)
class LSPConnection:
"""LSP process lifecycle and communication via socket.
This class handles:
- Starting and stopping LSP server processes
- Socket-based JSON-RPC communication
- Request/response correlation
- Error handling and cleanup
"""
def __init__(
self,
binary_path: str,
cwd: str,
args: Sequence[str] | None = None,
connection_timeout: float = 10,
default_request_timeout: float = 60,
):
"""Initialize the LSP connection manager.
Args:
binary_path: Path to the LSP server binary
cwd: Working directory for the LSP process
args: Optional command-line arguments for the LSP server
connection_timeout: Timeout in seconds for establishing the initial socket
connection (default: 10). Used during server startup.
default_request_timeout: Default timeout in seconds for LSP request operations
(default: 60). Used when no timeout is specified for
individual requests.
"""
self.binary_path = Path(binary_path)
self.args = list(args) if args else []
self.cwd = cwd
self.host = "127.0.0.1"
self.port = 0
self.process: asyncio.subprocess.Process | None = None
self.state = LspConnectionState()
# Socket components
self._socket: socket.socket | None = None
self._connection: socket.socket | None = None
# Asyncio components for I/O
self._reader_task: asyncio.Task | None = None
self._writer_task: asyncio.Task | None = None
self._stdout_reader_task: asyncio.Task | None = None
self._stderr_reader_task: asyncio.Task | None = None
self._stop_event = asyncio.Event()
self._outgoing_queue: asyncio.Queue[bytes] = asyncio.Queue()
# Timeouts
self.connection_timeout = connection_timeout
self.default_request_timeout = default_request_timeout
logger.debug(f"LSP Connection initialized with binary: {self.binary_path}")
async def start(self) -> None:
"""Start the LSP server process and socket communication tasks."""
if self.process is not None:
logger.warning("LSP process is already running")
return
try:
self.setup_socket()
await self.launch_lsp_process()
# Wait for connection with timeout (run socket.accept in executor)
if self._socket:
self._socket.settimeout(self.connection_timeout)
try:
(
self._connection,
client_addr,
) = await asyncio.get_running_loop().run_in_executor(
None, self._socket.accept
)
if self._connection:
self._connection.settimeout(
None
) # Set to blocking for read/write
logger.debug(f"LSP server connected from {client_addr}")
except TimeoutError:
raise RuntimeError("Timeout waiting for LSP server to connect")
# Start I/O tasks
self._stop_event.clear()
self._reader_task = asyncio.get_running_loop().create_task(
self._read_loop()
)
self._writer_task = asyncio.get_running_loop().create_task(
self._write_loop()
)
except Exception as e:
logger.error(f"Failed to start LSP server: {e}")
await self.stop()
raise
def setup_socket(self) -> None:
"""Set up the socket for LSP server communication.
Creates a TCP socket, binds it to the configured host and port,
and starts listening for incoming connections. If port is 0,
the OS will auto-assign an available port.
"""
# Create socket and bind
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self._socket.bind((self.host, self.port))
self._socket.listen(1)
# Get the actual port if auto-assigned
_, actual_port = self._socket.getsockname()
self.port = actual_port
logger.debug(f"Socket listening on {self.host}:{self.port}")
async def launch_lsp_process(self) -> None:
"""Launch the LSP server process.
Starts the LSP server as a subprocess with socket communication enabled.
The process is started with stdout and stderr capture for monitoring.
The server will connect back to the socket set up by setup_socket().
"""
# Prepare command with socket info
cmd = [
str(self.binary_path),
"--socket",
f"{self.port}",
"--project-dir",
self.cwd,
*self.args,
]
logger.debug(f"Starting LSP server: {' '.join(cmd)}")
self.process = await asyncio.create_subprocess_exec(*cmd)
logger.info(f"LSP server started with PID: {self.process.pid}")
async def stop(self) -> None:
"""Stop the LSP server process and cleanup resources."""
logger.info("Stopping LSP server...")
# Signal tasks to stop
self._stop_event.set()
# Cancel I/O tasks
if self._reader_task and not self._reader_task.done():
self._reader_task.cancel()
try:
await self._reader_task
except asyncio.CancelledError:
pass
if self._writer_task and not self._writer_task.done():
self._writer_task.cancel()
try:
await self._writer_task
except asyncio.CancelledError:
pass
# Cancel stdout/stderr reader tasks
if self._stdout_reader_task and not self._stdout_reader_task.done():
self._stdout_reader_task.cancel()
try:
await self._stdout_reader_task
except asyncio.CancelledError:
pass
if self._stderr_reader_task and not self._stderr_reader_task.done():
self._stderr_reader_task.cancel()
try:
await self._stderr_reader_task
except asyncio.CancelledError:
pass
# Send shutdown request if initialized
if self.process and not self.state.shutting_down:
self.state.shutting_down = True
try:
self._send_shutdown_request()
await asyncio.sleep(0.5) # Give server time to process shutdown
except Exception as e:
logger.warning(f"Error sending shutdown request: {e}")
# Close socket connection
if self._connection:
try:
self._connection.close()
except Exception as e:
logger.error(f"Error closing socket connection: {e}")
finally:
self._connection = None
# Close listening socket
if self._socket:
try:
self._socket.close()
except Exception as e:
logger.error(f"Error closing socket: {e}")
finally:
self._socket = None
# Terminate the process
if self.process:
try:
self.process.terminate()
try:
await self.process.wait()
except subprocess.TimeoutExpired:
logger.warning("LSP process didn't terminate, killing...")
self.process.kill()
await self.process.wait()
except Exception as e:
logger.error(f"Error terminating LSP process: {e}")
finally:
self.process = None
# Clear state
self.state = LspConnectionState()
logger.info("LSP server stopped")
async def initialize(
self, root_uri: str | None = None, timeout: float = 10
) -> None:
"""Initialize the LSP connection.
Sends the initialize request to the LSP server and waits for the response.
The server capabilities are stored in the connection state.
Args:
root_uri: The root URI of the workspace (optional)
timeout: Timeout in seconds for the initialize request (default: 10)
"""
if self.state.initialized:
raise RuntimeError("LSP server is already initialized")
params = {
"processId": None,
"rootUri": root_uri,
"clientInfo": {
"name": "dbt-mcp",
"version": "1.0.0",
},
"capabilities": {},
"initializationOptions": {
"project-dir": "file:///",
"command-prefix": str(uuid.uuid4()),
},
}
# Send initialize request
result = await self.send_request("initialize", params, timeout=timeout)
# Store capabilities
self.state.capabilities = result.get("capabilities", {})
self.state.initialized = True
# Send initialized notification
self.send_notification("initialized", {})
logger.info("LSP server initialized successfully")
async def _read_loop(self) -> None:
"""Background task that reads messages from the LSP server via socket."""
if not self._connection:
logger.warning("LSP server socket is not available")
return
buffer = b""
while not self._stop_event.is_set():
try:
# Read data from socket (run in executor to avoid blocking)
self._connection.settimeout(0.1) # Short timeout to check stop event
try:
chunk = await asyncio.get_running_loop().run_in_executor(
None, self._connection.recv, 4096
)
except TimeoutError:
continue
if not chunk:
logger.warning("LSP server socket closed")
break
buffer += chunk
# Try to parse messages from buffer
while True:
message, remaining = self._parse_message(buffer)
if message is None:
break
buffer = remaining
# Process the message
self._handle_incoming_message(message)
except asyncio.CancelledError:
# Task was cancelled, exit cleanly
break
except Exception as e:
if not self._stop_event.is_set():
logger.error(f"Error in reader task: {e}")
break
async def _write_loop(self) -> None:
"""Background task that writes messages to the LSP server via socket."""
if not self._connection:
return
while not self._stop_event.is_set():
try:
# Get message from queue (with timeout to check stop event)
try:
data = await asyncio.wait_for(
self._outgoing_queue.get(), timeout=0.1
)
except TimeoutError:
continue
# Write to socket (run in executor to avoid blocking)
await asyncio.get_running_loop().run_in_executor(
None, self._connection.sendall, data
)
except asyncio.CancelledError:
# Task was cancelled, exit cleanly
break
except Exception as e:
if not self._stop_event.is_set():
logger.error(f"Error in writer task: {e}")
break
def _parse_message(self, buffer: bytes) -> tuple[JsonRpcMessage | None, bytes]:
"""Parse a JSON-RPC message from the buffer.
LSP uses HTTP-like headers followed by JSON content:
Content-Length: <length>\\r\\n
\\r\\n
<json-content>
"""
# Look for Content-Length header
header_end = buffer.find(b"\r\n\r\n")
if header_end == -1:
return None, buffer
# Parse headers
headers = buffer[:header_end].decode("utf-8")
content_length = None
for line in headers.split("\r\n"):
if line.startswith("Content-Length:"):
try:
content_length = int(line.split(":")[1].strip())
except (IndexError, ValueError):
logger.error(f"Invalid Content-Length header: {line}")
return None, buffer[header_end + 4 :]
if content_length is None:
logger.error("Missing Content-Length header")
return None, buffer[header_end + 4 :]
# Check if we have the full message
content_start = header_end + 4
content_end = content_start + content_length
if len(buffer) < content_end:
return None, buffer
# Parse JSON content
try:
content = buffer[content_start:content_end].decode("utf-8")
data = json.loads(content)
message = JsonRpcMessage(**data)
return message, buffer[content_end:]
except (json.JSONDecodeError, UnicodeDecodeError) as e:
logger.error(f"Failed to parse message: {e}")
return None, buffer[content_end:]
def _handle_incoming_message(self, message: JsonRpcMessage) -> None:
"""Handle an incoming message from the LSP server."""
# Handle responses to requests
if message.id is not None:
# Thread-safe: pop with default avoids race condition between check and pop
future = self.state.pending_requests.pop(message.id, None)
if future is not None:
logger.debug(f"Received response for request {message.to_dict()}")
# Use call_soon_threadsafe to safely resolve futures across event loop contexts
# This prevents "Task got Future attached to a different loop" errors when
# the future was created in one loop but is being resolved from another loop
# Get the loop from the future itself to ensure we schedule on the correct loop
future_loop = future.get_loop()
if message.error:
future_loop.call_soon_threadsafe(
future.set_exception,
RuntimeError(f"LSP error: {message.error}"),
)
else:
future_loop.call_soon_threadsafe(future.set_result, message.result)
return
else:
# it's an unknown request, we respond with an empty result
logger.debug(f"LSP request {message.to_dict()}")
self._send_message(
JsonRpcMessage(id=message.id, result=None), none_values=True
)
if message.method is None:
return
# it's a known event type we want to explicitly handle
if lsp_event_name := event_name_from_string(message.method):
# Check if this is an event we're waiting for
# Thread-safe: pop with default avoids race condition
futures = self.state.pending_notifications.pop(lsp_event_name, None)
if futures is not None:
logger.debug(f"Received event {lsp_event_name} - {message.to_dict()}")
# Use call_soon_threadsafe for notification futures as well
for future in futures:
future_loop = future.get_loop()
future_loop.call_soon_threadsafe(future.set_result, message.params)
match lsp_event_name:
case LspEventName.compileComplete:
logger.info("Recorded compile complete event")
self.state.compiled = True
case _:
logger.debug(f"LSP event {message.method}")
pass
else:
# it's an unknown notification, log it and move on
logger.debug(f"LSP event {message.method}")
async def send_request(
self,
method: str,
params: dict[str, Any] | list[Any] | None = None,
timeout: float | None = None,
) -> dict[str, Any]:
"""Send a request to the LSP server.
Args:
method: The JSON-RPC method name
params: Optional parameters for the method
timeout: Timeout in seconds for this request. If not specified, uses
default_request_timeout from the connection configuration.
Returns:
A dictionary containing the response result or error information
"""
if not self.process:
raise RuntimeError("LSP server is not running")
# Create request message
request_id = self.state.get_next_request_id()
message = JsonRpcMessage(
id=request_id,
method=method,
params=params,
)
# Create future for response using the current running loop
# This prevents "Task got Future attached to a different loop" errors
# when send_request is called from a different loop context than where
# the connection was initialized
future = asyncio.get_running_loop().create_future()
self.state.pending_requests[request_id] = future
# Send the message
self._send_message(message)
try:
return await asyncio.wait_for(
future, timeout=timeout or self.default_request_timeout
)
except Exception as e:
return {"error": str(e)}
def send_notification(
self,
method: str,
params: dict[str, Any] | list[Any] | None = None,
) -> None:
"""Send a notification to the LSP server.
Args:
method: The JSON-RPC method name
params: Optional parameters for the method
"""
if not self.process:
raise RuntimeError("LSP server is not running")
# Create notification message (no ID)
message = JsonRpcMessage(
method=method,
params=params,
)
# Send the message
self._send_message(message)
def wait_for_notification(
self, event_name: LspEventName
) -> asyncio.Future[dict[str, Any]]:
"""Wait for a notification from the LSP server.
Args:
event_name: The LSP event name to wait for
Returns:
A Future that will be resolved with the notification params when received
"""
future = asyncio.get_running_loop().create_future()
self.state.pending_notifications.setdefault(event_name, []).append(future)
return future
def _send_message(self, message: JsonRpcMessage, none_values: bool = False) -> None:
"""Send a message to the LSP server."""
# Serialize message
content = json.dumps(message.to_dict(none_values=none_values))
content_bytes = content.encode("utf-8")
# Create LSP message with headers
header = f"Content-Length: {len(content_bytes)}\r\n\r\n"
header_bytes = header.encode("utf-8")
data = header_bytes + content_bytes
logger.debug(f"Sending message: {content}")
# Queue for sending (put_nowait is safe from sync context)
self._outgoing_queue.put_nowait(data)
def _send_shutdown_request(self) -> None:
"""Send shutdown request to the LSP server."""
try:
# Send shutdown request
message = JsonRpcMessage(
id=self.state.get_next_request_id(),
method="shutdown",
)
self._send_message(message)
# Send exit notification
exit_message = JsonRpcMessage(
method="exit",
)
self._send_message(exit_message)
except Exception as e:
logger.error(f"Error sending shutdown: {e}")
def is_running(self) -> bool:
"""Check if the LSP server is running."""
return self.process is not None and self.process.returncode is None
```
--------------------------------------------------------------------------------
/src/dbt_mcp/discovery/client.py:
--------------------------------------------------------------------------------
```python
import textwrap
from typing import Literal, TypedDict
import requests
from dbt_mcp.config.config_providers import ConfigProvider, DiscoveryConfig
from dbt_mcp.errors import GraphQLError, InvalidParameterError
from dbt_mcp.gql.errors import raise_gql_error
PAGE_SIZE = 100
MAX_NODE_QUERY_LIMIT = 1000
class GraphQLQueries:
GET_MODELS = textwrap.dedent("""
query GetModels(
$environmentId: BigInt!,
$modelsFilter: ModelAppliedFilter,
$after: String,
$first: Int,
$sort: AppliedModelSort
) {
environment(id: $environmentId) {
applied {
models(filter: $modelsFilter, after: $after, first: $first, sort: $sort) {
pageInfo {
endCursor
}
edges {
node {
name
uniqueId
description
}
}
}
}
}
}
""")
GET_MODEL_HEALTH = textwrap.dedent("""
query GetModelDetails(
$environmentId: BigInt!,
$modelsFilter: ModelAppliedFilter
$first: Int,
) {
environment(id: $environmentId) {
applied {
models(filter: $modelsFilter, first: $first) {
edges {
node {
name
uniqueId
executionInfo {
lastRunGeneratedAt
lastRunStatus
executeCompletedAt
executeStartedAt
}
tests {
name
description
columnName
testType
executionInfo {
lastRunGeneratedAt
lastRunStatus
executeCompletedAt
executeStartedAt
}
}
ancestors(types: [Model, Source, Seed, Snapshot]) {
... on ModelAppliedStateNestedNode {
name
uniqueId
resourceType
materializedType
modelexecutionInfo: executionInfo {
lastRunStatus
executeCompletedAt
}
}
... on SnapshotAppliedStateNestedNode {
name
uniqueId
resourceType
snapshotExecutionInfo: executionInfo {
lastRunStatus
executeCompletedAt
}
}
... on SeedAppliedStateNestedNode {
name
uniqueId
resourceType
seedExecutionInfo: executionInfo {
lastRunStatus
executeCompletedAt
}
}
... on SourceAppliedStateNestedNode {
sourceName
name
resourceType
freshness {
maxLoadedAt
maxLoadedAtTimeAgoInS
freshnessStatus
}
}
}
}
}
}
}
}
}
""")
GET_MODEL_DETAILS = textwrap.dedent("""
query GetModelDetails(
$environmentId: BigInt!,
$modelsFilter: ModelAppliedFilter
$first: Int,
) {
environment(id: $environmentId) {
applied {
models(filter: $modelsFilter, first: $first) {
edges {
node {
name
uniqueId
compiledCode
description
database
schema
alias
catalog {
columns {
description
name
type
}
}
}
}
}
}
}
}
""")
COMMON_FIELDS_PARENTS_CHILDREN = textwrap.dedent("""
{
... on ExposureAppliedStateNestedNode {
resourceType
name
description
}
... on ExternalModelNode {
resourceType
description
name
}
... on MacroDefinitionNestedNode {
resourceType
name
description
}
... on MetricDefinitionNestedNode {
resourceType
name
description
}
... on ModelAppliedStateNestedNode {
resourceType
name
description
}
... on SavedQueryDefinitionNestedNode {
resourceType
name
description
}
... on SeedAppliedStateNestedNode {
resourceType
name
description
}
... on SemanticModelDefinitionNestedNode {
resourceType
name
description
}
... on SnapshotAppliedStateNestedNode {
resourceType
name
description
}
... on SourceAppliedStateNestedNode {
resourceType
sourceName
uniqueId
name
description
}
... on TestAppliedStateNestedNode {
resourceType
name
description
}
""")
GET_MODEL_PARENTS = (
textwrap.dedent("""
query GetModelParents(
$environmentId: BigInt!,
$modelsFilter: ModelAppliedFilter
$first: Int,
) {
environment(id: $environmentId) {
applied {
models(filter: $modelsFilter, first: $first) {
pageInfo {
endCursor
}
edges {
node {
parents
""")
+ COMMON_FIELDS_PARENTS_CHILDREN
+ textwrap.dedent("""
}
}
}
}
}
}
}
""")
)
GET_MODEL_CHILDREN = (
textwrap.dedent("""
query GetModelChildren(
$environmentId: BigInt!,
$modelsFilter: ModelAppliedFilter
$first: Int,
) {
environment(id: $environmentId) {
applied {
models(filter: $modelsFilter, first: $first) {
pageInfo {
endCursor
}
edges {
node {
children
""")
+ COMMON_FIELDS_PARENTS_CHILDREN
+ textwrap.dedent("""
}
}
}
}
}
}
}
""")
)
GET_SOURCES = textwrap.dedent("""
query GetSources(
$environmentId: BigInt!,
$sourcesFilter: SourceAppliedFilter,
$after: String,
$first: Int
) {
environment(id: $environmentId) {
applied {
sources(filter: $sourcesFilter, after: $after, first: $first) {
pageInfo {
hasNextPage
endCursor
}
edges {
node {
name
uniqueId
identifier
description
sourceName
resourceType
database
schema
freshness {
maxLoadedAt
maxLoadedAtTimeAgoInS
freshnessStatus
}
}
}
}
}
}
}
""")
GET_EXPOSURES = textwrap.dedent("""
query Exposures($environmentId: BigInt!, $first: Int, $after: String) {
environment(id: $environmentId) {
definition {
exposures(first: $first, after: $after) {
totalCount
pageInfo {
hasNextPage
endCursor
}
edges {
node {
name
uniqueId
url
description
}
}
}
}
}
}
""")
GET_EXPOSURE_DETAILS = textwrap.dedent("""
query ExposureDetails($environmentId: BigInt!, $filter: ExposureFilter, $first: Int) {
environment(id: $environmentId) {
definition {
exposures(first: $first, filter: $filter) {
edges {
node {
name
maturity
label
ownerEmail
ownerName
uniqueId
url
meta
freshnessStatus
exposureType
description
parents {
uniqueId
}
}
}
}
}
}
}
""")
class MetadataAPIClient:
def __init__(self, config_provider: ConfigProvider[DiscoveryConfig]):
self.config_provider = config_provider
async def execute_query(self, query: str, variables: dict) -> dict:
config = await self.config_provider.get_config()
url = config.url
headers = config.headers_provider.get_headers()
response = requests.post(
url=url,
json={"query": query, "variables": variables},
headers=headers,
)
return response.json()
class ModelFilter(TypedDict, total=False):
modelingLayer: Literal["marts"] | None
class SourceFilter(TypedDict, total=False):
sourceNames: list[str]
uniqueIds: list[str] | None
class ModelsFetcher:
def __init__(self, api_client: MetadataAPIClient):
self.api_client = api_client
async def get_environment_id(self) -> int:
config = await self.api_client.config_provider.get_config()
return config.environment_id
def _parse_response_to_json(self, result: dict) -> list[dict]:
raise_gql_error(result)
edges = result["data"]["environment"]["applied"]["models"]["edges"]
parsed_edges: list[dict] = []
if not edges:
return parsed_edges
if result.get("errors"):
raise GraphQLError(f"GraphQL query failed: {result['errors']}")
for edge in edges:
if not isinstance(edge, dict) or "node" not in edge:
continue
node = edge["node"]
if not isinstance(node, dict):
continue
parsed_edges.append(node)
return parsed_edges
def _get_model_filters(
self, model_name: str | None = None, unique_id: str | None = None
) -> dict[str, list[str] | str]:
if unique_id:
return {"uniqueIds": [unique_id]}
elif model_name:
return {"identifier": model_name}
else:
raise InvalidParameterError(
"Either model_name or unique_id must be provided"
)
async def fetch_models(self, model_filter: ModelFilter | None = None) -> list[dict]:
has_next_page = True
after_cursor: str = ""
all_edges: list[dict] = []
while has_next_page and len(all_edges) < MAX_NODE_QUERY_LIMIT:
variables = {
"environmentId": await self.get_environment_id(),
"after": after_cursor,
"first": PAGE_SIZE,
"modelsFilter": model_filter or {},
"sort": {"field": "queryUsageCount", "direction": "desc"},
}
result = await self.api_client.execute_query(
GraphQLQueries.GET_MODELS, variables
)
all_edges.extend(self._parse_response_to_json(result))
previous_after_cursor = after_cursor
after_cursor = result["data"]["environment"]["applied"]["models"][
"pageInfo"
]["endCursor"]
if previous_after_cursor == after_cursor:
has_next_page = False
return all_edges
async def fetch_model_details(
self, model_name: str | None = None, unique_id: str | None = None
) -> dict:
model_filters = self._get_model_filters(model_name, unique_id)
variables = {
"environmentId": await self.get_environment_id(),
"modelsFilter": model_filters,
"first": 1,
}
result = await self.api_client.execute_query(
GraphQLQueries.GET_MODEL_DETAILS, variables
)
raise_gql_error(result)
edges = result["data"]["environment"]["applied"]["models"]["edges"]
if not edges:
return {}
return edges[0]["node"]
async def fetch_model_parents(
self, model_name: str | None = None, unique_id: str | None = None
) -> list[dict]:
model_filters = self._get_model_filters(model_name, unique_id)
variables = {
"environmentId": await self.get_environment_id(),
"modelsFilter": model_filters,
"first": 1,
}
result = await self.api_client.execute_query(
GraphQLQueries.GET_MODEL_PARENTS, variables
)
raise_gql_error(result)
edges = result["data"]["environment"]["applied"]["models"]["edges"]
if not edges:
return []
return edges[0]["node"]["parents"]
async def fetch_model_children(
self, model_name: str | None = None, unique_id: str | None = None
) -> list[dict]:
model_filters = self._get_model_filters(model_name, unique_id)
variables = {
"environmentId": await self.get_environment_id(),
"modelsFilter": model_filters,
"first": 1,
}
result = await self.api_client.execute_query(
GraphQLQueries.GET_MODEL_CHILDREN, variables
)
raise_gql_error(result)
edges = result["data"]["environment"]["applied"]["models"]["edges"]
if not edges:
return []
return edges[0]["node"]["children"]
async def fetch_model_health(
self, model_name: str | None = None, unique_id: str | None = None
) -> list[dict]:
model_filters = self._get_model_filters(model_name, unique_id)
variables = {
"environmentId": await self.get_environment_id(),
"modelsFilter": model_filters,
"first": 1,
}
result = await self.api_client.execute_query(
GraphQLQueries.GET_MODEL_HEALTH, variables
)
raise_gql_error(result)
edges = result["data"]["environment"]["applied"]["models"]["edges"]
if not edges:
return []
return edges[0]["node"]
class ExposuresFetcher:
def __init__(self, api_client: MetadataAPIClient):
self.api_client = api_client
async def get_environment_id(self) -> int:
config = await self.api_client.config_provider.get_config()
return config.environment_id
def _parse_response_to_json(self, result: dict) -> list[dict]:
raise_gql_error(result)
edges = result["data"]["environment"]["definition"]["exposures"]["edges"]
parsed_edges: list[dict] = []
if not edges:
return parsed_edges
if result.get("errors"):
raise GraphQLError(f"GraphQL query failed: {result['errors']}")
for edge in edges:
if not isinstance(edge, dict) or "node" not in edge:
continue
node = edge["node"]
if not isinstance(node, dict):
continue
parsed_edges.append(node)
return parsed_edges
async def fetch_exposures(self) -> list[dict]:
has_next_page = True
after_cursor: str | None = None
all_edges: list[dict] = []
while has_next_page:
variables: dict[str, int | str] = {
"environmentId": await self.get_environment_id(),
"first": PAGE_SIZE,
}
if after_cursor:
variables["after"] = after_cursor
result = await self.api_client.execute_query(
GraphQLQueries.GET_EXPOSURES, variables
)
new_edges = self._parse_response_to_json(result)
all_edges.extend(new_edges)
page_info = result["data"]["environment"]["definition"]["exposures"][
"pageInfo"
]
has_next_page = page_info.get("hasNextPage", False)
after_cursor = page_info.get("endCursor")
return all_edges
def _get_exposure_filters(
self, exposure_name: str | None = None, unique_ids: list[str] | None = None
) -> dict[str, list[str]]:
if unique_ids:
return {"uniqueIds": unique_ids}
elif exposure_name:
raise InvalidParameterError(
"ExposureFilter only supports uniqueIds. Please use unique_ids parameter instead of exposure_name."
)
else:
raise InvalidParameterError(
"unique_ids must be provided for exposure filtering"
)
async def fetch_exposure_details(
self, exposure_name: str | None = None, unique_ids: list[str] | None = None
) -> list[dict]:
if exposure_name and not unique_ids:
# Since ExposureFilter doesn't support filtering by name,
# we need to fetch all exposures and find the one with matching name
all_exposures = await self.fetch_exposures()
for exposure in all_exposures:
if exposure.get("name") == exposure_name:
return [exposure]
return []
elif unique_ids:
exposure_filters = self._get_exposure_filters(unique_ids=unique_ids)
variables = {
"environmentId": await self.get_environment_id(),
"filter": exposure_filters,
"first": len(unique_ids), # Request as many as we're filtering for
}
result = await self.api_client.execute_query(
GraphQLQueries.GET_EXPOSURE_DETAILS, variables
)
raise_gql_error(result)
edges = result["data"]["environment"]["definition"]["exposures"]["edges"]
if not edges:
return []
return [edge["node"] for edge in edges]
else:
raise InvalidParameterError(
"Either exposure_name or unique_ids must be provided"
)
class SourcesFetcher:
def __init__(self, api_client: MetadataAPIClient):
self.api_client = api_client
async def get_environment_id(self) -> int:
config = await self.api_client.config_provider.get_config()
return config.environment_id
def _parse_response_to_json(self, result: dict) -> list[dict]:
raise_gql_error(result)
edges = result["data"]["environment"]["applied"]["sources"]["edges"]
parsed_edges: list[dict] = []
if not edges:
return parsed_edges
if result.get("errors"):
raise GraphQLError(f"GraphQL query failed: {result['errors']}")
for edge in edges:
if not isinstance(edge, dict) or "node" not in edge:
continue
node = edge["node"]
if not isinstance(node, dict):
continue
parsed_edges.append(node)
return parsed_edges
async def fetch_sources(
self,
source_names: list[str] | None = None,
unique_ids: list[str] | None = None,
) -> list[dict]:
source_filter: SourceFilter = {}
if source_names is not None:
source_filter["sourceNames"] = source_names
if unique_ids is not None:
source_filter["uniqueIds"] = unique_ids
has_next_page = True
after_cursor: str = ""
all_edges: list[dict] = []
while has_next_page and len(all_edges) < MAX_NODE_QUERY_LIMIT:
variables = {
"environmentId": await self.get_environment_id(),
"after": after_cursor,
"first": PAGE_SIZE,
"sourcesFilter": source_filter,
}
result = await self.api_client.execute_query(
GraphQLQueries.GET_SOURCES, variables
)
all_edges.extend(self._parse_response_to_json(result))
page_info = result["data"]["environment"]["applied"]["sources"]["pageInfo"]
has_next_page = page_info.get("hasNextPage", False)
after_cursor = page_info.get("endCursor")
return all_edges
```
--------------------------------------------------------------------------------
/tests/unit/lsp/test_lsp_connection.py:
--------------------------------------------------------------------------------
```python
"""Unit tests for the LSP connection module."""
import asyncio
import socket
import subprocess
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from dbt_mcp.lsp.lsp_connection import (
LSPConnection,
LspConnectionState,
LspEventName,
JsonRpcMessage,
event_name_from_string,
)
class TestJsonRpcMessage:
"""Test JsonRpcMessage dataclass."""
def test_to_dict_with_request(self):
"""Test converting a request message to dictionary."""
msg = JsonRpcMessage(id=1, method="initialize", params={"processId": None})
result = msg.to_dict()
assert result == {
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": {"processId": None},
}
def test_to_dict_with_response(self):
"""Test converting a response message to dictionary."""
msg = JsonRpcMessage(id=1, result={"capabilities": {}})
result = msg.to_dict()
assert result == {"jsonrpc": "2.0", "id": 1, "result": {"capabilities": {}}}
def test_to_dict_with_error(self):
"""Test converting an error message to dictionary."""
msg = JsonRpcMessage(
id=1, error={"code": -32601, "message": "Method not found"}
)
result = msg.to_dict()
assert result == {
"jsonrpc": "2.0",
"id": 1,
"error": {"code": -32601, "message": "Method not found"},
}
def test_to_dict_notification(self):
"""Test converting a notification message to dictionary."""
msg = JsonRpcMessage(
method="window/logMessage", params={"type": 3, "message": "Server started"}
)
result = msg.to_dict()
assert result == {
"jsonrpc": "2.0",
"method": "window/logMessage",
"params": {"type": 3, "message": "Server started"},
}
def test_from_dict(self):
"""Test creating message from dictionary."""
data = {
"jsonrpc": "2.0",
"id": 42,
"method": "textDocument/completion",
"params": {"textDocument": {"uri": "file:///test.sql"}},
}
msg = JsonRpcMessage(**data)
assert msg.jsonrpc == "2.0"
assert msg.id == 42
assert msg.method == "textDocument/completion"
assert msg.params == {"textDocument": {"uri": "file:///test.sql"}}
class TestLspEventName:
"""Test LspEventName enum and helpers."""
def test_event_name_from_string_valid(self):
"""Test converting valid string to event name."""
assert (
event_name_from_string("dbt/lspCompileComplete")
== LspEventName.compileComplete
)
assert event_name_from_string("window/logMessage") == LspEventName.logMessage
assert event_name_from_string("$/progress") == LspEventName.progress
def test_event_name_from_string_invalid(self):
"""Test converting invalid string returns None."""
assert event_name_from_string("invalid/event") is None
assert event_name_from_string("") is None
class TestLspConnectionState:
"""Test LspConnectionState dataclass."""
def test_initial_state(self):
"""Test initial state values."""
state = LspConnectionState()
assert state.initialized is False
assert state.shutting_down is False
assert state.capabilities is not None
assert len(state.capabilities) == 0
assert state.pending_requests == {}
assert state.pending_notifications == {}
assert state.compiled is False
def test_get_next_request_id(self):
"""Test request ID generation."""
state = LspConnectionState()
# Should start at 20 to avoid collisions
id1 = state.get_next_request_id()
id2 = state.get_next_request_id()
id3 = state.get_next_request_id()
assert id1 == 20
assert id2 == 21
assert id3 == 22
class TestLSPConnectionInitialization:
"""Test LSP connection initialization and validation."""
def test_init_valid_binary(self, tmp_path):
"""Test initialization with valid binary path."""
# Create a dummy binary file
binary_path = tmp_path / "lsp-server"
binary_path.touch()
conn = LSPConnection(
binary_path=str(binary_path),
cwd="/test/dir",
args=["--arg1", "--arg2"],
connection_timeout=15,
default_request_timeout=60,
)
assert conn.binary_path == binary_path
assert conn.cwd == "/test/dir"
assert conn.args == ["--arg1", "--arg2"]
assert conn.host == "127.0.0.1"
assert conn.port == 0
assert conn.connection_timeout == 15
assert conn.default_request_timeout == 60
assert conn.process is None
assert isinstance(conn.state, LspConnectionState)
class TestSocketSetup:
"""Test socket setup and lifecycle."""
def test_setup_socket_success(self, tmp_path):
"""Test successful socket setup."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = LSPConnection(str(binary_path), "/test")
with patch("socket.socket") as mock_socket_class:
mock_socket = MagicMock()
mock_socket.getsockname.return_value = ("127.0.0.1", 54321)
mock_socket_class.return_value = mock_socket
conn.setup_socket()
# Verify socket setup
mock_socket_class.assert_called_once_with(
socket.AF_INET, socket.SOCK_STREAM
)
mock_socket.setsockopt.assert_called_once_with(
socket.SOL_SOCKET, socket.SO_REUSEADDR, 1
)
mock_socket.bind.assert_called_once_with(("127.0.0.1", 0))
mock_socket.listen.assert_called_once_with(1)
assert conn.port == 54321
assert conn._socket == mock_socket
class TestProcessLaunching:
"""Test LSP process launching and termination."""
@pytest.mark.asyncio
async def test_launch_lsp_process_success(self, tmp_path):
"""Test successful process launch."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = LSPConnection(str(binary_path), "/test/dir")
conn.port = 12345
with patch("asyncio.create_subprocess_exec") as mock_create_subprocess:
mock_process = MagicMock()
mock_process.pid = 9999
mock_create_subprocess.return_value = mock_process
await conn.launch_lsp_process()
# Verify process was started with correct arguments
mock_create_subprocess.assert_called_once_with(
str(binary_path), "--socket", "12345", "--project-dir", "/test/dir"
)
assert conn.process == mock_process
class TestStartStop:
"""Test start/stop lifecycle."""
@pytest.mark.asyncio
async def test_start_success(self, tmp_path):
"""Test successful server start."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = LSPConnection(str(binary_path), "/test")
# Mock socket setup
mock_socket = MagicMock()
mock_connection = MagicMock()
mock_socket.getsockname.return_value = ("127.0.0.1", 54321)
# Mock process
mock_process = MagicMock()
mock_process.pid = 9999
with (
patch("socket.socket", return_value=mock_socket),
patch("asyncio.create_subprocess_exec", return_value=mock_process),
patch.object(conn, "_read_loop", new_callable=AsyncMock),
patch.object(conn, "_write_loop", new_callable=AsyncMock),
):
# Mock socket accept
async def mock_accept_wrapper():
return mock_connection, ("127.0.0.1", 12345)
with patch("asyncio.get_running_loop") as mock_loop:
mock_loop.return_value.run_in_executor.return_value = (
mock_accept_wrapper()
)
mock_loop.return_value.create_task.side_effect = (
lambda coro: asyncio.create_task(coro)
)
await conn.start()
assert conn.process == mock_process
assert conn._connection == mock_connection
assert conn._reader_task is not None
assert conn._writer_task is not None
@pytest.mark.asyncio
async def test_start_already_running(self, tmp_path):
"""Test starting when already running."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = LSPConnection(str(binary_path), "/test")
conn.process = MagicMock() # Simulate already running
with (
patch("socket.socket"),
patch("asyncio.create_subprocess_exec") as mock_create_subprocess,
):
await conn.start()
# Should not create a new process
mock_create_subprocess.assert_not_called()
@pytest.mark.asyncio
async def test_start_timeout(self, tmp_path):
"""Test start timeout when server doesn't connect."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = LSPConnection(str(binary_path), "/test", connection_timeout=0.1)
mock_socket = MagicMock()
mock_socket.getsockname.return_value = ("127.0.0.1", 54321)
mock_process = MagicMock()
with (
patch("socket.socket", return_value=mock_socket),
patch("asyncio.create_subprocess_exec", return_value=mock_process),
):
# Simulate timeout in socket.accept
mock_socket.accept.side_effect = TimeoutError
with patch("asyncio.get_running_loop") as mock_loop:
mock_loop.return_value.run_in_executor.side_effect = TimeoutError
with pytest.raises(
RuntimeError, match="Timeout waiting for LSP server to connect"
):
await conn.start()
@pytest.mark.asyncio
async def test_stop_complete_cleanup(self, tmp_path):
"""Test complete cleanup on stop."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = LSPConnection(str(binary_path), "/test")
# Setup mocks for running state
conn.process = MagicMock()
conn.process.terminate = MagicMock()
conn.process.wait = AsyncMock()
conn.process.kill = MagicMock()
conn._socket = MagicMock()
conn._connection = MagicMock()
# Create mock tasks with proper async behavior
async def mock_task():
pass
conn._reader_task = asyncio.create_task(mock_task())
conn._writer_task = asyncio.create_task(mock_task())
# Let tasks complete
await asyncio.sleep(0.01)
# Store references before they are set to None
mock_connection = conn._connection
mock_socket = conn._socket
mock_process = conn.process
with patch.object(conn, "_send_shutdown_request") as mock_shutdown:
await conn.stop()
# Verify cleanup methods were called
mock_shutdown.assert_called_once()
mock_connection.close.assert_called_once()
mock_socket.close.assert_called_once()
mock_process.terminate.assert_called_once()
# Verify everything was set to None
assert conn.process is None
assert conn._socket is None
assert conn._connection is None
@pytest.mark.asyncio
async def test_stop_force_kill(self, tmp_path):
"""Test force kill when process doesn't terminate."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = LSPConnection(str(binary_path), "/test")
# Setup mock process that doesn't terminate
mock_process = MagicMock()
mock_process.terminate = MagicMock()
mock_process.wait = AsyncMock(
side_effect=[subprocess.TimeoutExpired("cmd", 1), None]
)
mock_process.kill = MagicMock()
conn.process = mock_process
await conn.stop()
# Verify force kill was called
mock_process.terminate.assert_called_once()
mock_process.kill.assert_called_once()
class TestInitializeMethod:
"""Test LSP initialization handshake."""
@pytest.mark.asyncio
async def test_initialize_success(self, tmp_path):
"""Test successful initialization."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = LSPConnection(str(binary_path), "/test")
conn.process = MagicMock() # Simulate running
# Mock send_request to return capabilities
mock_result = {
"capabilities": {
"textDocumentSync": 2,
"completionProvider": {"triggerCharacters": [".", ":"]},
}
}
with (
patch.object(
conn, "send_request", new_callable=AsyncMock
) as mock_send_request,
patch.object(conn, "send_notification") as mock_send_notification,
):
mock_send_request.return_value = mock_result
await conn.initialize(root_uri="file:///workspace", timeout=5)
# Verify initialize request was sent
mock_send_request.assert_called_once()
call_args = mock_send_request.call_args
assert call_args[0][0] == "initialize"
assert call_args[1]["timeout"] == 5
params = call_args[0][1]
assert params["rootUri"] == "file:///workspace"
assert params["clientInfo"]["name"] == "dbt-mcp"
# Verify initialized notification was sent
mock_send_notification.assert_called_once_with("initialized", {})
# Verify state was updated
assert conn.state.initialized is True
assert conn.state.capabilities == mock_result["capabilities"]
@pytest.mark.asyncio
async def test_initialize_already_initialized(self, tmp_path):
"""Test initialization when already initialized."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = LSPConnection(str(binary_path), "/test")
conn.process = MagicMock()
conn.state.initialized = True
with pytest.raises(RuntimeError, match="LSP server is already initialized"):
await conn.initialize()
class TestMessageParsing:
"""Test JSON-RPC message parsing."""
def test_parse_message_complete(self, tmp_path):
"""Test parsing a complete message."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = LSPConnection(str(binary_path), "/test")
# Create a valid LSP message
content = '{"jsonrpc":"2.0","id":1,"result":{"test":true}}'
header = f"Content-Length: {len(content)}\r\n\r\n"
buffer = (header + content).encode("utf-8")
message, remaining = conn._parse_message(buffer)
assert message is not None
assert message.id == 1
assert message.result == {"test": True}
assert remaining == b""
def test_parse_message_incomplete_header(self, tmp_path):
"""Test parsing with incomplete header."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = LSPConnection(str(binary_path), "/test")
buffer = b"Content-Length: 50\r\n" # Missing \r\n\r\n
message, remaining = conn._parse_message(buffer)
assert message is None
assert remaining == buffer
def test_parse_message_incomplete_content(self, tmp_path):
"""Test parsing with incomplete content."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = LSPConnection(str(binary_path), "/test")
content = '{"jsonrpc":"2.0","id":1,"result":{"test":true}}'
header = f"Content-Length: {len(content)}\r\n\r\n"
# Only include part of the content
buffer = (header + content[:10]).encode("utf-8")
message, remaining = conn._parse_message(buffer)
assert message is None
assert remaining == buffer
def test_parse_message_invalid_json(self, tmp_path):
"""Test parsing with invalid JSON content."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = LSPConnection(str(binary_path), "/test")
content = '{"invalid json'
header = f"Content-Length: {len(content)}\r\n\r\n"
buffer = (header + content).encode("utf-8")
message, remaining = conn._parse_message(buffer)
assert message is None
assert remaining == b"" # Invalid message is discarded
def test_parse_message_missing_content_length(self, tmp_path):
"""Test parsing with missing Content-Length header."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = LSPConnection(str(binary_path), "/test")
buffer = b'Some-Header: value\r\n\r\n{"test":true}'
message, remaining = conn._parse_message(buffer)
assert message is None
assert remaining == b'{"test":true}' # Header consumed, content remains
def test_parse_message_multiple_messages(self, tmp_path):
"""Test parsing multiple messages from buffer."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = LSPConnection(str(binary_path), "/test")
# Create two messages
content1 = '{"jsonrpc":"2.0","id":1,"result":true}'
content2 = '{"jsonrpc":"2.0","id":2,"result":false}'
header1 = f"Content-Length: {len(content1)}\r\n\r\n"
header2 = f"Content-Length: {len(content2)}\r\n\r\n"
buffer = (header1 + content1 + header2 + content2).encode("utf-8")
# Parse first message
message1, remaining1 = conn._parse_message(buffer)
assert message1 is not None
assert message1.id == 1
assert message1.result is True
# Parse second message
message2, remaining2 = conn._parse_message(remaining1)
assert message2 is not None
assert message2.id == 2
assert message2.result is False
assert remaining2 == b""
class TestMessageHandling:
"""Test incoming message handling."""
def test_handle_response_message(self, tmp_path):
"""Test handling response to a request."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = LSPConnection(str(binary_path), "/test")
# Create a pending request
future = asyncio.Future()
conn.state.pending_requests[42] = future
# Handle response message
message = JsonRpcMessage(id=42, result={"success": True})
with patch.object(future, "get_loop") as mock_get_loop:
mock_loop = MagicMock()
mock_get_loop.return_value = mock_loop
conn._handle_incoming_message(message)
# Verify future was resolved
mock_loop.call_soon_threadsafe.assert_called_once()
args = mock_loop.call_soon_threadsafe.call_args[0]
assert args[0] == future.set_result
assert args[1] == {"success": True}
# Verify request was removed from pending
assert 42 not in conn.state.pending_requests
def test_handle_error_response(self, tmp_path):
"""Test handling error response."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = LSPConnection(str(binary_path), "/test")
# Create a pending request
future = asyncio.Future()
conn.state.pending_requests[42] = future
# Handle error response
message = JsonRpcMessage(
id=42, error={"code": -32601, "message": "Method not found"}
)
with patch.object(future, "get_loop") as mock_get_loop:
mock_loop = MagicMock()
mock_get_loop.return_value = mock_loop
conn._handle_incoming_message(message)
# Verify future was rejected
mock_loop.call_soon_threadsafe.assert_called_once()
args = mock_loop.call_soon_threadsafe.call_args[0]
assert args[0] == future.set_exception
def test_handle_unknown_response(self, tmp_path):
"""Test handling response for unknown request ID."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = LSPConnection(str(binary_path), "/test")
# Handle response with unknown ID
message = JsonRpcMessage(id=999, result={"test": True})
with patch.object(conn, "_send_message") as mock_send:
conn._handle_incoming_message(message)
# Should send empty response back
mock_send.assert_called_once()
sent_msg = mock_send.call_args[0][0]
assert isinstance(sent_msg, JsonRpcMessage)
assert sent_msg.id == 999
assert sent_msg.result is None
def test_handle_notification(self, tmp_path):
"""Test handling notification messages."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = LSPConnection(str(binary_path), "/test")
# Create futures waiting for compile complete event
future1 = asyncio.Future()
future2 = asyncio.Future()
conn.state.pending_notifications[LspEventName.compileComplete] = [
future1,
future2,
]
# Handle compile complete notification
message = JsonRpcMessage(
method="dbt/lspCompileComplete", params={"success": True}
)
with (
patch.object(future1, "get_loop") as mock_get_loop1,
patch.object(future2, "get_loop") as mock_get_loop2,
):
mock_loop1 = MagicMock()
mock_loop2 = MagicMock()
mock_get_loop1.return_value = mock_loop1
mock_get_loop2.return_value = mock_loop2
conn._handle_incoming_message(message)
# Verify futures were resolved
mock_loop1.call_soon_threadsafe.assert_called_once_with(
future1.set_result, {"success": True}
)
mock_loop2.call_soon_threadsafe.assert_called_once_with(
future2.set_result, {"success": True}
)
# Verify compile state was set
assert conn.state.compiled is True
def test_handle_unknown_notification(self, tmp_path):
"""Test handling unknown notification."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = LSPConnection(str(binary_path), "/test")
# Handle unknown notification
message = JsonRpcMessage(method="unknown/notification", params={"data": "test"})
# Should not raise, just log
conn._handle_incoming_message(message)
class TestSendRequest:
"""Test sending requests to LSP server."""
@pytest.mark.asyncio
async def test_send_request_success(self, tmp_path):
"""Test successful request sending."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = LSPConnection(str(binary_path), "/test")
conn.process = MagicMock() # Simulate running
with (
patch.object(conn, "_send_message") as mock_send,
patch("asyncio.wait_for", new_callable=AsyncMock) as mock_wait_for,
):
mock_wait_for.return_value = {"result": "success"}
result = await conn.send_request(
"testMethod", {"param": "value"}, timeout=5
)
# Verify message was sent
mock_send.assert_called_once()
sent_msg = mock_send.call_args[0][0]
assert isinstance(sent_msg, JsonRpcMessage)
assert sent_msg.method == "testMethod"
assert sent_msg.params == {"param": "value"}
assert sent_msg.id is not None
# Verify result
assert result == {"result": "success"}
@pytest.mark.asyncio
async def test_send_request_not_running(self, tmp_path):
"""Test sending request when server not running."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = LSPConnection(str(binary_path), "/test")
# process is None - not running
with pytest.raises(RuntimeError, match="LSP server is not running"):
await conn.send_request("testMethod")
@pytest.mark.asyncio
async def test_send_request_timeout(self, tmp_path):
"""Test request timeout."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = LSPConnection(str(binary_path), "/test", default_request_timeout=1)
conn.process = MagicMock()
with patch.object(conn, "_send_message"):
# Create a future that never resolves
future = asyncio.Future()
conn.state.pending_requests[20] = future
# Use real wait_for to test timeout
result = await conn.send_request("testMethod", timeout=0.01)
assert "error" in result
class TestSendNotification:
"""Test sending notifications to LSP server."""
def test_send_notification_success(self, tmp_path):
"""Test successful notification sending."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = LSPConnection(str(binary_path), "/test")
conn.process = MagicMock()
with patch.object(conn, "_send_message") as mock_send:
conn.send_notification(
"window/showMessage", {"type": 3, "message": "Hello"}
)
# Verify message was sent
mock_send.assert_called_once()
sent_msg = mock_send.call_args[0][0]
assert isinstance(sent_msg, JsonRpcMessage)
assert sent_msg.method == "window/showMessage"
assert sent_msg.params == {"type": 3, "message": "Hello"}
assert sent_msg.id is None # Notifications have no ID
def test_send_notification_not_running(self, tmp_path):
"""Test sending notification when server not running."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = LSPConnection(str(binary_path), "/test")
# process is None - not running
with pytest.raises(RuntimeError, match="LSP server is not running"):
conn.send_notification("testMethod")
class TestWaitForNotification:
"""Test waiting for notifications."""
def test_wait_for_notification(self, tmp_path):
"""Test registering to wait for a notification."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = LSPConnection(str(binary_path), "/test")
with patch("asyncio.get_running_loop") as mock_get_loop:
mock_loop = MagicMock()
mock_future = MagicMock()
mock_loop.create_future.return_value = mock_future
mock_get_loop.return_value = mock_loop
result = conn.wait_for_notification(LspEventName.compileComplete)
# Verify future was created and registered
assert result == mock_future
assert LspEventName.compileComplete in conn.state.pending_notifications
assert (
mock_future
in conn.state.pending_notifications[LspEventName.compileComplete]
)
class TestSendMessage:
"""Test low-level message sending."""
def test_send_message_with_jsonrpc_message(self, tmp_path):
"""Test sending JsonRpcMessage."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = LSPConnection(str(binary_path), "/test")
conn._outgoing_queue = MagicMock()
message = JsonRpcMessage(id=1, method="test", params={"key": "value"})
conn._send_message(message)
# Verify message was queued
conn._outgoing_queue.put_nowait.assert_called_once()
data = conn._outgoing_queue.put_nowait.call_args[0][0]
# Parse the data to verify format
assert b"Content-Length:" in data
assert b"\r\n\r\n" in data
# JSON might have spaces after colons, check for both variants
assert b'"jsonrpc"' in data and b'"2.0"' in data
assert b'"method"' in data and b'"test"' in data
class TestShutdown:
"""Test shutdown sequence."""
def test_send_shutdown_request(self, tmp_path):
"""Test sending shutdown and exit messages."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = LSPConnection(str(binary_path), "/test")
with patch.object(conn, "_send_message") as mock_send:
conn._send_shutdown_request()
# Verify two messages were sent
assert mock_send.call_count == 2
# First should be shutdown request
shutdown_msg = mock_send.call_args_list[0][0][0]
assert isinstance(shutdown_msg, JsonRpcMessage)
assert shutdown_msg.method == "shutdown"
assert shutdown_msg.id is not None
# Second should be exit notification
exit_msg = mock_send.call_args_list[1][0][0]
assert isinstance(exit_msg, JsonRpcMessage)
assert exit_msg.method == "exit"
assert exit_msg.id is None
class TestIsRunning:
"""Test is_running method."""
def test_is_running_true(self, tmp_path):
"""Test when process is running."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = LSPConnection(str(binary_path), "/test")
conn.process = MagicMock()
conn.process.returncode = None
assert conn.is_running() is True
def test_is_running_false_no_process(self, tmp_path):
"""Test when no process."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = LSPConnection(str(binary_path), "/test")
assert conn.is_running() is False
def test_is_running_false_process_exited(self, tmp_path):
"""Test when process has exited."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = LSPConnection(str(binary_path), "/test")
conn.process = MagicMock()
conn.process.returncode = 0
assert conn.is_running() is False
class TestReadWriteLoops:
"""Test async I/O loops."""
@pytest.mark.asyncio
async def test_read_loop_processes_messages(self, tmp_path):
"""Test read loop processes incoming messages."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = LSPConnection(str(binary_path), "/test")
# Setup mock connection
mock_connection = MagicMock()
conn._connection = mock_connection
# Create test data
content = '{"jsonrpc":"2.0","id":1,"result":true}'
header = f"Content-Length: {len(content)}\r\n\r\n"
test_data = (header + content).encode("utf-8")
# Mock recv to return data once then empty
recv_calls = [test_data, b""]
async def mock_recv_wrapper(size):
if recv_calls:
return recv_calls.pop(0)
return b""
with (
patch("asyncio.get_running_loop") as mock_get_loop,
patch.object(conn, "_handle_incoming_message") as mock_handle,
):
mock_loop = MagicMock()
mock_get_loop.return_value = mock_loop
mock_loop.run_in_executor.side_effect = (
lambda _, func, *args: mock_recv_wrapper(*args)
)
# Run read loop (will exit when recv returns empty)
await conn._read_loop()
# Verify message was handled
mock_handle.assert_called_once()
handled_msg = mock_handle.call_args[0][0]
assert handled_msg.id == 1
assert handled_msg.result is True
@pytest.mark.asyncio
async def test_write_loop_sends_messages(self, tmp_path):
"""Test write loop sends queued messages."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = LSPConnection(str(binary_path), "/test")
# Setup mock connection
mock_connection = MagicMock()
conn._connection = mock_connection
# Queue test data
test_data = b"test message data"
conn._outgoing_queue.put_nowait(test_data)
# Set stop event after first iteration
async def stop_after_one():
await asyncio.sleep(0.01)
conn._stop_event.set()
with patch("asyncio.get_running_loop") as mock_get_loop:
mock_loop = MagicMock()
mock_get_loop.return_value = mock_loop
mock_loop.run_in_executor.return_value = asyncio.sleep(0)
# Run both coroutines
await asyncio.gather(
conn._write_loop(), stop_after_one(), return_exceptions=True
)
# Verify data was sent
mock_loop.run_in_executor.assert_called()
call_args = mock_loop.run_in_executor.call_args_list[-1]
assert call_args[0][1] == mock_connection.sendall
assert call_args[0][2] == test_data
class TestEdgeCases:
"""Test edge cases and error conditions."""
@pytest.mark.asyncio
async def test_concurrent_requests(self, tmp_path):
"""Test handling concurrent requests."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = LSPConnection(str(binary_path), "/test")
conn.process = MagicMock()
# Track sent messages
sent_messages = []
def track_message(msg):
sent_messages.append(msg)
with patch.object(conn, "_send_message", side_effect=track_message):
# Create futures for multiple requests
future1 = asyncio.create_task(
conn.send_request("method1", JsonRpcMessage(id=1))
)
future2 = asyncio.create_task(
conn.send_request("method2", JsonRpcMessage(id=2))
)
future3 = asyncio.create_task(
conn.send_request("method3", JsonRpcMessage(id=3))
)
# Let tasks start
await asyncio.sleep(0.01)
# Verify all messages were sent with unique IDs
assert len(sent_messages) == 3
ids = [msg.id for msg in sent_messages]
assert len(set(ids)) == 3 # All IDs are unique
# Simulate responses
for msg in sent_messages:
if msg.id in conn.state.pending_requests:
future = conn.state.pending_requests[msg.id]
future.set_result({"response": msg.id})
# Wait for all requests
results = await asyncio.gather(future1, future2, future3)
# Verify each got correct response
assert all("response" in r for r in results)
@pytest.mark.asyncio
async def test_stop_with_pending_requests(self, tmp_path):
"""Test stopping with pending requests."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = LSPConnection(str(binary_path), "/test")
conn.process = MagicMock()
conn.process.terminate = MagicMock()
conn.process.wait = AsyncMock()
# Add pending requests
future1 = asyncio.Future()
future2 = asyncio.Future()
conn.state.pending_requests[1] = future1
conn.state.pending_requests[2] = future2
await conn.stop()
# Verify state was cleared
assert len(conn.state.pending_requests) == 0
def test_message_with_unicode(self, tmp_path):
"""Test handling messages with unicode content."""
binary_path = tmp_path / "lsp"
binary_path.touch()
conn = LSPConnection(str(binary_path), "/test")
# Create message with unicode
content = '{"jsonrpc":"2.0","method":"test","params":{"text":"Hello 世界 🚀"}}'
header = f"Content-Length: {len(content.encode('utf-8'))}\r\n\r\n"
buffer = header.encode("utf-8") + content.encode("utf-8")
message, remaining = conn._parse_message(buffer)
assert message is not None
assert message.method == "test"
assert message.params["text"] == "Hello 世界 🚀"
assert remaining == b""
```