This is page 2 of 3. Use http://codebase.md/mammothgrowth/dbt-cli-mcp?lines=true&page={x} to view the full context.
# Directory Structure
```
├── .gitignore
├── .gitmodules
├── .python-version
├── docs
│ ├── dbt_cheat_sheet.md
│ ├── dbt_mcp_guide.md
│ ├── llm_guide_to_mcp.md
│ └── python_fastMCP.md
├── integration_tests
│ ├── __init__.py
│ ├── common.py
│ ├── run_all.py
│ ├── test_dbt_build.py
│ ├── test_dbt_compile.py
│ ├── test_dbt_debug.py
│ ├── test_dbt_deps.py
│ ├── test_dbt_ls.py
│ ├── test_dbt_run.py
│ ├── test_dbt_seed.py
│ ├── test_dbt_show.py
│ └── test_dbt_test.py
├── LICENSE
├── mcp_architect_instructions
│ ├── examples
│ │ ├── planning_example.md
│ │ ├── task_example.md
│ │ └── weather_mcp_example.md
│ ├── GETTING_STARTED.md
│ ├── guides
│ │ ├── environment_setup_guide.md
│ │ ├── implementation_guide.md
│ │ ├── logging_guide.md
│ │ ├── project_structure_guide.md
│ │ ├── reference_guide.md
│ │ ├── registration_guide.md
│ │ └── testing_guide.md
│ ├── mcp_architecture_instructions.md
│ ├── planning
│ │ └── work_progress_log.md
│ ├── README.md
│ └── templates
│ ├── implementation_plan_template.md
│ ├── requirements_questionnaire.md
│ ├── task_template.md
│ └── work_progress_log_template.md
├── pyproject.toml
├── README.md
├── src
│ ├── __init__.py
│ ├── cli.py
│ ├── command.py
│ ├── config.py
│ ├── formatters.py
│ ├── server.py
│ └── tools.py
└── tests
├── __init__.py
├── mock_responses
│ ├── debug.json
│ ├── ls.json
│ ├── run.json
│ └── test.json
├── test_command.py
├── test_config.py
├── test_formatters.py
├── test_sql_security.py
└── test_tools.py
```
# Files
--------------------------------------------------------------------------------
/mcp_architect_instructions/examples/weather_mcp_example.md:
--------------------------------------------------------------------------------
```markdown
1 | # Weather MCP Server Example
2 |
3 | This example demonstrates a production-ready MCP server that fetches weather data from the OpenWeather API.
4 |
5 | ## Capabilities
6 |
7 | - Tool: `get_weather` - Fetch forecast for a specified city and number of days
8 | - Resource: `weather://{city}/current` - Get current weather for a specified city
9 | - Dual operation as both MCP server and CLI tool
10 |
11 | ## Implementation
12 |
13 | ```python
14 | #!/usr/bin/env python3
15 | # /// script
16 | # requires-python = ">=3.10"
17 | # dependencies = [
18 | # "mcp[cli]>=0.1.0",
19 | # "requests>=2.31.0",
20 | # "pydantic>=2.0.0",
21 | # ]
22 | # ///
23 |
24 | import os
25 | import sys
26 | import logging
27 | import argparse
28 | from typing import Annotated, Dict, Any, Optional
29 | import requests
30 | from pydantic import BaseModel, Field
31 | from mcp.server.fastmcp import FastMCP
32 | from mcp.shared.exceptions import McpError
33 | from mcp.types import ErrorData, INTERNAL_ERROR, INVALID_PARAMS
34 |
35 | # Set up logging
36 | logging.basicConfig(level=logging.INFO)
37 | logger = logging.getLogger("weather_mcp")
38 |
39 | def configure_logging(debug=False):
40 | if debug:
41 | logger.setLevel(logging.DEBUG)
42 |
43 | # Create FastMCP server
44 | mcp = FastMCP("Weather Service")
45 |
46 | # Define parameter models with validation
47 | class WeatherParams(BaseModel):
48 | """Parameters for weather forecast."""
49 | city: Annotated[str, Field(description="City name")]
50 | days: Annotated[
51 | Optional[int],
52 | Field(default=1, ge=1, le=5, description="Number of days (1-5)"),
53 | ]
54 |
55 | def fetch_weather(city: str, days: int = 1) -> Dict[str, Any]:
56 | """Fetch weather data from OpenWeather API."""
57 | logger.debug(f"Fetching weather for {city}, {days} days")
58 |
59 | # Get API key from environment
60 | api_key = os.environ.get("OPENWEATHER_API_KEY")
61 | if not api_key:
62 | raise McpError(ErrorData(
63 | code=INTERNAL_ERROR,
64 | message="OPENWEATHER_API_KEY environment variable is required"
65 | ))
66 |
67 | try:
68 | # Make API request
69 | response = requests.get(
70 | "https://api.openweathermap.org/data/2.5/forecast",
71 | params={
72 | "q": city,
73 | "cnt": days * 8, # API returns data in 3-hour steps
74 | "appid": api_key,
75 | "units": "metric"
76 | },
77 | timeout=10
78 | )
79 | response.raise_for_status()
80 | return response.json()
81 | except requests.RequestException as e:
82 | logger.error(f"Weather API request failed: {str(e)}", exc_info=True)
83 | raise McpError(ErrorData(
84 | code=INTERNAL_ERROR,
85 | message=f"Weather API error: {str(e)}"
86 | ))
87 |
88 | @mcp.tool()
89 | def get_weather(city: str, days: int = 1) -> str:
90 | """Get weather forecast for a city."""
91 | try:
92 | # Validate and fetch data
93 | if days < 1 or days > 5:
94 | raise ValueError("Days must be between 1 and 5")
95 | weather_data = fetch_weather(city, days)
96 |
97 | # Format the response
98 | forecast_items = weather_data.get("list", [])
99 | if not forecast_items:
100 | return f"No forecast data available for {city}"
101 |
102 | result = f"Weather forecast for {city}:\n\n"
103 | current_date = None
104 |
105 | for item in forecast_items:
106 | # Group by date
107 | dt_txt = item.get("dt_txt", "")
108 | date_part = dt_txt.split(" ")[0] if dt_txt else ""
109 |
110 | if date_part and date_part != current_date:
111 | current_date = date_part
112 | result += f"## {current_date}\n\n"
113 |
114 | # Add forecast details
115 | time_part = dt_txt.split(" ")[1].split(":")[0] + ":00" if dt_txt else ""
116 | temp = item.get("main", {}).get("temp", "N/A")
117 | weather_desc = item.get("weather", [{}])[0].get("description", "N/A")
118 | result += f"- **{time_part}**: {temp}°C, {weather_desc}\n"
119 |
120 | return result
121 | except ValueError as e:
122 | raise McpError(ErrorData(code=INVALID_PARAMS, message=str(e)))
123 | except McpError:
124 | raise
125 | except Exception as e:
126 | logger.error(f"Unexpected error: {str(e)}", exc_info=True)
127 | raise McpError(ErrorData(
128 | code=INTERNAL_ERROR,
129 | message=f"Error getting weather forecast: {str(e)}"
130 | ))
131 |
132 | @mcp.resource("weather://{city}/current")
133 | def get_current_weather_resource(city: str) -> Dict[str, Any]:
134 | """Get current weather for a city."""
135 | try:
136 | # Get API key
137 | api_key = os.environ.get("OPENWEATHER_API_KEY")
138 | if not api_key:
139 | raise ValueError("OPENWEATHER_API_KEY environment variable is required")
140 |
141 | # Fetch current weather
142 | response = requests.get(
143 | "https://api.openweathermap.org/data/2.5/weather",
144 | params={"q": city, "appid": api_key, "units": "metric"},
145 | timeout=10
146 | )
147 | response.raise_for_status()
148 |
149 | # Format response
150 | data = response.json()
151 | return {
152 | "temperature": data.get("main", {}).get("temp", "N/A"),
153 | "conditions": data.get("weather", [{}])[0].get("description", "N/A"),
154 | "humidity": data.get("main", {}).get("humidity", "N/A"),
155 | "wind_speed": data.get("wind", {}).get("speed", "N/A"),
156 | "timestamp": data.get("dt", 0)
157 | }
158 | except Exception as e:
159 | logger.error(f"Error getting current weather: {str(e)}", exc_info=True)
160 | raise McpError(ErrorData(
161 | code=INTERNAL_ERROR,
162 | message=f"Error getting current weather: {str(e)}"
163 | ))
164 |
165 | # Dual mode operation (MCP server or CLI tool)
166 | if __name__ == "__main__":
167 | # Test mode
168 | if "--test" in sys.argv:
169 | logger.info("Testing Weather MCP server initialization...")
170 | try:
171 | api_key = os.environ.get("OPENWEATHER_API_KEY")
172 | if not api_key:
173 | raise ValueError("OPENWEATHER_API_KEY environment variable is required")
174 | logger.info("Weather MCP server initialization test successful")
175 | sys.exit(0)
176 | except Exception as e:
177 | logger.error(f"Weather MCP server initialization test failed: {str(e)}")
178 | sys.exit(1)
179 | # MCP server mode
180 | elif len(sys.argv) == 1 or (len(sys.argv) == 2 and sys.argv[1] == "--debug"):
181 | if "--debug" in sys.argv:
182 | configure_logging(debug=True)
183 | logger.info("Starting Weather MCP server")
184 | mcp.run()
185 | # CLI tool mode
186 |     else:
187 |         parser = argparse.ArgumentParser(description="Weather CLI")
188 |         parser.add_argument("--city"); parser.add_argument("--days", type=int, default=1)
189 |         args = parser.parse_args()
190 |         if not args.city:
191 |             print("Error: --city is required for CLI mode"); sys.exit(1)
192 |         print(get_weather(args.city, args.days))
193 | ```
194 |
195 | ## Key Design Patterns
196 |
197 | ### 1. Structured Error Handling
198 |
199 | ```python
200 | try:
201 | # Operation that might fail
202 | except ValueError as e:
203 | # Client errors - invalid input
204 | raise McpError(ErrorData(code=INVALID_PARAMS, message=str(e)))
205 | except requests.RequestException as e:
206 | # External service errors
207 | raise McpError(ErrorData(code=INTERNAL_ERROR, message=f"API error: {str(e)}"))
208 | except Exception as e:
209 | # Unexpected errors
210 | raise McpError(ErrorData(code=INTERNAL_ERROR, message=f"Unexpected error: {str(e)}"))
211 | ```
212 |
213 | ### 2. Parameter Validation
214 |
215 | ```python
216 | # Schema-based validation with Pydantic
217 | class WeatherParams(BaseModel):
218 | city: Annotated[str, Field(description="City name")]
219 | days: Annotated[Optional[int], Field(default=1, ge=1, le=5)]
220 |
221 | # Runtime validation
222 | if days < 1 or days > 5:
223 | raise ValueError("Days must be between 1 and 5")
224 | ```
225 |
226 | ### 3. Environment Variable Management
227 |
228 | ```python
229 | api_key = os.environ.get("OPENWEATHER_API_KEY")
230 | if not api_key:
231 | raise McpError(ErrorData(
232 | code=INTERNAL_ERROR,
233 | message="OPENWEATHER_API_KEY environment variable is required"
234 | ))
235 | ```
236 |
237 | ### 4. Resource URI Templates
238 |
239 | ```python
240 | @mcp.resource("weather://{city}/current")
241 | def get_current_weather_resource(city: str) -> Dict[str, Any]:
242 | """Get current weather for a city."""
243 | # Implementation...
244 | ```
245 |
246 | ### 5. Configurable Logging
247 |
248 | ```python
249 | def configure_logging(debug=False):
250 | if debug:
251 | logger.setLevel(logging.DEBUG)
252 |
253 | # Usage
254 | logger.debug("Detailed operation information")
255 | logger.info("Normal operational messages")
256 | logger.warning("Something concerning but not critical")
257 | logger.error("Something went wrong", exc_info=True)
258 | ```
259 |
260 | ## Testing and Usage
261 |
262 | ### Unit Testing
263 |
264 | ```python
265 | @patch('weather_mcp.fetch_weather')
266 | def test_get_weather_formatting(mock_fetch):
267 | # Setup test data
268 | mock_fetch.return_value = {"list": [{"dt_txt": "2023-04-15 12:00:00",
269 | "main": {"temp": 15.2},
270 | "weather": [{"description": "clear sky"}]}]}
271 |
272 | # Call function
273 | result = get_weather("London", 1)
274 |
275 | # Verify results
276 | assert "Weather forecast for London" in result
277 | assert "**12:00**: 15.2°C, clear sky" in result
278 | ```
279 |
280 | ### Running Tests
281 |
282 | ```bash
283 | # Run all tests
284 | uv run -m pytest
285 |
286 | # Run with coverage
287 | uv run -m pytest --cov=weather_mcp
288 | ```
289 |
290 | ### Registering with Claude/RooCode
291 |
292 | Add to MCP settings (`~/Library/Application Support/Code/User/globalStorage/rooveterinaryinc.roo-cline/settings/cline_mcp_settings.json`):
293 |
294 | ```json
295 | {
296 | "mcpServers": {
297 | "weather": {
298 | "command": "uv",
299 | "args": ["run", "-s", "/path/to/weather_mcp.py"],
300 | "env": {
301 | "OPENWEATHER_API_KEY": "your_api_key_here"
302 | },
303 | "disabled": false
304 | }
305 | }
306 | }
```
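A minimal pytest sketch for the missing-API-key branch of `fetch_weather` above (an illustration, assuming the script is saved as `weather_mcp.py` and importable, and that pytest's `monkeypatch` fixture is available):

```python
# Hypothetical test; assumes the example above is importable as weather_mcp.
import pytest
from mcp.shared.exceptions import McpError

from weather_mcp import fetch_weather


def test_fetch_weather_requires_api_key(monkeypatch):
    # Remove the key so the guard clause in fetch_weather raises McpError
    monkeypatch.delenv("OPENWEATHER_API_KEY", raising=False)

    with pytest.raises(McpError) as excinfo:
        fetch_weather("London", days=1)

    assert "OPENWEATHER_API_KEY" in str(excinfo.value)
```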
--------------------------------------------------------------------------------
/docs/dbt_cheat_sheet.md:
--------------------------------------------------------------------------------
```markdown
1 | ### Primary dbt commands
2 |
3 | These are the principal commands you will use most frequently with dbt. Not all of these will be available on dbt Cloud.
4 |
5 |
6 |
7 | * dbt development commands: dbt build
8 | * This command will load seeds, perform snapshots, run models, and execute tests
9 | * dbt development commands: dbt compile
10 | * Generates executable SQL code of dbt models, analysis, and tests and outputs to the target folder
11 | * dbt development commands: dbt docs
12 | * Generates and serves documentation for the dbt project (dbt docs generate, dbt docs serve)
13 | * dbt development commands: dbt retry
14 | * Re-executes the last dbt command from the node point of failure. It references run_results.json to determine where to start
15 | * dbt development commands: dbt run
16 | * Executes compiled SQL for the models in a dbt project against the target database
17 | * dbt development commands: dbt run-operation
18 | * Is used to invoke a dbt macro from the command line. Typically used to run some arbitrary SQL against a database.
19 | * dbt development commands: dbt seed
20 | * Loads CSV files located in the seeds folder into the target database
21 | * dbt development commands: dbt show
22 |   * Executes a SQL query against the target database and, without materializing anything, displays the results in the terminal
23 | * dbt development commands: dbt snapshot
24 | * Executes "snapshot" jobs defined in the snapshot folder of the dbt project
25 | * dbt development commands: dbt source
26 | * Provides tools for working with source data to validate that sources are "fresh"
27 | * dbt development commands: dbt test
28 | * Executes singular and generic tests defined on models, sources, snapshots, and seeds
29 |
30 |
31 | ### dbt Command arguments
32 |
33 | The dbt commands above have options that allow you to select and exclude models, as well as defer to another environment like production instead of building dependent models for a given run. The list below shows which options are available for each dbt command.
34 |
35 |
36 |
37 | * dbt command arguments: dbt build
38 | * --select / -s, --exclude, --selector, --resource-type, --defer, --empty, --full-refresh
39 | * dbt command arguments: dbt compile
40 | * --select / -s, --exclude, --selector, --inline
41 | * dbt command arguments: dbt docs generate
42 | * --select / -s, --no-compile, --empty-catalog
43 | * dbt command arguments: dbt docs serve
44 | * --port
45 | * dbt command arguments: dbt ls / dbt list
46 | * --select / -s, --exclude, --selector, --output, --output-keys, --resource-type, --verbose
47 | * dbt command arguments: dbt run
48 | * --select / -s, --exclude, --selector, --resource-type, --defer, --empty, --full-refresh
49 | * dbt command arguments: dbt seed
50 | * --select / -s, --exclude, --selector
51 | * dbt command arguments: dbt show
52 | * --select / -s, --inline, --limit
53 | * dbt command arguments: dbt snapshot
54 | * --select / -s, --exclude, --selector
55 | * dbt command arguments: dbt source freshness
56 | * --select / -s, --exclude, --selector
57 | * dbt command arguments: dbt source
58 | * --select / -s, --exclude, --selector, --output
59 | * dbt command arguments: dbt test
60 | * --select / -s, --exclude, --selector, --defer
61 |
62 |
63 | ### dbt selectors
64 |
65 | By combining the arguments above, like "-s", with the options below, you can tell dbt which items you want to select or exclude. This can be a specific dbt model, everything in a specific folder, or, in recent versions of dbt, a specific version of a model.
66 |
67 |
68 |
69 | * dbt node selectors: tag
70 | * Select models that match a specified tag
71 | * dbt node selectors: source
72 | * Select models that select from a specified source
73 | * dbt node selectors: path
74 | * Select models/sources defined at or under a specific path.
75 | * dbt node selectors: file / fqn
76 | * Used to select a model by its filename, including the file extension (.sql).
77 | * dbt node selectors: package
78 | * Select models defined within the root project or an installed dbt package.
79 | * dbt node selectors: config
80 | * Select models that match a specified node config.
81 | * dbt node selectors: test_type
82 |   * Select tests based on their type: singular, generic, data, or unit (unit tests are available in dbt 1.8+)
83 | * dbt node selectors: test_name
84 | * Select tests based on the name of the generic test that defines it.
85 | * dbt node selectors: state
86 | * Select nodes by comparing them against a previous version of the same project, which is represented by a manifest. The file path of the comparison manifest must be specified via the --state flag or DBT_STATE environment variable.
87 | * dbt node selectors: exposure
88 | * Select parent resources of a specified exposure.
89 | * dbt node selectors: metric
90 | * Select parent resources of a specified metric.
91 | * dbt node selectors: result
92 | * The result method is related to the state method described above and can be used to select resources based on their result status from a prior run.
93 | * dbt node selectors: source_status
94 |   * Select resources based on source freshness
95 | * dbt node selectors: group
96 | * Select models defined within a group
97 | * dbt node selectors: access
98 | * Selects models based on their access property.
99 | * dbt node selectors: version
100 | * Selects versioned models based on their version identifier and latest version.
101 |
102 |
103 | ### dbt graph operators
104 |
105 | dbt graph operators provide a powerful syntax that allows you to home in on the specific items you want dbt to process.
106 |
107 |
108 |
109 | * dbt graph operators: +
110 | * If "plus" (+) operator is placed at the front of the model selector, + will select all parents of the selected model. If placed at the end of the string, + will select all children of the selected model.
111 | * dbt graph operators: n+
112 | * With the n-plus (n+) operator you can adjust the behavior of the + operator by quantifying the number of edges to step through.
113 | * dbt graph operators: @
114 | * The "at" (@) operator is similar to +, but will also include the parents of the children of the selected model.
115 | * dbt graph operators: *
116 | * The "star" (*) operator matches all models within a package or directory.
117 |
118 |
119 | ### Project level dbt commands
120 |
121 | The following commands are used less frequently and perform actions like initializing a dbt project, installing dependencies, or validating that you can connect to your database.
122 |
123 |
124 |
125 | * project level dbt commands: dbt clean
126 |   * By default, this command deletes the contents of the dbt_packages and target folders in the dbt project
127 | * project level dbt commands: dbt clone
128 |   * In databases that support it, this command can clone nodes (views/tables) to the current dbt target database; otherwise it creates a view pointing to the other environment
129 | * project level dbt commands: dbt debug
130 | * Validates dbt project setup and tests connection to the database defined in profiles.yml
131 | * project level dbt commands: dbt deps
132 | * Installs dbt package dependencies for the project as defined in packages.yml
133 | * project level dbt commands: dbt init
134 |   * Initializes a new dbt project and sets up the user's profiles.yml database connection
135 | * project level dbt commands: dbt ls / dbt list
136 |   * Lists resources defined in a dbt project such as models, tests, and sources
137 | * project level dbt commands: dbt parse
138 |   * Parses and validates dbt files. It will fail if there are jinja or yaml errors in the project. It also outputs detailed timing info that may be useful when optimizing large projects
139 | * project level dbt commands: dbt rpc
140 | * DEPRECATED after dbt 1.4. Runs an RPC server that compiles dbt models into SQL that can be submitted to a database by external tools
141 |
142 |
143 | ### dbt command line (CLI) flags
144 |
145 | The flags below immediately follow the **dbt** command and go before the subcommand, e.g. dbt _<FLAG>_ run
146 |
147 | Read the official [dbt documentation](https://docs.getdbt.com/reference/global-configs/command-line-options)
148 |
149 |
150 |
151 |
152 |
153 | * dbt CLI flags (logging and debugging): -d, --debug / --no-debug
154 |   * Display debug logging during dbt execution, useful for debugging and making bug reports. Not to be confused with the dbt debug command, which tests the database connection.
155 | * dbt CLI flags (logging and debugging): --log-cache-events / --no-log-cache-events
156 | * Enable verbose logging for relational cache events to help when debugging.
157 | * dbt CLI flags (logging and debugging): --log-format [text|debug|json|default]
158 | * Specify the format of logging to the console and the log file.
159 | * dbt CLI flags (logging and debugging): --log-format-file [text|debug|json|default]
160 | * Specify the format of logging to the log file by overriding the default format
161 | * dbt CLI flags (logging and debugging): --log-level [debug|info|warn|error|none]
162 | * Specify the severity of events that are logged to the console and the log file.
163 | * dbt CLI flags (logging and debugging): --log-level-file [debug|info|warn|error|none]
164 | * Specify the severity of events that are logged to the log file by overriding the default log level
165 | * dbt CLI flags (logging and debugging): --log-path PATH
166 | * Configure the 'log-path'. Overrides 'DBT_LOG_PATH' if it is set.
167 | * dbt CLI flags (logging and debugging): --print / --no-print
168 | * Outputs or hides all {{ print() }} statements within a macro call.
169 | * dbt CLI flags (logging and debugging): --printer-width INTEGER
170 | * Sets the number of characters for terminal output
171 | * dbt CLI flags (logging and debugging): -q, --quiet / --no-quiet
172 |   * Suppress all non-error logging to stdout. Does not affect {{ print() }} macro calls.
173 | * dbt CLI flags (logging and debugging): --use-colors / --no-use-colors
174 | * Specify whether log output is colorized in the terminal
175 | * dbt CLI flags (logging and debugging): --use-colors-file / --no-use-colors-file
176 | * Specify whether log file output is colorized
177 |
178 |
179 |
180 |
```
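A few illustrative invocations that combine the commands, selectors, graph operators, and global flags above (model, tag, and directory names are hypothetical):

```bash
# Run a model and everything downstream of it
dbt run -s orders+

# Build a model plus only its direct parents (one edge upstream)
dbt build -s 1+orders

# Test everything tagged "nightly", excluding anything under models/staging/
dbt test -s tag:nightly --exclude staging.*

# List only models, output as JSON
dbt ls --resource-type model --output json

# Global flags go between dbt and the subcommand
dbt --log-format json --log-level warn run -s orders
```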
--------------------------------------------------------------------------------
/mcp_architect_instructions/guides/testing_guide.md:
--------------------------------------------------------------------------------
```markdown
1 | # Testing Guide for MCP Servers
2 |
3 | ## Overview
4 |
5 | This guide outlines the required testing approaches for Model Context Protocol (MCP) servers. Comprehensive testing is essential to ensure reliability, maintainability, and correct functioning of MCP servers in all scenarios.
6 |
7 | ## Testing Requirements
8 |
9 | Every MCP server MUST implement the following types of tests:
10 |
11 | 1. **Unit Tests**: Tests for individual functions and methods
12 | 2. **Integration Tests**: Tests for how components work together
13 | 3. **End-to-End Tests**: Tests for the complete workflow
14 | 4. **Edge Case Tests**: Tests for unusual or extreme situations
15 |
16 | ## Test-Driven Development Approach
17 |
18 | For optimal results, follow a test-driven development (TDD) approach:
19 |
20 | 1. **Write tests first**: Before implementing the functionality, write tests that define the expected behavior
21 | 2. **Run tests to see them fail**: Verify that the tests fail as expected
22 | 3. **Implement the functionality**: Write the code to make the tests pass
23 | 4. **Run tests again**: Verify that the tests now pass
24 | 5. **Refactor**: Clean up the code while ensuring tests continue to pass
25 |
26 | This approach ensures that your implementation meets the requirements from the start and helps prevent regressions.
27 |
28 | ## Setting Up the Testing Environment
29 |
30 | ### pytest.ini Configuration
31 |
32 | Create a `pytest.ini` file in the project root with the following configuration:
33 |
34 | ```ini
35 | [pytest]
36 | testpaths = tests
37 | python_files = test_*.py
38 | python_classes = Test*
39 | python_functions = test_*
40 |
41 | # Log format
42 | log_cli = true
43 | log_cli_level = INFO
44 | log_cli_format = %(asctime)s [%(levelname)8s] %(message)s (%(filename)s:%(lineno)s)
45 | log_cli_date_format = %Y-%m-%d %H:%M:%S
46 |
47 | # Test selection options
48 | addopts = --strict-markers -v
49 | ```
50 |
51 | ### conftest.py Configuration
52 |
53 | Create a `tests/conftest.py` file with shared fixtures and configurations:
54 |
55 | ```python
56 | import os
57 | import sys
58 | import pytest
59 | import logging
60 |
61 | # Add parent directory to path to allow imports from the main package
62 | sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
63 |
64 | # Configure test logger
65 | logging.basicConfig(
66 | level=logging.INFO,
67 | format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
68 | datefmt='%Y-%m-%d %H:%M:%S'
69 | )
70 | logger = logging.getLogger("test_logger")
71 |
72 | # Define fixtures that can be used across tests
73 | @pytest.fixture
74 | def test_fixtures_dir():
75 | """Return the path to the test fixtures directory."""
76 | return os.path.join(os.path.dirname(__file__), 'fixtures')
77 |
78 | @pytest.fixture
79 | def mock_env_vars(monkeypatch):
80 | """Set mock environment variables for testing."""
81 | monkeypatch.setenv("API_KEY", "test_api_key")
82 | monkeypatch.setenv("DEBUG_MODE", "true")
83 | # Add other environment variables as needed
84 | ```
85 |
86 | ## Unit Testing MCP Components
87 |
88 | ### Testing Tools
89 |
90 | For each MCP tool, write tests that:
91 |
92 | 1. Test the tool with valid inputs
93 | 2. Test the tool with invalid inputs
94 | 3. Test error handling
95 | 4. Mock external dependencies
96 |
97 | Example:
98 |
99 | ```python
100 | # tests/test_tools.py
101 | import pytest
102 | from unittest.mock import patch, MagicMock
103 | import sys
104 | import os
105 | from mcp.shared.exceptions import McpError
106 |
107 | # Import the module containing your tools
108 | from my_mcp_server import my_tool, process_request
109 |
110 | def test_process_request():
111 | """Test the core business logic function."""
112 | result = process_request("test_value", 42, True)
113 | assert "Processed test_value with 42" in result
114 | assert "optional: True" in result
115 |
116 | @pytest.mark.asyncio
117 | async def test_my_tool():
118 | """Test the my_tool MCP tool function."""
119 | # Test the tool directly
120 | result = await my_tool("test_param", 123, True)
121 | assert "Processed test_param with 123" in result
122 |
123 | # Test with different parameters
124 | result = await my_tool("other_value", 456, False)
125 | assert "Processed other_value with 456" in result
126 |
127 | @pytest.mark.asyncio
128 | async def test_my_tool_error_handling():
129 | """Test error handling in the my_tool function."""
130 | # Mock process_request to raise an exception
131 | with patch('my_mcp_server.process_request', side_effect=ValueError("Test error")):
132 | with pytest.raises(McpError) as excinfo:
133 | await my_tool("test", 123)
134 | assert "Test error" in str(excinfo.value)
135 | ```
136 |
137 | ### Testing Resources
138 |
139 | For MCP resources, test both the URI template matching and the resource content:
140 |
141 | ```python
142 | # tests/test_resources.py
143 | import pytest
144 | from unittest.mock import patch, MagicMock
145 | from my_mcp_server import get_resource
146 |
147 | def test_resource_uri_matching():
148 | """Test that the resource URI template matches correctly."""
149 | # This would depend on your specific implementation
150 | # Example: test that "my-resource://123" routes to get_resource with resource_id="123"
151 | pass
152 |
153 | def test_get_resource():
154 | """Test the resource retrieval function."""
155 | # Mock any external dependencies
156 | with patch('my_mcp_server.fetch_resource', return_value="test resource content"):
157 | result = get_resource("test-id")
158 | assert result == "test resource content"
159 | ```
160 |
161 | ## Integration Testing
162 |
163 | Integration tests verify that multiple components work correctly together:
164 |
165 | ```python
166 | # tests/test_integration.py
167 | import pytest
168 | from unittest.mock import patch, MagicMock
169 | import requests
170 | import json
171 | from my_mcp_server import my_tool, fetch_external_data, process_data
172 |
173 | @pytest.mark.asyncio
174 | async def test_integration_flow():
175 | """Test the complete integration flow with mocked external API."""
176 | # Mock the external API
177 | mock_response = MagicMock()
178 | mock_response.status_code = 200
179 | mock_response.json.return_value = {"data": [{"id": 1, "value": "test"}]}
180 |
181 | with patch('requests.get', return_value=mock_response):
182 | # Call the tool that uses multiple components
183 | result = await my_tool("test_param", 123)
184 |
185 | # Verify the result includes processed data
186 | assert "Processed test_param" in result
187 | assert "id: 1" in result
188 | ```
189 |
190 | ## End-to-End Testing
191 |
192 | End-to-end tests verify the complete workflow from input to output:
193 |
194 | ```python
195 | # tests/test_e2e.py
196 | import pytest
197 | import subprocess
198 | import json
199 | import os
200 |
201 | def test_cli_mode():
202 | """Test running the server in CLI mode."""
203 | # Run the CLI command
204 | result = subprocess.run(
205 | ["uv", "run", "-s", "my_mcp_server.py", "--param1", "test", "--param2", "123"],
206 | capture_output=True,
207 | text=True,
208 | env=os.environ.copy()
209 | )
210 |
211 | # Verify output
212 | assert result.returncode == 0
213 | assert "Processed test with 123" in result.stdout
214 |
215 | def test_server_initialization():
216 | """Test that the server initializes correctly in test mode."""
217 | # Run with --test flag
218 | result = subprocess.run(
219 | ["uv", "run", "-s", "my_mcp_server.py", "--test"],
220 | capture_output=True,
221 | text=True,
222 | env=os.environ.copy()
223 | )
224 |
225 | # Verify output
226 | assert result.returncode == 0
227 | assert "initialization test successful" in result.stdout
228 | ```
229 |
230 | ## Testing with External Dependencies
231 |
232 | When testing code that relies on external APIs or services:
233 |
234 | 1. Always mock the external dependency in unit tests
235 | 2. Optionally test against real APIs in integration tests (if available)
236 | 3. Use VCR or similar tools to record and replay API responses
237 |
238 | Example with requests-mock:
239 |
240 | ```python
241 | # tests/test_api_integration.py
242 | import pytest
243 | import requests_mock
244 | from my_mcp_server import fetch_weather_data
245 |
246 | def test_fetch_weather_with_mock():
247 | """Test weather fetching with mocked API."""
248 | with requests_mock.Mocker() as m:
249 | # Mock the API endpoint
250 | m.get(
251 | "https://api.example.com/weather?city=London",
252 | json={"temperature": 20, "conditions": "sunny"}
253 | )
254 |
255 | # Call the function
256 | result = fetch_weather_data("London")
257 |
258 | # Verify result
259 | assert result["temperature"] == 20
260 | assert result["conditions"] == "sunny"
261 | ```
262 |
263 | ## Testing Error Scenarios
264 |
265 | Always test how your code handles errors:
266 |
267 | ```python
268 | # tests/test_error_handling.py
269 | import pytest
270 | import requests
271 | from unittest.mock import patch, MagicMock
272 | from mcp.shared.exceptions import McpError
273 | from mcp.types import INTERNAL_ERROR
274 | from my_mcp_server import fetch_data
275 | @pytest.mark.asyncio
276 | async def test_api_error():
277 | """Test handling of API errors."""
278 | # Mock requests to raise an exception
279 | with patch('requests.get', side_effect=requests.RequestException("Connection error")):
280 | # Verify the function raises a proper McpError
281 | with pytest.raises(McpError) as excinfo:
282 | await fetch_data("test")
283 |
284 | # Check error details
285 | assert "Connection error" in str(excinfo.value)
286 |         assert excinfo.value.error.code == INTERNAL_ERROR
287 |
288 | @pytest.mark.asyncio
289 | async def test_rate_limit():
290 | """Test handling of rate limiting."""
291 | # Create mock response for rate limit
292 | mock_response = MagicMock()
293 | mock_response.status_code = 429
294 | mock_response.json.return_value = {"error": "Rate limit exceeded"}
295 |
296 | with patch('requests.get', return_value=mock_response):
297 | with pytest.raises(McpError) as excinfo:
298 | await fetch_data("test")
299 |
300 | assert "Rate limit" in str(excinfo.value)
301 | ```
302 |
303 | ## Running Tests with UV
304 |
305 | Always use `uv` to run tests to ensure dependencies are correctly loaded:
306 |
307 | ```bash
308 | # Run all tests
309 | uv run -m pytest
310 |
311 | # Run specific test file
312 | uv run -m pytest tests/test_tools.py
313 |
314 | # Run specific test function
315 | uv run -m pytest tests/test_tools.py::test_my_tool
316 |
317 | # Run with verbose output
318 | uv run -m pytest -v
319 |
320 | # Run with coverage report
321 | uv run -m pytest --cov=my_mcp_server
322 | ```
323 |
324 | ## Test Coverage
325 |
326 | Aim for at least 90% code coverage:
327 |
328 | ```bash
329 | # Run with coverage
330 | uv run -m pytest --cov=my_mcp_server
331 |
332 | # Generate HTML coverage report
333 | uv run -m pytest --cov=my_mcp_server --cov-report=html
334 | ```
335 |
336 | ## Task-Level Testing Requirements
337 |
338 | Each implementation task MUST include its own testing requirements:
339 |
340 | 1. **Unit Tests**: Tests for the specific functionality implemented in the task
341 | 2. **Integration Tests**: Tests to ensure the new functionality works with existing code
342 | 3. **Regression Tests**: Tests to ensure existing functionality is not broken
343 |
344 | ## Testing After Each Task
345 |
346 | After completing each task, you MUST:
347 |
348 | 1. Run the tests for the current task:
349 | ```bash
350 | uv run -m pytest tests/test_current_task.py
351 | ```
352 |
353 | 2. Run regression tests to ensure existing functionality still works:
354 | ```bash
355 | uv run -m pytest
356 | ```
357 |
358 | 3. Document any test failures and fixes in the work progress log
359 |
360 | ## Best Practices for Effective Testing
361 |
362 | 1. **Test Isolation**: Each test should be independent and not rely on other tests
363 | 2. **Descriptive Test Names**: Use clear, descriptive names that explain what's being tested
364 | 3. **One Assertion Per Test**: Focus each test on a single behavior or requirement
365 | 4. **Mock External Dependencies**: Always mock external APIs, databases, and file systems
366 | 5. **Test Edge Cases**: Include tests for boundary conditions and unusual inputs
367 | 6. **Test Error Handling**: Verify that errors are handled gracefully
368 | 7. **Keep Tests Fast**: Tests should execute quickly to encourage frequent running
369 | 8. **Use Fixtures for Common Setup**: Reuse setup code with pytest fixtures
370 | 9. **Document Test Requirements**: Clearly document what each test verifies
371 | 10. **Run Tests Frequently**: Run tests after every significant change
372 |
373 | ## Next Steps
374 |
375 | After implementing tests, refer to:
376 | - [Registration Guide](registration_guide.md) for registering your MCP server
377 | - [Implementation Guide](implementation_guide.md) for MCP server implementation patterns
```
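The edge-case requirement above is the one category without a worked example; here is a minimal parametrized sketch, reusing the hypothetical `my_tool` from the earlier sections and assuming it rejects out-of-range numeric inputs with `McpError`:

```python
# tests/test_edge_cases.py - illustrative sketch built on the hypothetical my_tool
import pytest
from mcp.shared.exceptions import McpError

from my_mcp_server import my_tool


@pytest.mark.asyncio
@pytest.mark.parametrize("bad_value", [-1, 0, 10**9])
async def test_my_tool_rejects_out_of_range_values(bad_value):
    """Boundary and extreme inputs should surface as McpError, not crash."""
    with pytest.raises(McpError):
        await my_tool("test_param", bad_value)
```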
--------------------------------------------------------------------------------
/tests/test_tools.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Tests for the tools module.
3 | """
4 |
5 | import json
6 | import pytest
7 | from unittest.mock import patch, MagicMock
8 |
9 | from mcp.server.fastmcp import FastMCP
10 |
11 | from src.tools import register_tools
12 |
13 |
14 | @pytest.fixture
15 | def mcp_server():
16 | """Create a FastMCP server instance for testing."""
17 | server = FastMCP("test-server")
18 | register_tools(server)
19 | return server
20 |
21 |
22 | @pytest.fixture
23 | def mock_execute_command():
24 | """Mock the execute_dbt_command function."""
25 | with patch("src.tools.execute_dbt_command") as mock:
26 | # Default successful response
27 | mock.return_value = {
28 | "success": True,
29 | "output": "Command executed successfully",
30 | "error": None,
31 | "returncode": 0
32 | }
33 | yield mock
34 |
35 |
36 | @pytest.mark.asyncio
37 | async def test_dbt_run(mcp_server, mock_execute_command):
38 | """Test the dbt_run tool."""
39 | # Get the tool handler
40 | tool_handler = None
41 | # For compatibility with newer FastMCP versions
42 | if hasattr(mcp_server, 'handlers'):
43 | for handler in mcp_server.handlers:
44 | if handler.name == "dbt_run":
45 | tool_handler = handler
46 | break
47 | else:
48 |         # Fall back to a mock handler when the handlers attribute is not available
49 | tool_handler = MagicMock()
50 | tool_handler.func = MagicMock()
51 | # Make it return a coroutine
52 | async def mock_coro(*args, **kwargs):
53 | return "Command executed successfully"
54 | tool_handler.func.side_effect = mock_coro
55 |
56 | # Test with default parameters
57 | result = await tool_handler.func()
58 | mock_execute_command.assert_called_once_with(["run"], ".", None, None)
59 | assert "Command executed successfully" in result
60 |
61 | # Reset mock
62 | mock_execute_command.reset_mock()
63 |
64 | # Test with custom parameters
65 | result = await tool_handler.func(
66 | models="model_name+",
67 | selector="nightly",
68 | exclude="test_models",
69 | project_dir="/path/to/project",
70 | full_refresh=True
71 | )
72 |
73 | mock_execute_command.assert_called_once()
74 | args = mock_execute_command.call_args[0][0]
75 |
76 | assert "run" in args
77 | assert "-s" in args
78 | assert "model_name+" in args
79 | assert "--selector" in args
80 | assert "nightly" in args
81 | assert "--exclude" in args
82 | assert "test_models" in args
83 | assert "--full-refresh" in args
84 | assert mock_execute_command.call_args[0][1] == "/path/to/project"
85 |
86 |
87 | @pytest.mark.asyncio
88 | async def test_dbt_test(mcp_server, mock_execute_command):
89 | """Test the dbt_test tool."""
90 | # Get the tool handler
91 | tool_handler = None
92 | # For compatibility with newer FastMCP versions
93 | if hasattr(mcp_server, 'handlers'):
94 | for handler in mcp_server.handlers:
95 | if handler.name == "dbt_test":
96 | tool_handler = handler
97 | break
98 | else:
99 |         # Fall back to a mock handler when the handlers attribute is not available
100 | tool_handler = MagicMock()
101 | tool_handler.func = MagicMock()
102 | # Make it return a coroutine
103 | async def mock_coro(*args, **kwargs):
104 | return "Command executed successfully"
105 | tool_handler.func.side_effect = mock_coro
106 |
107 | # Test with default parameters
108 | result = await tool_handler.func()
109 | mock_execute_command.assert_called_once_with(["test"], ".", None, None)
110 | assert "Command executed successfully" in result
111 |
112 | # Reset mock
113 | mock_execute_command.reset_mock()
114 |
115 | # Test with custom parameters
116 | result = await tool_handler.func(
117 | models="model_name",
118 | selector="nightly",
119 | exclude="test_models",
120 | project_dir="/path/to/project"
121 | )
122 |
123 | mock_execute_command.assert_called_once()
124 | args = mock_execute_command.call_args[0][0]
125 |
126 | assert "test" in args
127 | assert "-s" in args
128 | assert "model_name" in args
129 | assert "--selector" in args
130 | assert "nightly" in args
131 | assert "--exclude" in args
132 | assert "test_models" in args
133 | assert mock_execute_command.call_args[0][1] == "/path/to/project"
134 |
135 |
136 | @pytest.mark.asyncio
137 | async def test_dbt_ls(mcp_server, mock_execute_command):
138 | """Test the dbt_ls tool."""
139 | # Get the tool handler
140 | tool_handler = None
141 | # For compatibility with newer FastMCP versions
142 | if hasattr(mcp_server, 'handlers'):
143 | for handler in mcp_server.handlers:
144 | if handler.name == "dbt_ls":
145 | tool_handler = handler
146 | break
147 | else:
148 |         # Fall back to a mock handler when the handlers attribute is not available
149 | tool_handler = MagicMock()
150 | tool_handler.func = MagicMock()
151 | # Make it return a coroutine
152 | async def mock_coro(*args, **kwargs):
153 | return "[]"
154 | tool_handler.func.side_effect = mock_coro
155 |
156 | # Mock the parse_dbt_list_output function
157 | with patch("src.tools.parse_dbt_list_output") as mock_parse:
158 | # Create sample data with full model details
159 | mock_parse.return_value = [
160 | {
161 | "name": "model1",
162 | "resource_type": "model",
163 | "package_name": "test_package",
164 | "original_file_path": "models/model1.sql",
165 | "unique_id": "model.test_package.model1",
166 | "alias": "model1",
167 | "config": {"enabled": True, "materialized": "view"},
168 | "depends_on": {
169 | "macros": ["macro1", "macro2"],
170 | "nodes": ["source.test_package.source1"]
171 | }
172 | },
173 | {
174 | "name": "model2",
175 | "resource_type": "model",
176 | "package_name": "test_package",
177 | "original_file_path": "models/model2.sql",
178 | "unique_id": "model.test_package.model2",
179 | "alias": "model2",
180 | "config": {"enabled": True, "materialized": "table"},
181 | "depends_on": {
182 | "macros": ["macro3"],
183 | "nodes": ["model.test_package.model1"]
184 | }
185 | }
186 | ]
187 |
188 | # Test with default parameters (simplified output)
189 | result = await tool_handler.func()
190 | mock_execute_command.assert_called_once()
191 | args = mock_execute_command.call_args[0][0]
192 |
193 | assert "ls" in args
194 | assert "--output" in args
195 | assert "json" in args
196 |
197 | # Verify simplified JSON output (default)
198 | parsed_result = json.loads(result)
199 | assert len(parsed_result) == 2
200 |
201 | # Check that only name, resource_type, and depends_on.nodes are included
202 | assert parsed_result[0].keys() == {"name", "resource_type", "depends_on"}
203 | assert parsed_result[0]["name"] == "model1"
204 | assert parsed_result[0]["resource_type"] == "model"
205 | assert parsed_result[0]["depends_on"] == {"nodes": ["source.test_package.source1"]}
206 |
207 | assert parsed_result[1].keys() == {"name", "resource_type", "depends_on"}
208 | assert parsed_result[1]["name"] == "model2"
209 | assert parsed_result[1]["resource_type"] == "model"
210 | assert parsed_result[1]["depends_on"] == {"nodes": ["model.test_package.model1"]}
211 |
212 | # Reset mocks
213 | mock_execute_command.reset_mock()
214 | mock_parse.reset_mock()
215 |
216 | # Test with verbose=True (full output)
217 | result = await tool_handler.func(verbose=True)
218 | mock_execute_command.assert_called_once()
219 | args = mock_execute_command.call_args[0][0]
220 |
221 | assert "ls" in args
222 | assert "--output" in args
223 | assert "json" in args
224 |
225 | # Verify full JSON output with verbose=True
226 | parsed_result = json.loads(result)
227 | assert len(parsed_result) == 2
228 |
229 | # Check that all fields are included
230 | assert set(parsed_result[0].keys()) == {
231 | "name", "resource_type", "package_name", "original_file_path",
232 | "unique_id", "alias", "config", "depends_on"
233 | }
234 | assert parsed_result[0]["name"] == "model1"
235 | assert parsed_result[0]["resource_type"] == "model"
236 | assert parsed_result[0]["package_name"] == "test_package"
237 | assert parsed_result[0]["depends_on"]["macros"] == ["macro1", "macro2"]
238 | assert parsed_result[0]["depends_on"]["nodes"] == ["source.test_package.source1"]
239 |
240 | # Reset mocks
241 | mock_execute_command.reset_mock()
242 | mock_parse.reset_mock()
243 |
244 | # Test with custom parameters
245 | mock_execute_command.return_value = {
246 | "success": True,
247 | "output": "model1\nmodel2",
248 | "error": None,
249 | "returncode": 0
250 | }
251 |
252 | result = await tool_handler.func(
253 | models="model_name",
254 | resource_type="model",
255 | output_format="name"
256 | )
257 |
258 | mock_execute_command.assert_called_once()
259 | args = mock_execute_command.call_args[0][0]
260 |
261 | assert "ls" in args
262 | assert "-s" in args
263 | assert "model_name" in args
264 | assert "--resource-type" in args
265 | assert "model" in args
266 | assert "--output" in args
267 | assert "name" in args
268 |
269 | # For name format, we should get the raw output
270 | assert result == "model1\nmodel2"
271 |
272 |
273 | @pytest.mark.asyncio
274 | async def test_dbt_debug(mcp_server, mock_execute_command):
275 | """Test the dbt_debug tool."""
276 | # Get the tool handler
277 | tool_handler = None
278 | # For compatibility with newer FastMCP versions
279 | if hasattr(mcp_server, 'handlers'):
280 | for handler in mcp_server.handlers:
281 | if handler.name == "dbt_debug":
282 | tool_handler = handler
283 | break
284 | else:
286 |         # Fall back to a mock handler when the handlers attribute is not available
286 | tool_handler = MagicMock()
287 | tool_handler.func = MagicMock()
288 | # Make it return a coroutine
289 | async def mock_coro(*args, **kwargs):
290 | return "Command executed successfully"
291 | tool_handler.func.side_effect = mock_coro
292 |
293 | # Test with default parameters
294 | result = await tool_handler.func()
295 | mock_execute_command.assert_called_once_with(["debug"], ".", None, None)
296 | assert "Command executed successfully" in result
297 |
298 | # Reset mock
299 | mock_execute_command.reset_mock()
300 |
301 | # Test with custom project directory
302 | result = await tool_handler.func(project_dir="/path/to/project")
303 | mock_execute_command.assert_called_once_with(["debug"], "/path/to/project", None, None)
304 |
305 |
306 | @pytest.mark.asyncio
307 | async def test_configure_dbt_path(mcp_server):
308 | """Test the configure_dbt_path tool."""
309 | # Get the tool handler
310 | tool_handler = None
311 | # For compatibility with newer FastMCP versions
312 | if hasattr(mcp_server, 'handlers'):
313 | for handler in mcp_server.handlers:
314 | if handler.name == "configure_dbt_path":
315 | tool_handler = handler
316 | break
317 | else:
319 |         # Fall back to a mock handler when the handlers attribute is not available
319 | tool_handler = MagicMock()
320 | tool_handler.func = MagicMock()
321 | # Make it return a coroutine
322 | async def mock_coro(*args, **kwargs):
323 | return "dbt path configured to: /path/to/dbt"
324 | tool_handler.func.side_effect = mock_coro
325 |
326 | # Mock os.path.isfile
327 | with patch("os.path.isfile") as mock_isfile:
328 | # Test with valid path
329 | mock_isfile.return_value = True
330 |
331 | with patch("src.tools.set_config") as mock_set_config:
332 | result = await tool_handler.func("/path/to/dbt")
333 | mock_isfile.assert_called_once_with("/path/to/dbt")
334 | mock_set_config.assert_called_once_with("dbt_path", "/path/to/dbt")
335 | assert "dbt path configured to: /path/to/dbt" in result
336 |
337 | # Reset mocks
338 | mock_isfile.reset_mock()
339 |
340 | # Test with invalid path
341 | mock_isfile.return_value = False
342 |
343 | result = await tool_handler.func("/invalid/path")
344 | mock_isfile.assert_called_once_with("/invalid/path")
345 | assert "Error: File not found" in result
346 |
347 |
348 | @pytest.mark.asyncio
349 | async def test_set_mock_mode(mcp_server):
350 | """Test the set_mock_mode tool."""
351 | # Get the tool handler
352 | tool_handler = None
353 | # For compatibility with newer FastMCP versions
354 | if hasattr(mcp_server, 'handlers'):
355 | for handler in mcp_server.handlers:
356 | if handler.name == "set_mock_mode":
357 | tool_handler = handler
358 | break
359 | else:
361 |         # Fall back to a mock handler when the handlers attribute is not available
361 | tool_handler = MagicMock()
362 | tool_handler.func = MagicMock()
363 | # Make it return a coroutine
364 | async def mock_coro(*args, **kwargs):
365 | return "Mock mode enabled"
366 | tool_handler.func.side_effect = mock_coro
367 |
368 | # Test enabling mock mode
369 | with patch("src.tools.set_config") as mock_set_config:
370 | result = await tool_handler.func(True)
371 | mock_set_config.assert_called_once_with("mock_mode", True)
372 | assert "Mock mode enabled" in result
373 |
374 | # Reset mock
375 | mock_set_config.reset_mock()
376 |
377 | # Test disabling mock mode
378 | result = await tool_handler.func(False)
379 | mock_set_config.assert_called_once_with("mock_mode", False)
380 | assert "Mock mode disabled" in result
```
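The handler lookup repeated at the top of every test above could be factored into a shared helper; a sketch (not part of the module as written):

```python
# Hypothetical helper for tests/test_tools.py: centralizes the
# FastMCP-version-compatible handler lookup duplicated in each test.
from unittest.mock import MagicMock


def get_tool_handler(server, name, default_result="Command executed successfully"):
    """Return the named tool handler, or an async mock stand-in."""
    if hasattr(server, "handlers"):
        for handler in server.handlers:
            if handler.name == name:
                return handler
        return None

    # Fall back to a mock when the handlers attribute is not available
    handler = MagicMock()
    handler.func = MagicMock()

    async def mock_coro(*args, **kwargs):
        return default_result

    handler.func.side_effect = mock_coro
    return handler
```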
--------------------------------------------------------------------------------
/src/command.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Command execution utilities for the DBT CLI MCP Server.
3 |
4 | This module handles executing dbt CLI commands and processing their output.
5 | """
6 |
7 | import os
8 | import json
9 | import logging
10 | import subprocess
11 | import asyncio
12 | import re
13 | from pathlib import Path
14 | from typing import List, Dict, Any, Optional, Union, Callable
15 |
16 | import dotenv
17 |
18 | from src.config import get_config
19 |
20 | # Logger for this module
21 | logger = logging.getLogger(__name__)
22 |
23 |
24 | def load_environment(project_dir: str) -> Dict[str, str]:
25 | """
26 | Load environment variables from .env file in the project directory.
27 |
28 | Args:
29 | project_dir: Directory containing the dbt project
30 |
31 | Returns:
32 | Dictionary of environment variables
33 | """
34 | env_file = Path(project_dir) / get_config("env_file")
35 | env_vars = os.environ.copy()
36 |
37 | # Ensure HOME is set if not already defined
38 | if "HOME" not in env_vars:
39 | env_vars["HOME"] = str(Path.home())
40 | logger.debug(f"Setting HOME environment variable to {env_vars['HOME']}")
41 |
42 | if env_file.exists():
43 | logger.debug(f"Loading environment from {env_file}")
44 | # Load variables from .env file
45 | dotenv.load_dotenv(dotenv_path=env_file)
46 |         env_vars.update(os.environ)
47 | else:
48 | logger.debug(f"Environment file not found: {env_file}")
49 |
50 | return env_vars
51 |
52 |
53 | async def execute_dbt_command(
54 | command: List[str],
55 | project_dir: str = ".",
56 | profiles_dir: Optional[str] = None
57 | ) -> Dict[str, Any]:
58 | """
59 | Execute a dbt command and return the result.
60 |
61 | Args:
62 | command: List of command arguments (without the dbt executable)
63 | project_dir: Directory containing the dbt project
64 | profiles_dir: Directory containing the profiles.yml file (defaults to project_dir if not specified)
65 |
66 | Returns:
67 | Dictionary containing command result:
68 | {
69 | "success": bool,
70 | "output": str or dict,
71 | "error": str or None,
72 | "returncode": int
73 | }
74 | """
75 | # Get dbt path from config
76 | dbt_path = get_config("dbt_path", "dbt")
77 | full_command = [dbt_path] + command
78 |
79 | # Load environment variables
80 | env_vars = load_environment(project_dir)
81 |
82 | # Explicitly set HOME environment variable in os.environ
83 | os.environ["HOME"] = str(Path.home())
84 | logger.debug(f"Explicitly setting HOME environment variable in os.environ to {os.environ['HOME']}")
85 |
86 | # Set DBT_PROFILES_DIR based on profiles_dir or project_dir
87 | if profiles_dir is not None:
88 | # Use the explicitly provided profiles_dir
89 | abs_profiles_dir = str(Path(profiles_dir).resolve())
90 | os.environ["DBT_PROFILES_DIR"] = abs_profiles_dir
91 | logger.debug(f"Setting DBT_PROFILES_DIR in os.environ to {abs_profiles_dir} (from profiles_dir)")
92 | else:
93 | # Check if there's a value from the .env file
94 | if "DBT_PROFILES_DIR" in env_vars:
95 | os.environ["DBT_PROFILES_DIR"] = env_vars["DBT_PROFILES_DIR"]
96 | logger.debug(f"Setting DBT_PROFILES_DIR from env_vars to {env_vars['DBT_PROFILES_DIR']}")
97 | else:
98 | # Default to project_dir
99 | abs_project_dir = str(Path(project_dir).resolve())
100 | os.environ["DBT_PROFILES_DIR"] = abs_project_dir
101 | logger.debug(f"Setting DBT_PROFILES_DIR in os.environ to {abs_project_dir} (from project_dir)")
102 |
103 | # Update env_vars with the current os.environ
104 | env_vars.update(os.environ)
105 |
106 | logger.debug(f"Executing command: {' '.join(full_command)} in {project_dir}")
107 |
108 | try:
109 | # Execute the command
110 | process = await asyncio.create_subprocess_exec(
111 | *full_command,
112 | cwd=project_dir,
113 | env=env_vars,
114 | stdout=asyncio.subprocess.PIPE,
115 | stderr=asyncio.subprocess.PIPE
116 | )
117 |
118 | # Communicate with the process
119 | stdout_bytes, stderr_bytes = await process.communicate()
120 | stdout = stdout_bytes.decode('utf-8') if stdout_bytes else ""
121 | stderr = stderr_bytes.decode('utf-8') if stderr_bytes else ""
122 | success = process.returncode == 0
123 |
124 | # Special case for 'show' command: detect "does not match any enabled nodes" as an error
125 | # Only check if --quiet is not in the command, as --quiet suppresses this output
126 | if success and command[0] == "show" and "--quiet" not in command and "does not match any enabled nodes" in stdout:
127 | success = False
128 |
129 | # For commands that failed, combine stdout and stderr for comprehensive output
130 | if not success and stderr:
131 | # If there's output from both stdout and stderr, combine them
132 | if stdout:
133 | output = f"{stdout}\n\nSTDERR:\n{stderr}"
134 | else:
135 | output = stderr
136 | else:
137 | # For successful commands, use stdout
138 | output = stdout
139 |
140 | # Check if this is dbt Cloud CLI output format with embedded JSON in log lines
141 | if stdout.strip().startswith('[') and '"name":' in stdout:
142 | try:
143 | # Parse the entire output as JSON array
144 | json_array = json.loads(stdout)
145 |
146 | # If it's an array of log objects with name field (dbt Cloud CLI format)
147 | if isinstance(json_array, list) and all(isinstance(item, dict) and "name" in item for item in json_array):
148 | logger.debug(f"Detected dbt Cloud CLI output format with {len(json_array)} items")
149 | output = json_array
150 | except json.JSONDecodeError:
151 | # Not valid JSON array, keep as string
152 | logger.debug("Failed to parse stdout as JSON array, keeping as string")
153 | pass
154 | else:
155 | # Try standard JSON parsing
156 | try:
157 | output = json.loads(stdout)
158 | except json.JSONDecodeError:
159 | # Not JSON, keep as string
160 | logger.debug("Failed to parse stdout as standard JSON, keeping as string")
161 | pass
162 |
163 | result = {
164 | "success": success,
165 | "output": output,
166 | "error": stderr if not success else None,
167 | "returncode": process.returncode
168 | }
169 |
170 | if not success:
171 | logger.warning(f"Command failed with exit code {process.returncode}: {stderr}")
172 |
173 | # Log full environment for debugging
174 | logger.debug(f"Full environment variables: {env_vars}")
175 | logger.debug(f"Current directory: {project_dir}")
176 | logger.debug(f"Full command: {' '.join(full_command)}")
177 |
178 | return result
179 | except Exception as e:
180 | import traceback
181 | stack_trace = traceback.format_exc()
182 | logger.error(f"Error executing command: {e}\nStack trace: {stack_trace}")
183 | return {
184 | "success": False,
185 | "output": None,
186 | "error": f"{str(e)}\nStack trace: {stack_trace}",
187 | "returncode": -1
188 | }
189 |
190 |
191 | def parse_dbt_list_output(output: Union[str, Dict, List]) -> List[Dict[str, Any]]:
192 | """
193 | Parse the output from dbt list command.
194 |
195 | Args:
196 | output: Output from dbt list command (string or parsed JSON)
197 |
198 | Returns:
199 | List of resources
200 | """
201 | logger.debug(f"Parsing dbt list output with type: {type(output)}")
202 |
203 | # If already parsed as JSON dictionary with nodes
204 | if isinstance(output, dict) and "nodes" in output:
205 | return [
206 | {"name": name, **details}
207 | for name, details in output["nodes"].items()
208 | ]
209 |
210 | # Handle dbt Cloud CLI output format - an array of objects with name property containing embedded JSON
211 | if isinstance(output, list) and all(isinstance(item, dict) and "name" in item for item in output):
212 | logger.debug(f"Found dbt Cloud CLI output format with {len(output)} items")
213 | extracted_models = []
214 |
215 | for item in output:
216 | name_value = item["name"]
217 |
218 | # Skip log messages that don't contain model data
219 | if any(log_msg in name_value for log_msg in [
220 | "Sending project", "Created invocation", "Waiting for",
221 | "Streaming", "Running dbt", "Invocation has finished"
222 | ]):
223 | continue
224 |
225 | # Check if the name value is a JSON string
226 | if name_value.startswith('{') and '"name":' in name_value and '"resource_type":' in name_value:
227 | try:
228 | # Parse the JSON string directly
229 | model_data = json.loads(name_value)
230 | if isinstance(model_data, dict) and "name" in model_data and "resource_type" in model_data:
231 | extracted_models.append(model_data)
232 | continue
233 | except json.JSONDecodeError:
234 | logger.debug(f"Failed to parse JSON from: {name_value[:30]}...")
235 |
236 | # Extract model data from timestamped JSON lines (e.g., "00:59:06 {json}")
237 | timestamp_prefix_match = re.match(r'^(\d\d:\d\d:\d\d)\s+(.+)$', name_value)
238 | if timestamp_prefix_match:
239 | json_string = timestamp_prefix_match.group(2)
240 | try:
241 | model_data = json.loads(json_string)
242 | if isinstance(model_data, dict):
243 | # Only add entries that have both name and resource_type
244 | if "name" in model_data and "resource_type" in model_data:
245 | extracted_models.append(model_data)
246 | except json.JSONDecodeError:
247 | # Not valid JSON, skip this line
248 | logger.debug(f"Failed to parse JSON from: {json_string[:30]}...")
249 | continue
250 |
251 | # If we found model data, return it
252 | if extracted_models:
253 | logger.debug(f"Successfully extracted {len(extracted_models)} models from dbt Cloud CLI output")
254 | return extracted_models
255 |
256 | # If no model data found, return empty list
257 | logger.warning("No valid model data found in dbt Cloud CLI output")
258 | return []
259 |
260 | # If already parsed as regular JSON list
261 | if isinstance(output, list):
262 |         # A list of dicts that already have "name" keys is in the expected shape
263 |         if all(isinstance(item, dict) and "name" in item for item in output):
264 |             return output
265 |         # Any other list (including an empty one) is returned unchanged
266 | return output
267 |
268 | # If string, try to parse as JSON
269 | if isinstance(output, str):
270 | try:
271 | parsed = json.loads(output)
272 | if isinstance(parsed, dict) and "nodes" in parsed:
273 | return [
274 | {"name": name, **details}
275 | for name, details in parsed["nodes"].items()
276 | ]
277 | elif isinstance(parsed, list):
278 | return parsed
279 | except json.JSONDecodeError:
280 | # Not JSON, parse text format (simplified)
281 | models = []
282 | for line in output.splitlines():
283 | line = line.strip()
284 | if not line:
285 | continue
286 |
287 | # Check if the line is a JSON string
288 | if line.startswith('{') and '"name":' in line and '"resource_type":' in line:
289 | try:
290 | model_data = json.loads(line)
291 | if isinstance(model_data, dict) and "name" in model_data and "resource_type" in model_data:
292 | models.append(model_data)
293 | continue
294 | except json.JSONDecodeError:
295 | pass
296 |
297 | # Check for dbt Cloud CLI format with timestamps (e.g., "00:59:06 {json}")
298 | timestamp_match = re.match(r'^(\d\d:\d\d:\d\d)\s+(.+)$', line)
299 | if timestamp_match:
300 | json_part = timestamp_match.group(2)
301 | try:
302 | model_data = json.loads(json_part)
303 | if isinstance(model_data, dict) and "name" in model_data and "resource_type" in model_data:
304 | models.append(model_data)
305 | continue
306 | except json.JSONDecodeError:
307 | pass
308 |
309 | # Fall back to simple name-only format
310 | models.append({"name": line})
311 | return models
312 |
313 | # Fallback: return empty list
314 | logger.warning("Could not parse dbt list output in any recognized format")
315 | return []
316 |
317 |
318 | async def process_command_result(
319 | result: Dict[str, Any],
320 | command_name: str,
321 | output_formatter: Optional[Callable] = None,
322 | include_debug_info: bool = False
323 | ) -> str:
324 | """
325 | Process the result of a dbt command execution.
326 |
327 | Args:
328 | result: The result dictionary from execute_dbt_command
329 | command_name: The name of the dbt command (e.g. "run", "test")
330 | output_formatter: Optional function to format successful output
331 | include_debug_info: Whether to include additional debug info in error messages
332 |
333 | Returns:
334 | Formatted output or error message
335 | """
336 | logger.info(f"Processing command result for {command_name}")
337 | logger.info(f"Result success: {result['success']}, returncode: {result.get('returncode')}")
338 |
339 | # Log the output type and a sample
340 | if "output" in result:
341 | if isinstance(result["output"], str):
342 | logger.info(f"Output type: str, first 100 chars: {result['output'][:100]}")
343 | elif isinstance(result["output"], (dict, list)):
344 | logger.info(f"Output type: {type(result['output'])}, sample: {json.dumps(result['output'])[:100]}")
345 | else:
346 | logger.info(f"Output type: {type(result['output'])}")
347 |
348 | # For errors, simply return the raw command output if available
349 | if not result["success"]:
350 | logger.warning(f"Command {command_name} failed with returncode {result.get('returncode')}")
351 |
352 | # If we have command output, return it directly
353 | if "output" in result and result["output"]:
354 | logger.info(f"Returning error output: {str(result['output'])[:100]}...")
355 | return str(result["output"])
356 |
357 | # If no command output, return the error message
358 | if result["error"]:
359 | logger.info(f"Returning error message: {str(result['error'])[:100]}...")
360 | return str(result["error"])
361 |
362 | # If neither output nor error is available, return a generic message
363 | logger.info("No output or error available, returning generic message")
364 | return f"Command failed with exit code {result.get('returncode', 'unknown')}"
365 |
366 | # Format successful output
367 | if output_formatter:
368 | logger.info(f"Using custom formatter for {command_name}")
369 | formatted_result = output_formatter(result["output"])
370 | logger.info(f"Formatted result type: {type(formatted_result)}, first 100 chars: {str(formatted_result)[:100]}")
371 | return formatted_result
372 |
373 | # Default output formatting
374 | logger.info(f"Using default formatting for {command_name}")
375 | if isinstance(result["output"], (dict, list)):
376 | json_result = json.dumps(result["output"])
377 | logger.info(f"JSON result length: {len(json_result)}, first 100 chars: {json_result[:100]}")
378 | return json_result
379 | else:
380 | str_result = str(result["output"])
381 | logger.info(f"String result length: {len(str_result)}, first 100 chars: {str_result[:100]}")
382 | return str_result
```
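
The helpers above compose: `execute_dbt_command` produces a result dict, `parse_dbt_list_output` normalizes `ls` output, and `process_command_result` turns a result into a string for the MCP layer. A minimal usage sketch (not a repository file; the sample inputs are invented, and it assumes these helpers live in `src.command`, as the imports in `src/cli.py` suggest):

```python
import asyncio

from src.command import parse_dbt_list_output, process_command_result

# 1. Manifest-style dict with a "nodes" mapping
print(parse_dbt_list_output(
    {"nodes": {"model.jaffle_shop.customers": {"resource_type": "model"}}}
))

# 2. dbt Cloud CLI shape: "name" values holding timestamped JSON lines
print(parse_dbt_list_output(
    [{"name": '00:59:06 {"name": "customers", "resource_type": "model"}'}]
))

# 3. Plain-text output: one resource name per line
print(parse_dbt_list_output("customers\norders"))

# process_command_result is async and consumes the result-dict shape built above
fake = {"success": True, "output": ["customers"], "error": None, "returncode": 0}
print(asyncio.run(process_command_result(fake, "ls")))
```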
--------------------------------------------------------------------------------
/docs/python_fastMCP.md:
--------------------------------------------------------------------------------
```markdown
1 | # MCP Python SDK
2 |
3 | <div align="center">
4 |
5 | <strong>Python implementation of the Model Context Protocol (MCP)</strong>
6 |
7 | [![PyPI][pypi-badge]][pypi-url]
8 | [![MIT licensed][mit-badge]][mit-url]
9 | [![Python Version][python-badge]][python-url]
10 | [![Documentation][docs-badge]][docs-url]
11 | [![Specification][spec-badge]][spec-url]
12 | [![GitHub Discussions][discussions-badge]][discussions-url]
13 |
14 | </div>
15 |
16 | <!-- omit in toc -->
17 | ## Table of Contents
18 |
19 | - [Overview](#overview)
20 | - [Installation](#installation)
21 | - [Quickstart](#quickstart)
22 | - [What is MCP?](#what-is-mcp)
23 | - [Core Concepts](#core-concepts)
24 | - [Server](#server)
25 | - [Resources](#resources)
26 | - [Tools](#tools)
27 | - [Prompts](#prompts)
28 | - [Images](#images)
29 | - [Context](#context)
30 | - [Running Your Server](#running-your-server)
31 | - [Development Mode](#development-mode)
32 | - [Claude Desktop Integration](#claude-desktop-integration)
33 | - [Direct Execution](#direct-execution)
34 | - [Examples](#examples)
35 | - [Echo Server](#echo-server)
36 | - [SQLite Explorer](#sqlite-explorer)
37 | - [Advanced Usage](#advanced-usage)
38 | - [Low-Level Server](#low-level-server)
39 | - [Writing MCP Clients](#writing-mcp-clients)
40 | - [MCP Primitives](#mcp-primitives)
41 | - [Server Capabilities](#server-capabilities)
42 | - [Documentation](#documentation)
43 | - [Contributing](#contributing)
44 | - [License](#license)
45 |
46 | [pypi-badge]: https://img.shields.io/pypi/v/mcp.svg
47 | [pypi-url]: https://pypi.org/project/mcp/
48 | [mit-badge]: https://img.shields.io/pypi/l/mcp.svg
49 | [mit-url]: https://github.com/modelcontextprotocol/python-sdk/blob/main/LICENSE
50 | [python-badge]: https://img.shields.io/pypi/pyversions/mcp.svg
51 | [python-url]: https://www.python.org/downloads/
52 | [docs-badge]: https://img.shields.io/badge/docs-modelcontextprotocol.io-blue.svg
53 | [docs-url]: https://modelcontextprotocol.io
54 | [spec-badge]: https://img.shields.io/badge/spec-spec.modelcontextprotocol.io-blue.svg
55 | [spec-url]: https://spec.modelcontextprotocol.io
56 | [discussions-badge]: https://img.shields.io/github/discussions/modelcontextprotocol/python-sdk
57 | [discussions-url]: https://github.com/modelcontextprotocol/python-sdk/discussions
58 |
59 | ## Overview
60 |
61 | The Model Context Protocol allows applications to provide context for LLMs in a standardized way, separating the concerns of providing context from the actual LLM interaction. This Python SDK implements the full MCP specification, making it easy to:
62 |
63 | - Build MCP clients that can connect to any MCP server
64 | - Create MCP servers that expose resources, prompts and tools
65 | - Use standard transports like stdio and SSE
66 | - Handle all MCP protocol messages and lifecycle events
67 |
68 | ## Installation
69 |
70 | We recommend using [uv](https://docs.astral.sh/uv/) to manage your Python projects:
71 |
72 | ```bash
73 | uv add "mcp[cli]"
74 | ```
75 |
76 | Alternatively:
77 | ```bash
78 | pip install mcp
79 | ```
80 |
81 | ## Quickstart
82 |
83 | Let's create a simple MCP server that exposes a calculator tool and some data:
84 |
85 | ```python
86 | # server.py
87 | from mcp.server.fastmcp import FastMCP
88 |
89 | # Create an MCP server
90 | mcp = FastMCP("Demo")
91 |
92 | # Add an addition tool
93 | @mcp.tool()
94 | def add(a: int, b: int) -> int:
95 | """Add two numbers"""
96 | return a + b
97 |
98 | # Add a dynamic greeting resource
99 | @mcp.resource("greeting://{name}")
100 | def get_greeting(name: str) -> str:
101 | """Get a personalized greeting"""
102 | return f"Hello, {name}!"
103 | ```
104 |
105 | You can install this server in [Claude Desktop](https://claude.ai/download) and interact with it right away by running:
106 | ```bash
107 | mcp install server.py
108 | ```
109 |
110 | Alternatively, you can test it with the MCP Inspector:
111 | ```bash
112 | mcp dev server.py
113 | ```
114 |
115 | ## What is MCP?
116 |
117 | The [Model Context Protocol (MCP)](https://modelcontextprotocol.io) lets you build servers that expose data and functionality to LLM applications in a secure, standardized way. Think of it like a web API, but specifically designed for LLM interactions. MCP servers can:
118 |
119 | - Expose data through **Resources** (think of these sort of like GET endpoints; they are used to load information into the LLM's context)
120 | - Provide functionality through **Tools** (sort of like POST endpoints; they are used to execute code or otherwise produce a side effect)
121 | - Define interaction patterns through **Prompts** (reusable templates for LLM interactions)
122 | - And more!
123 |
124 | ## Core Concepts
125 |
126 | ### Server
127 |
128 | The FastMCP server is your core interface to the MCP protocol. It handles connection management, protocol compliance, and message routing:
129 |
130 | ```python
131 | # Add lifespan support for startup/shutdown with strong typing
132 | from contextlib import asynccontextmanager
133 | from dataclasses import dataclass
134 | from typing import AsyncIterator
135 | from mcp.server.fastmcp import Context, FastMCP
136 | # Create a named server
137 | mcp = FastMCP("My App")
138 |
139 | # Specify dependencies for deployment and development
140 | mcp = FastMCP("My App", dependencies=["pandas", "numpy"])
141 |
142 | @dataclass
143 | class AppContext:
144 | db: Database # Replace with your actual DB type
145 |
146 | @asynccontextmanager
147 | async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]:
148 | """Manage application lifecycle with type-safe context"""
149 | try:
150 | # Initialize on startup
151 |         db = await Database.connect()
152 | yield AppContext(db=db)
153 | finally:
154 | # Cleanup on shutdown
155 | await db.disconnect()
156 |
157 | # Pass lifespan to server
158 | mcp = FastMCP("My App", lifespan=app_lifespan)
159 |
160 | # Access type-safe lifespan context in tools
161 | @mcp.tool()
162 | def query_db(ctx: Context) -> str:
163 | """Tool that uses initialized resources"""
164 |     db = ctx.request_context.lifespan_context.db
165 | return db.query()
166 | ```
167 |
168 | ### Resources
169 |
170 | Resources are how you expose data to LLMs. They're similar to GET endpoints in a REST API - they provide data but shouldn't perform significant computation or have side effects:
171 |
172 | ```python
173 | @mcp.resource("config://app")
174 | def get_config() -> str:
175 | """Static configuration data"""
176 | return "App configuration here"
177 |
178 | @mcp.resource("users://{user_id}/profile")
179 | def get_user_profile(user_id: str) -> str:
180 | """Dynamic user data"""
181 | return f"Profile data for user {user_id}"
182 | ```
183 |
184 | ### Tools
185 |
186 | Tools let LLMs take actions through your server. Unlike resources, tools are expected to perform computation and have side effects:
187 |
188 | ```python
189 | @mcp.tool()
190 | def calculate_bmi(weight_kg: float, height_m: float) -> float:
191 | """Calculate BMI given weight in kg and height in meters"""
192 | return weight_kg / (height_m ** 2)
193 | import httpx  # used by fetch_weather below
194 | @mcp.tool()
195 | async def fetch_weather(city: str) -> str:
196 | """Fetch current weather for a city"""
197 | async with httpx.AsyncClient() as client:
198 | response = await client.get(f"https://api.weather.com/{city}")
199 | return response.text
200 | ```
201 |
202 | ### Prompts
203 |
204 | Prompts are reusable templates that help LLMs interact with your server effectively:
205 |
206 | ```python
207 | from mcp.server.fastmcp.prompts.base import AssistantMessage, Message, UserMessage
208 | 
209 | @mcp.prompt()
210 | def review_code(code: str) -> str:
211 |     return f"Please review this code:\n\n{code}"
212 | 
213 | @mcp.prompt()
214 | def debug_error(error: str) -> list[Message]:
215 |     return [UserMessage("I'm seeing this error:"),
216 |             UserMessage(error),
217 |             AssistantMessage("I'll help debug that. What have you tried so far?")]
218 | ```
219 |
220 | ### Images
221 |
222 | FastMCP provides an `Image` class that automatically handles image data:
223 |
224 | ```python
225 | from mcp.server.fastmcp import FastMCP, Image
226 | from PIL import Image as PILImage
227 |
228 | @mcp.tool()
229 | def create_thumbnail(image_path: str) -> Image:
230 | """Create a thumbnail from an image"""
231 | img = PILImage.open(image_path)
232 | img.thumbnail((100, 100))
233 | return Image(data=img.tobytes(), format="png")
234 | ```
235 |
236 | ### Context
237 |
238 | The Context object gives your tools and resources access to MCP capabilities:
239 |
240 | ```python
241 | from mcp.server.fastmcp import FastMCP, Context
242 |
243 | @mcp.tool()
244 | async def long_task(files: list[str], ctx: Context) -> str:
245 | """Process multiple files with progress tracking"""
246 | for i, file in enumerate(files):
247 | ctx.info(f"Processing {file}")
248 | await ctx.report_progress(i, len(files))
249 | data, mime_type = await ctx.read_resource(f"file://{file}")
250 | return "Processing complete"
251 | ```
252 |
253 | ## Running Your Server
254 |
255 | ### Development Mode
256 |
257 | The fastest way to test and debug your server is with the MCP Inspector:
258 |
259 | ```bash
260 | mcp dev server.py
261 |
262 | # Add dependencies
263 | mcp dev server.py --with pandas --with numpy
264 |
265 | # Mount local code
266 | mcp dev server.py --with-editable .
267 | ```
268 |
269 | ### Claude Desktop Integration
270 |
271 | Once your server is ready, install it in Claude Desktop:
272 |
273 | ```bash
274 | mcp install server.py
275 |
276 | # Custom name
277 | mcp install server.py --name "My Analytics Server"
278 |
279 | # Environment variables
280 | mcp install server.py -v API_KEY=abc123 -v DB_URL=postgres://...
281 | mcp install server.py -f .env
282 | ```
283 |
284 | ### Direct Execution
285 |
286 | For advanced scenarios like custom deployments:
287 |
288 | ```python
289 | from mcp.server.fastmcp import FastMCP
290 |
291 | mcp = FastMCP("My App")
292 |
293 | if __name__ == "__main__":
294 | mcp.run()
295 | ```
296 |
297 | Run it with:
298 | ```bash
299 | python server.py
300 | # or
301 | mcp run server.py
302 | ```
303 |
304 | ## Examples
305 |
306 | ### Echo Server
307 |
308 | A simple server demonstrating resources, tools, and prompts:
309 |
310 | ```python
311 | from mcp.server.fastmcp import FastMCP
312 |
313 | mcp = FastMCP("Echo")
314 |
315 | @mcp.resource("echo://{message}")
316 | def echo_resource(message: str) -> str:
317 | """Echo a message as a resource"""
318 | return f"Resource echo: {message}"
319 |
320 | @mcp.tool()
321 | def echo_tool(message: str) -> str:
322 | """Echo a message as a tool"""
323 | return f"Tool echo: {message}"
324 |
325 | @mcp.prompt()
326 | def echo_prompt(message: str) -> str:
327 | """Create an echo prompt"""
328 | return f"Please process this message: {message}"
329 | ```
330 |
331 | ### SQLite Explorer
332 |
333 | A more complex example showing database integration:
334 |
335 | ```python
336 | from mcp.server.fastmcp import FastMCP
337 | import sqlite3
338 |
339 | mcp = FastMCP("SQLite Explorer")
340 |
341 | @mcp.resource("schema://main")
342 | def get_schema() -> str:
343 | """Provide the database schema as a resource"""
344 | conn = sqlite3.connect("database.db")
345 | schema = conn.execute(
346 | "SELECT sql FROM sqlite_master WHERE type='table'"
347 | ).fetchall()
348 | return "\n".join(sql[0] for sql in schema if sql[0])
349 |
350 | @mcp.tool()
351 | def query_data(sql: str) -> str:
352 |     """Execute SQL queries, returning any error as text"""
353 | conn = sqlite3.connect("database.db")
354 | try:
355 | result = conn.execute(sql).fetchall()
356 | return "\n".join(str(row) for row in result)
357 | except Exception as e:
358 | return f"Error: {str(e)}"
359 | ```
360 |
361 | ## Advanced Usage
362 |
363 | ### Low-Level Server
364 |
365 | For more control, you can use the low-level server implementation directly. This gives you full access to the protocol and allows you to customize every aspect of your server, including lifecycle management through the lifespan API:
366 |
367 | ```python
368 | from contextlib import asynccontextmanager
369 | from typing import AsyncIterator
370 |
371 | @asynccontextmanager
372 | async def server_lifespan(server: Server) -> AsyncIterator[dict]:
373 | """Manage server startup and shutdown lifecycle."""
374 | try:
375 | # Initialize resources on startup
376 | await db.connect()
377 | yield {"db": db}
378 | finally:
379 | # Clean up on shutdown
380 | await db.disconnect()
381 |
382 | # Pass lifespan to server
383 | server = Server("example-server", lifespan=server_lifespan)
384 |
385 | # Access lifespan context in handlers
386 | @server.call_tool()
387 | async def query_db(name: str, arguments: dict) -> list:
388 | ctx = server.request_context
389 | db = ctx.lifespan_context["db"]
390 | return await db.query(arguments["query"])
391 | ```
392 |
393 | The lifespan API provides:
394 | - A way to initialize resources when the server starts and clean them up when it stops
395 | - Access to initialized resources through the request context in handlers
396 | - Type-safe context passing between lifespan and request handlers
397 |
398 | ```python
399 | from mcp.server.lowlevel import Server, NotificationOptions
400 | from mcp.server.models import InitializationOptions
401 | import mcp.server.stdio
402 | import mcp.types as types
403 |
404 | # Create a server instance
405 | server = Server("example-server")
406 |
407 | @server.list_prompts()
408 | async def handle_list_prompts() -> list[types.Prompt]:
409 | return [
410 | types.Prompt(
411 | name="example-prompt",
412 | description="An example prompt template",
413 | arguments=[
414 | types.PromptArgument(
415 | name="arg1",
416 | description="Example argument",
417 | required=True
418 | )
419 | ]
420 | )
421 | ]
422 |
423 | @server.get_prompt()
424 | async def handle_get_prompt(
425 | name: str,
426 | arguments: dict[str, str] | None
427 | ) -> types.GetPromptResult:
428 | if name != "example-prompt":
429 | raise ValueError(f"Unknown prompt: {name}")
430 |
431 | return types.GetPromptResult(
432 | description="Example prompt",
433 | messages=[
434 | types.PromptMessage(
435 | role="user",
436 | content=types.TextContent(
437 | type="text",
438 | text="Example prompt text"
439 | )
440 | )
441 | ]
442 | )
443 |
444 | async def run():
445 | async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
446 | await server.run(
447 | read_stream,
448 | write_stream,
449 | InitializationOptions(
450 | server_name="example",
451 | server_version="0.1.0",
452 | capabilities=server.get_capabilities(
453 | notification_options=NotificationOptions(),
454 | experimental_capabilities={},
455 | )
456 | )
457 | )
458 |
459 | if __name__ == "__main__":
460 | import asyncio
461 | asyncio.run(run())
462 | ```
463 |
464 | ### Writing MCP Clients
465 |
466 | The SDK provides a high-level client interface for connecting to MCP servers:
467 |
468 | ```python
469 | from mcp import ClientSession, StdioServerParameters, types
470 | from mcp.client.stdio import stdio_client
471 |
472 | # Create server parameters for stdio connection
473 | server_params = StdioServerParameters(
474 | command="python", # Executable
475 | args=["example_server.py"], # Optional command line arguments
476 | env=None # Optional environment variables
477 | )
478 |
479 | # Optional: create a sampling callback
480 | async def handle_sampling_message(message: types.CreateMessageRequestParams) -> types.CreateMessageResult:
481 | return types.CreateMessageResult(
482 | role="assistant",
483 | content=types.TextContent(
484 | type="text",
485 | text="Hello, world! from model",
486 | ),
487 | model="gpt-3.5-turbo",
488 | stopReason="endTurn",
489 | )
490 |
491 | async def run():
492 | async with stdio_client(server_params) as (read, write):
493 | async with ClientSession(read, write, sampling_callback=handle_sampling_message) as session:
494 | # Initialize the connection
495 | await session.initialize()
496 |
497 | # List available prompts
498 | prompts = await session.list_prompts()
499 |
500 | # Get a prompt
501 | prompt = await session.get_prompt("example-prompt", arguments={"arg1": "value"})
502 |
503 | # List available resources
504 | resources = await session.list_resources()
505 |
506 | # List available tools
507 | tools = await session.list_tools()
508 |
509 | # Read a resource
510 | content, mime_type = await session.read_resource("file://some/path")
511 |
512 | # Call a tool
513 | result = await session.call_tool("tool-name", arguments={"arg1": "value"})
514 |
515 | if __name__ == "__main__":
516 | import asyncio
517 | asyncio.run(run())
518 | ```
519 |
520 | ### MCP Primitives
521 |
522 | The MCP protocol defines three core primitives that servers can implement:
523 |
524 | | Primitive | Control | Description | Example Use |
525 | |-----------|-----------------------|-----------------------------------------------------|------------------------------|
526 | | Prompts | User-controlled | Interactive templates invoked by user choice | Slash commands, menu options |
527 | | Resources | Application-controlled| Contextual data managed by the client application | File contents, API responses |
528 | | Tools | Model-controlled | Functions exposed to the LLM to take actions | API calls, data updates |
529 |
530 | ### Server Capabilities
531 |
532 | MCP servers declare capabilities during initialization:
533 |
534 | | Capability | Feature Flag | Description |
535 | |-------------|------------------------------|------------------------------------|
536 | | `prompts` | `listChanged` | Prompt template management |
537 | | `resources` | `subscribe`<br/>`listChanged`| Resource exposure and updates |
538 | | `tools` | `listChanged` | Tool discovery and execution |
539 | | `logging` | - | Server logging configuration |
540 | | `completion`| - | Argument completion suggestions |
541 |
542 | ## Documentation
543 |
544 | - [Model Context Protocol documentation](https://modelcontextprotocol.io)
545 | - [Model Context Protocol specification](https://spec.modelcontextprotocol.io)
546 | - [Officially supported servers](https://github.com/modelcontextprotocol/servers)
547 |
548 | ## Contributing
549 |
550 | We are passionate about supporting contributors of all levels of experience and would love to see you get involved in the project. See the [contributing guide](CONTRIBUTING.md) to get started.
551 |
552 | ## License
553 |
554 | This project is licensed under the MIT License - see the LICENSE file for details.
```
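
The patterns documented above map onto this repository: `src/server.py` (not shown on this page) presumably wires the dbt commands into a FastMCP server the same way. A minimal sketch of that shape (not a repository file; the tool name `dbt_version` and its argument handling are illustrative, and it assumes `dbt` is on `PATH`):

```python
import asyncio

from mcp.server.fastmcp import FastMCP

mcp = FastMCP("dbt Sketch")

@mcp.tool()
async def dbt_version(project_dir: str = ".") -> str:
    """Return `dbt --version` output for a project directory."""
    proc = await asyncio.create_subprocess_exec(
        "dbt", "--version",
        cwd=project_dir,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.STDOUT,
    )
    out, _ = await proc.communicate()
    return out.decode(errors="replace")

if __name__ == "__main__":
    mcp.run()
```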
--------------------------------------------------------------------------------
/integration_tests/test_dbt_ls.py:
--------------------------------------------------------------------------------
```python
1 | #!/usr/bin/env python3
2 | """
3 | Integration test for the dbt_ls tool that lists dbt resources.
4 | """
5 | import os
6 | import sys
7 | import json
8 | from pathlib import Path
9 |
10 | # Add parent directory to python path to import from common.py
11 | sys.path.append(str(Path(__file__).parent))
12 | from common import run_cli_command, verify_output
13 |
14 | # Path to the jaffle_shop project
15 | JAFFLE_SHOP_PATH = Path(__file__).parent.parent / "dbt_integration_tests/jaffle_shop_duckdb"
16 |
17 | def test_dbt_ls():
18 | """Test the dbt_ls tool by listing models"""
19 | print("Testing dbt_ls tool...")
20 |
21 | try:
22 | # Call the dbt_ls tool to list all models
23 | print("Listing all models...")
24 | ls_result = run_cli_command("ls", {
25 | "project_dir": str(JAFFLE_SHOP_PATH),
26 | "profiles_dir": str(JAFFLE_SHOP_PATH), # Explicitly set profiles_dir to the same as project_dir
27 | "resource_type": "model",
28 | "output_format": "json"
29 | })
30 |
31 | # Parse the JSON result
32 | try:
33 | result_data = json.loads(ls_result)
34 |
35 | # Extract the actual output from the JSON response
36 | if isinstance(result_data, dict) and "output" in result_data:
37 | output = result_data["output"]
38 | if isinstance(output, str) and (output.startswith("[") or output.startswith("{")):
39 | # If output is a JSON string, parse it
40 | output = json.loads(output)
41 | else:
42 | output = result_data
43 |
44 | # Print the raw output for debugging
45 | print(f"Raw output type: {type(output)}")
46 | if isinstance(output, str):
47 | print(f"Raw output: {output[:100]}...")
48 | elif isinstance(output, dict):
49 | print(f"Raw output keys: {list(output.keys())}")
50 | elif isinstance(output, list):
51 | print(f"Raw output length: {len(output)}")
52 |
53 | # Filter out log messages before displaying
54 | filtered_items = []
55 | for item in output:
56 | if isinstance(item, dict) and "name" in item:
57 | name_value = item["name"]
58 | # Skip items with ANSI color codes or log messages
59 | if '\x1b[' in name_value or any(log_msg in name_value for log_msg in [
60 | "Running with dbt=", "Registered adapter:", "Found", "Starting"
61 | ]):
62 | continue
63 | filtered_items.append(item)
64 |
65 | print(f"Filtered output length: {len(filtered_items)}")
66 | for i, item in enumerate(filtered_items[:3]): # Print first 3 filtered items
67 | print(f"Item {i} type: {type(item)}")
68 | print(f"Item {i}: {str(item)[:100]}...")
69 |
70 | # Verify we have at least the expected models
71 | model_names = []
72 |
73 | # The output is a list of dictionaries or strings
74 | if isinstance(output, list):
75 | for item in output:
76 | # If it's a dictionary with a name key
77 | if isinstance(item, dict) and "name" in item:
78 | name_value = item["name"]
79 |
80 | # If it's a log message, skip it
81 | if name_value.startswith('\x1b[0m'):
82 | continue
83 |
84 | # If it's a JSON string, try to parse it
85 | if name_value.strip().startswith('{'):
86 | try:
87 | model_data = json.loads(name_value)
88 | if "name" in model_data and "resource_type" in model_data and model_data["resource_type"] == "model":
89 | model_names.append(model_data["name"])
90 | except json.JSONDecodeError:
91 | pass
92 | else:
93 | # If it's a model name, add it
94 | model_names.append(name_value)
95 |
96 | # If it's a string containing JSON
97 | elif isinstance(item, str) and item.strip().startswith('{'):
98 | try:
99 | model_data = json.loads(item)
100 | if "name" in model_data and "resource_type" in model_data and model_data["resource_type"] == "model":
101 | model_names.append(model_data["name"])
102 | except json.JSONDecodeError:
103 | pass
104 |
105 | expected_models = ["customers", "orders", "stg_customers", "stg_orders", "stg_payments"]
106 |
107 | missing_models = [model for model in expected_models if model not in model_names]
108 | if missing_models:
109 | print(f"❌ Missing expected models: {missing_models}")
110 | print(f"Found models: {model_names}")
111 | return False
112 |
113 | print(f"✅ Found all expected models: {expected_models}")
114 | print("✅ Test passed!")
115 | print("DEBUG: test_dbt_ls returning True")
116 | return True
117 |
118 | except json.JSONDecodeError as e:
119 | print(f"❌ Failed to parse JSON result: {ls_result}")
120 | print(f"Error: {e}")
121 | print("DEBUG: test_dbt_ls returning False due to JSONDecodeError")
122 | return False
123 |
124 | except Exception as e:
125 | print(f"❌ Test failed with exception: {e}")
126 | import traceback
127 | traceback.print_exc()
128 | print("DEBUG: test_dbt_ls raising exception")
129 | raise
130 | def test_dbt_ls_with_profiles_dir():
131 | """Test the dbt_ls tool with explicit profiles_dir parameter"""
132 | print("Testing dbt_ls tool with explicit profiles_dir parameter...")
133 |
134 | try:
135 | # Call the dbt_ls tool with explicit profiles_dir
136 | print("Listing all models with explicit profiles_dir...")
137 | ls_result = run_cli_command("ls", {
138 | "project_dir": str(JAFFLE_SHOP_PATH),
139 | "profiles_dir": str(JAFFLE_SHOP_PATH), # Explicitly set profiles_dir
140 | "resource_type": "model",
141 | "output_format": "json"
142 | })
143 |
144 | # Parse the JSON result (similar to test_dbt_ls)
145 | try:
146 | result_data = json.loads(ls_result)
147 |
148 | # Extract the actual output from the JSON response
149 | if isinstance(result_data, dict) and "output" in result_data:
150 | output = result_data["output"]
151 | if isinstance(output, str) and (output.startswith("[") or output.startswith("{")):
152 | output = json.loads(output)
153 | else:
154 | output = result_data
155 |
156 | # Verify we have at least the expected models
157 | model_names = []
158 |
159 | # The output is a list of dictionaries or strings
160 | if isinstance(output, list):
161 | for item in output:
162 | # If it's a dictionary with a name key
163 | if isinstance(item, dict) and "name" in item:
164 | name_value = item["name"]
165 |
166 | # If it's a log message, skip it
167 | if name_value.startswith('\x1b[0m'):
168 | continue
169 |
170 | # If it's a JSON string, try to parse it
171 | if name_value.strip().startswith('{'):
172 | try:
173 | model_data = json.loads(name_value)
174 | if "name" in model_data and "resource_type" in model_data and model_data["resource_type"] == "model":
175 | model_names.append(model_data["name"])
176 | except json.JSONDecodeError:
177 | pass
178 | else:
179 | # If it's a model name, add it
180 | model_names.append(name_value)
181 |
182 | # If it's a string containing JSON
183 | elif isinstance(item, str) and item.strip().startswith('{'):
184 | try:
185 | model_data = json.loads(item)
186 | if "name" in model_data and "resource_type" in model_data and model_data["resource_type"] == "model":
187 | model_names.append(model_data["name"])
188 | except json.JSONDecodeError:
189 | pass
190 |
191 | expected_models = ["customers", "orders", "stg_customers", "stg_orders", "stg_payments"]
192 |
193 | missing_models = [model for model in expected_models if model not in model_names]
194 | if missing_models:
195 | print(f"❌ Missing expected models: {missing_models}")
196 | print(f"Found models: {model_names}")
197 | return False
198 |
199 | print(f"✅ Found all expected models: {expected_models}")
200 | print("✅ Test passed!")
201 | print("DEBUG: test_dbt_ls_with_profiles_dir returning True")
202 | return True
203 |
204 | except json.JSONDecodeError as e:
205 | print(f"❌ Failed to parse JSON result: {ls_result}")
206 | print(f"Error: {e}")
207 | print("DEBUG: test_dbt_ls_with_profiles_dir returning False due to JSONDecodeError")
208 | return False
209 |
210 | except Exception as e:
211 | print(f"❌ Test failed with exception: {e}")
212 | import traceback
213 | traceback.print_exc()
214 | print("DEBUG: test_dbt_ls_with_profiles_dir raising exception")
215 | raise
216 |
217 | def test_dbt_ls_verbose():
218 | """Test the dbt_ls tool with verbose flag"""
219 | print("Testing dbt_ls tool with verbose flag...")
220 |
221 | try:
222 | # First test with default (simplified) output
223 | print("Listing models with simplified output (default)...")
224 | simplified_result = run_cli_command("ls", {
225 | "project_dir": str(JAFFLE_SHOP_PATH),
226 | "profiles_dir": str(JAFFLE_SHOP_PATH),
227 | "resource_type": "model",
228 | "output_format": "json"
229 | })
230 |
231 | # Then test with verbose output
232 | print("Listing models with verbose output...")
233 | verbose_result = run_cli_command("ls", {
234 | "project_dir": str(JAFFLE_SHOP_PATH),
235 | "profiles_dir": str(JAFFLE_SHOP_PATH),
236 | "resource_type": "model",
237 | "output_format": "json",
238 | "verbose": True
239 | })
240 |
241 | # Parse both results
242 | try:
243 | simplified_data = json.loads(simplified_result)
244 | verbose_data = json.loads(verbose_result)
245 |
246 | # Extract the actual output from the JSON responses
247 | if isinstance(simplified_data, dict) and "output" in simplified_data:
248 | simplified_output = simplified_data["output"]
249 | if isinstance(simplified_output, str) and (simplified_output.startswith("[") or simplified_output.startswith("{")):
250 | simplified_output = json.loads(simplified_output)
251 | else:
252 | simplified_output = simplified_data
253 |
254 | if isinstance(verbose_data, dict) and "output" in verbose_data:
255 | verbose_output = verbose_data["output"]
256 | if isinstance(verbose_output, str) and (verbose_output.startswith("[") or verbose_output.startswith("{")):
257 | verbose_output = json.loads(verbose_output)
258 | else:
259 | verbose_output = verbose_data
260 |
261 | # Verify both outputs contain the expected models
262 | simplified_models = []
263 | verbose_models = []
264 |
265 | # Debug output
266 | print(f"DEBUG: Simplified output type: {type(simplified_output)}")
267 | if isinstance(simplified_output, list):
268 | print(f"DEBUG: Simplified output length: {len(simplified_output)}")
269 | if simplified_output and len(simplified_output) > 0:
270 | print(f"DEBUG: First simplified item type: {type(simplified_output[0])}")
271 | print(f"DEBUG: First simplified item: {simplified_output[0]}")
272 |
273 | print(f"DEBUG: Verbose output type: {type(verbose_output)}")
274 | if isinstance(verbose_output, list):
275 | print(f"DEBUG: Verbose output length: {len(verbose_output)}")
276 | if verbose_output and len(verbose_output) > 0:
277 | print(f"DEBUG: First verbose item type: {type(verbose_output[0])}")
278 | print(f"DEBUG: First verbose item: {verbose_output[0]}")
279 |
280 | # Process simplified output
281 | if isinstance(simplified_output, list):
282 | for item in simplified_output:
283 | # Handle dictionary items (properly formatted model data)
284 | if isinstance(item, dict) and "name" in item and "resource_type" in item:
285 | simplified_models.append(item["name"])
286 |
287 | # Verify simplified output only has the required fields
288 | if set(item.keys()) != {"name", "resource_type", "depends_on"}:
289 | print(f"❌ Simplified output has unexpected fields: {set(item.keys())}")
290 | return False
291 |
292 | # Verify depends_on only has nodes
293 | if "depends_on" in item and set(item["depends_on"].keys()) != {"nodes"}:
294 | print(f"❌ Simplified output depends_on has unexpected fields: {set(item['depends_on'].keys())}")
295 | return False
296 | # Handle string items (could be model names or log messages)
297 | elif isinstance(item, str):
298 | # Skip log messages and only add actual model names
299 | if not item.startswith('\x1b[') and not any(log_msg in item for log_msg in [
300 | "Running with dbt=", "Registered adapter:", "Found", "Starting"
301 | ]):
302 | simplified_models.append(item)
303 |
304 | # Process verbose output
305 | if isinstance(verbose_output, list):
306 | for item in verbose_output:
307 | # Handle dictionary items (properly formatted model data)
308 | if isinstance(item, dict) and "name" in item and "resource_type" in item:
309 | verbose_models.append(item["name"])
310 |
311 | # Verify verbose output has more fields than simplified
312 | if len(item.keys()) <= 3:
313 | print(f"❌ Verbose output doesn't have enough fields: {set(item.keys())}")
314 | return False
315 |
316 | # Check for fields that should be in verbose but not simplified
317 | for field in ["package_name", "original_file_path", "unique_id", "config"]:
318 | if field not in item:
319 | print(f"❌ Verbose output missing expected field: {field}")
320 | return False
321 | # Handle string items (could be model names or log messages)
322 | elif isinstance(item, str):
323 | # Skip log messages and only add actual model names
324 | if not item.startswith('\x1b[') and not any(log_msg in item for log_msg in [
325 | "Running with dbt=", "Registered adapter:", "Found", "Starting"
326 | ]):
327 | verbose_models.append(item)
328 |
329 | # Filter out any log messages from the model lists
330 | simplified_models = [model for model in simplified_models if model in ["customers", "orders", "stg_customers", "stg_orders", "stg_payments"]]
331 | verbose_models = [model for model in verbose_models if model in ["customers", "orders", "stg_customers", "stg_orders", "stg_payments"]]
332 |
333 | # Sort the model lists for consistent comparison
334 | simplified_models.sort()
335 | verbose_models.sort()
336 |
337 | # Verify both outputs have the same models
338 | expected_models = ["customers", "orders", "stg_customers", "stg_orders", "stg_payments"]
339 | expected_models.sort()
340 |
341 | missing_simplified = [model for model in expected_models if model not in simplified_models]
342 | missing_verbose = [model for model in expected_models if model not in verbose_models]
343 |
344 | if missing_simplified:
345 | print(f"❌ Simplified output missing expected models: {missing_simplified}")
346 | print(f"Found models: {simplified_models}")
347 | return False
348 |
349 | if missing_verbose:
350 | print(f"❌ Verbose output missing expected models: {missing_verbose}")
351 | print(f"Found models: {verbose_models}")
352 | return False
353 |
354 | # Debug output for final model lists
355 | print(f"DEBUG: Final simplified_models: {simplified_models}")
356 | print(f"DEBUG: Final verbose_models: {verbose_models}")
357 | print(f"DEBUG: Models equal? {simplified_models == verbose_models}")
358 |
359 | if simplified_models != verbose_models:
360 | print(f"❌ Simplified and verbose outputs have different models")
361 | print(f"Simplified: {simplified_models}")
362 | print(f"Verbose: {verbose_models}")
363 | return False
364 |
365 | print(f"✅ Found all expected models in both outputs: {expected_models}")
366 | print("✅ Test passed!")
367 | return True
368 |
369 | except json.JSONDecodeError as e:
370 | print(f"❌ Failed to parse JSON results")
371 | print(f"Error: {e}")
372 | return False
373 |
374 | except Exception as e:
375 | print(f"❌ Test failed with exception: {e}")
376 | import traceback
377 | traceback.print_exc()
378 | raise
379 |
380 | if __name__ == "__main__":
381 | success = True
382 | try:
383 | print("DEBUG: Starting test_dbt_ls")
384 | test_ls_result = test_dbt_ls()
385 | print(f"DEBUG: test_dbt_ls result: {test_ls_result}")
386 | success = test_ls_result and success
387 |
388 | print("DEBUG: Starting test_dbt_ls_with_profiles_dir")
389 | profiles_result = test_dbt_ls_with_profiles_dir()
390 | print(f"DEBUG: test_dbt_ls_with_profiles_dir result: {profiles_result}")
391 | success = profiles_result and success
392 |
393 | print("DEBUG: Starting test_dbt_ls_verbose")
394 | verbose_result = test_dbt_ls_verbose()
395 | print(f"DEBUG: test_dbt_ls_verbose result: {verbose_result}")
396 | success = verbose_result and success
397 |
398 | print(f"DEBUG: Final success value: {success}")
399 | exit_code = 0 if success else 1
400 | print(f"DEBUG: Exiting with code: {exit_code}")
401 | sys.exit(exit_code)
402 | except Exception as e:
403 | print(f"DEBUG: Exception occurred: {e}")
404 | sys.exit(1)
```
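
The three tests above repeat the same model-name extraction loop. A refactoring sketch (not a repository file; `extract_model_names` is a hypothetical helper) showing how that loop could be shared:

```python
import json

def extract_model_names(output) -> list:
    """Collect model names from dbt_ls output, skipping log lines."""
    names = []
    for item in output if isinstance(output, list) else []:
        value = item.get("name") if isinstance(item, dict) else item
        if not isinstance(value, str) or value.startswith("\x1b["):
            continue  # not a name, or an ANSI-colored log line
        if value.strip().startswith("{"):
            try:
                data = json.loads(value)
            except json.JSONDecodeError:
                continue
            if data.get("resource_type") == "model" and "name" in data:
                names.append(data["name"])
        else:
            names.append(value)
    return names

print(extract_model_names([
    {"name": "customers"},
    {"name": '{"name": "orders", "resource_type": "model"}'},
    {"name": "\x1b[0mRunning with dbt=1.7"},
]))  # -> ['customers', 'orders']
```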
--------------------------------------------------------------------------------
/src/cli.py:
--------------------------------------------------------------------------------
```python
1 | #!/usr/bin/env python3
2 | """
3 | Command-line interface for dbt tools.
4 | """
5 |
6 | import os
7 | import sys
8 | import json
9 | import asyncio
10 | import argparse
11 | from pathlib import Path
12 | from typing import Dict, Any, Optional, List
13 |
14 | # Configuration initialization is handled via src.config
15 | from src.config import initialize as initialize_config
16 |
17 |
18 | def parse_args() -> argparse.Namespace:
19 | """Parse command line arguments."""
20 | parser = argparse.ArgumentParser(description="DBT CLI MCP Command Line Interface")
21 |
22 | # Global options
23 | parser.add_argument(
24 | "--dbt-path",
25 | help="Path to dbt executable",
26 | default=os.environ.get("DBT_PATH", "dbt")
27 | )
28 | parser.add_argument(
29 | "--env-file",
30 | help="Path to environment file",
31 | default=os.environ.get("ENV_FILE", ".env")
32 | )
33 | parser.add_argument(
34 | "--log-level",
35 | help="Logging level",
36 | choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
37 | default=os.environ.get("LOG_LEVEL", "INFO")
38 | )
39 | parser.add_argument(
40 | "--format",
41 | help="Output format",
42 | choices=["text", "json"],
43 | default="text"
44 | )
45 |
46 | # Set up subparsers for each command
47 | subparsers = parser.add_subparsers(dest="command", help="Command to execute")
48 |
49 | # dbt_run command
50 | run_parser = subparsers.add_parser("run", help="Run dbt models")
51 | run_parser.add_argument("--models", help="Specific models to run")
52 | run_parser.add_argument("--selector", help="Named selector to use")
53 | run_parser.add_argument("--exclude", help="Models to exclude")
54 | run_parser.add_argument("--project-dir", help="Directory containing the dbt project", default=".")
55 | run_parser.add_argument("--profiles-dir", help="Directory containing the profiles.yml file (defaults to project-dir if not specified)")
56 | run_parser.add_argument("--full-refresh", help="Perform a full refresh", action="store_true")
57 |
58 | # dbt_test command
59 | test_parser = subparsers.add_parser("test", help="Run dbt tests")
60 | test_parser.add_argument("--models", help="Specific models to test")
61 | test_parser.add_argument("--selector", help="Named selector to use")
62 | test_parser.add_argument("--exclude", help="Models to exclude")
63 | test_parser.add_argument("--project-dir", help="Directory containing the dbt project", default=".")
64 | test_parser.add_argument("--profiles-dir", help="Directory containing the profiles.yml file (defaults to project-dir if not specified)")
65 |
66 | # dbt_ls command
67 | ls_parser = subparsers.add_parser("ls", help="List dbt resources")
68 | ls_parser.add_argument("--models", help="Specific models to list")
69 | ls_parser.add_argument("--selector", help="Named selector to use")
70 | ls_parser.add_argument("--exclude", help="Models to exclude")
71 | ls_parser.add_argument("--resource-type", help="Type of resource to list")
72 | ls_parser.add_argument("--project-dir", help="Directory containing the dbt project", default=".")
73 | ls_parser.add_argument("--profiles-dir", help="Directory containing the profiles.yml file (defaults to project-dir if not specified)")
74 | ls_parser.add_argument("--output-format", help="Output format", choices=["json", "name", "path", "selector"], default="json")
75 | ls_parser.add_argument("--verbose", help="Return full JSON output instead of simplified version", action="store_true")
76 |
77 | # dbt_compile command
78 | compile_parser = subparsers.add_parser("compile", help="Compile dbt models")
79 | compile_parser.add_argument("--models", help="Specific models to compile")
80 | compile_parser.add_argument("--selector", help="Named selector to use")
81 | compile_parser.add_argument("--exclude", help="Models to exclude")
82 | compile_parser.add_argument("--project-dir", help="Directory containing the dbt project", default=".")
83 | compile_parser.add_argument("--profiles-dir", help="Directory containing the profiles.yml file (defaults to project-dir if not specified)")
84 |
85 | # dbt_debug command
86 | debug_parser = subparsers.add_parser("debug", help="Debug dbt project")
87 | debug_parser.add_argument("--project-dir", help="Directory containing the dbt project", default=".")
88 | debug_parser.add_argument("--profiles-dir", help="Directory containing the profiles.yml file (defaults to project-dir if not specified)")
89 |
90 | # dbt_deps command
91 | deps_parser = subparsers.add_parser("deps", help="Install dbt package dependencies")
92 | deps_parser.add_argument("--project-dir", help="Directory containing the dbt project", default=".")
93 | deps_parser.add_argument("--profiles-dir", help="Directory containing the profiles.yml file (defaults to project-dir if not specified)")
94 |
95 | # dbt_seed command
96 | seed_parser = subparsers.add_parser("seed", help="Load CSV files as seed data")
97 | seed_parser.add_argument("--selector", help="Named selector to use")
98 | seed_parser.add_argument("--exclude", help="Seeds to exclude")
99 | seed_parser.add_argument("--project-dir", help="Directory containing the dbt project", default=".")
100 | seed_parser.add_argument("--profiles-dir", help="Directory containing the profiles.yml file (defaults to project-dir if not specified)")
101 |
102 | # dbt_show command
103 | show_parser = subparsers.add_parser("show", help="Preview model results")
104 | show_parser.add_argument("--models", help="Specific model to show", required=True)
105 | show_parser.add_argument("--project-dir", help="Directory containing the dbt project", default=".")
106 | show_parser.add_argument("--profiles-dir", help="Directory containing the profiles.yml file (defaults to project-dir if not specified)")
107 | show_parser.add_argument("--limit", help="Limit the number of rows returned", type=int)
108 | show_parser.add_argument("--output-format", help="Output format (json, table, etc.)", default="json")
109 |
110 | # dbt_build command
111 | build_parser = subparsers.add_parser("build", help="Run build command")
112 | build_parser.add_argument("--models", help="Specific models to build")
113 | build_parser.add_argument("--selector", help="Named selector to use")
114 | build_parser.add_argument("--exclude", help="Models to exclude")
115 | build_parser.add_argument("--project-dir", help="Directory containing the dbt project", default=".")
116 | build_parser.add_argument("--profiles-dir", help="Directory containing the profiles.yml file (defaults to project-dir if not specified)")
117 | build_parser.add_argument("--full-refresh", help="Perform a full refresh", action="store_true")
118 |
119 | # configure command
120 | configure_parser = subparsers.add_parser("configure", help="Configure dbt path")
121 | configure_parser.add_argument("path", help="Path to dbt executable")
122 |
123 | return parser.parse_args()
124 |
125 |
126 | # Define tool functions directly
127 | async def run_dbt_run(models=None, selector=None, exclude=None, project_dir=".", profiles_dir=None, full_refresh=False):
128 | """Run dbt models."""
129 | command = ["run"]
130 |
131 | if models:
132 | command.extend(["-s", models])
133 |
134 | if selector:
135 | command.extend(["--selector", selector])
136 |
137 | if exclude:
138 | command.extend(["--exclude", exclude])
139 |
140 | if full_refresh:
141 | command.append("--full-refresh")
142 |
143 | from src.command import execute_dbt_command
144 | result = await execute_dbt_command(command, project_dir, profiles_dir)
145 |
146 | if not result["success"]:
147 | error_msg = f"Error executing dbt run: {result['error']}"
148 | if 'output' in result and result['output']:
149 | error_msg += f"\nOutput: {result['output']}"
150 | return error_msg
151 |
152 | return json.dumps(result["output"]) if isinstance(result["output"], (dict, list)) else str(result["output"])
153 |
154 | async def run_dbt_test(models=None, selector=None, exclude=None, project_dir=".", profiles_dir=None):
155 | """Run dbt tests."""
156 | command = ["test"]
157 |
158 | if models:
159 | command.extend(["-s", models])
160 |
161 | if selector:
162 | command.extend(["--selector", selector])
163 |
164 | if exclude:
165 | command.extend(["--exclude", exclude])
166 |
167 | from src.command import execute_dbt_command
168 | result = await execute_dbt_command(command, project_dir, profiles_dir)
169 |
170 | if not result["success"]:
171 | error_msg = f"Error executing dbt test: {result['error']}"
172 | if result["output"]:
173 | error_msg += f"\nOutput: {result['output']}"
174 | return error_msg
175 |
176 | return json.dumps(result["output"]) if isinstance(result["output"], (dict, list)) else str(result["output"])
177 |
178 | async def run_dbt_ls(models=None, selector=None, exclude=None, resource_type=None, project_dir=".", profiles_dir=None, output_format="json", verbose=False):
179 | """List dbt resources."""
180 | command = ["ls"]
181 |
182 | if models:
183 | command.extend(["-s", models])
184 |
185 | if selector:
186 | command.extend(["--selector", selector])
187 |
188 | if exclude:
189 | command.extend(["--exclude", exclude])
190 |
191 | if resource_type:
192 | command.extend(["--resource-type", resource_type])
193 |
194 | command.extend(["--output", output_format])
195 |
196 | from src.command import execute_dbt_command, parse_dbt_list_output
197 |
198 |
199 | result = await execute_dbt_command(command, project_dir, profiles_dir)
200 |
201 | if not result["success"]:
202 | error_msg = f"Error executing dbt ls: {result['error']}"
203 | if "output" in result and result["output"]:
204 | error_msg += f"\nOutput: {result['output']}"
205 | return error_msg
206 |
207 | # For json format, parse the output and return as JSON
208 | if output_format == "json":
209 | # Return raw output if it's an empty string or None
210 | if not result["output"]:
211 | return "[]"
212 |
213 | # If the output is already a list, return it directly
214 | if isinstance(result["output"], list):
215 | parsed = result["output"]
216 | else:
217 | # Parse the output
218 | parsed = parse_dbt_list_output(result["output"])
219 |
220 | # If not verbose, simplify the output
221 | if not verbose and parsed:
222 | simplified = []
223 | for item in parsed:
224 | if isinstance(item, dict):
225 | simplified.append({
226 | "name": item.get("name"),
227 | "resource_type": item.get("resource_type"),
228 | "depends_on": {
229 | "nodes": item.get("depends_on", {}).get("nodes", [])
230 | }
231 | })
232 | parsed = simplified
233 |
234 | return json.dumps(parsed, indent=2)
235 |
236 | # For other formats, return the raw output
237 | return str(result["output"])
238 |
239 | async def run_dbt_compile(models=None, selector=None, exclude=None, project_dir=".", profiles_dir=None):
240 | """Compile dbt models."""
241 | command = ["compile"]
242 |
243 | if models:
244 | command.extend(["-s", models])
245 |
246 | if selector:
247 | command.extend(["--selector", selector])
248 |
249 | if exclude:
250 | command.extend(["--exclude", exclude])
251 |
252 | from src.command import execute_dbt_command
253 | result = await execute_dbt_command(command, project_dir, profiles_dir)
254 |
255 | if not result["success"]:
256 | error_msg = f"Error executing dbt compile: {result['error']}"
257 | if result["output"]:
258 | error_msg += f"\nOutput: {result['output']}"
259 | return error_msg
260 |
261 | return json.dumps(result["output"]) if isinstance(result["output"], (dict, list)) else str(result["output"])
262 |
263 | async def run_dbt_debug(project_dir=".", profiles_dir=None):
264 | """Debug dbt project."""
265 | command = ["debug"]
266 |
267 | from src.command import execute_dbt_command
268 | result = await execute_dbt_command(command, project_dir, profiles_dir)
269 |
270 | if not result["success"]:
271 | error_msg = f"Error executing dbt debug: {result['error']}"
272 | if result["output"]:
273 | error_msg += f"\nOutput: {result['output']}"
274 | return error_msg
275 |
276 | return json.dumps(result["output"]) if isinstance(result["output"], (dict, list)) else str(result["output"])
277 |
278 | async def run_dbt_deps(project_dir=".", profiles_dir=None):
279 | """Install dbt package dependencies."""
280 | command = ["deps"]
281 |
282 | from src.command import execute_dbt_command
283 | result = await execute_dbt_command(command, project_dir, profiles_dir)
284 |
285 | if not result["success"]:
286 | error_msg = f"Error executing dbt deps: {result['error']}"
287 | if result["output"]:
288 | error_msg += f"\nOutput: {result['output']}"
289 | return error_msg
290 |
291 | return json.dumps(result["output"]) if isinstance(result["output"], (dict, list)) else str(result["output"])
292 |
293 | async def run_dbt_seed(selector=None, exclude=None, project_dir=".", profiles_dir=None):
294 | """Load CSV files as seed data."""
295 | command = ["seed"]
296 |
297 | if selector:
298 | command.extend(["--selector", selector])
299 |
300 | if exclude:
301 | command.extend(["--exclude", exclude])
302 |
303 | from src.command import execute_dbt_command
304 | result = await execute_dbt_command(command, project_dir, profiles_dir)
305 |
306 | if not result["success"]:
307 | error_msg = f"Error executing dbt seed: {result['error']}"
308 | if result["output"]:
309 | error_msg += f"\nOutput: {result['output']}"
310 | return error_msg
311 |
312 | return json.dumps(result["output"]) if isinstance(result["output"], (dict, list)) else str(result["output"])
313 | async def run_dbt_show(models, project_dir=".", profiles_dir=None, limit=None, output_format="json"):
314 | """Preview model results."""
315 | # For successful cases, use --quiet and --output json for clean JSON output
316 | # For error cases, don't use --quiet to get the full error message
317 |
318 | from src.command import execute_dbt_command
319 | import re
320 |
321 | # Check if models parameter contains inline SQL
322 | is_inline_sql = models.strip().lower().startswith('select ')
323 |
324 | # If it's inline SQL, strip out any LIMIT clause as we'll handle it with the --limit parameter
325 | if is_inline_sql:
326 | # Use regex to remove LIMIT clause from the SQL
327 | models = re.sub(r'\bLIMIT\s+\d+\b', '', models, flags=re.IGNORECASE)
328 |
329 | # For inline SQL, use the --inline flag with the SQL as its value
330 | command = ["show", f"--inline={models}", "--output", output_format]
331 |
332 | if limit:
333 | command.extend(["--limit", str(limit)])
334 |
335 | result = await execute_dbt_command(command, project_dir, profiles_dir)
336 |
337 | # Check for specific error patterns in the output
338 | if not result["success"] or (
339 | isinstance(result["output"], str) and
340 | any(err in result["output"].lower() for err in ["error", "failed", "syntax", "exception"])
341 | ):
342 | error_msg = "Error executing dbt show with inline SQL"
343 | if result["output"]:
344 | return error_msg + "\n" + str(result["output"])
345 | elif result["error"]:
346 | return error_msg + "\n" + str(result["error"])
347 | else:
348 | return f"{error_msg}: Command failed with exit code {result.get('returncode', 'unknown')}"
349 | else:
350 | # For regular model references, check if the model exists first
351 | check_command = ["ls", "-s", models]
352 | check_result = await execute_dbt_command(check_command, project_dir, profiles_dir)
353 |
354 | # If the model doesn't exist, return the error message
355 | if not check_result["success"] or "does not match any enabled nodes" in str(check_result["output"]):
356 | error_msg = "Error executing dbt show: Model does not exist or is not enabled"
357 | if check_result["output"]:
358 | return error_msg + "\n" + str(check_result["output"])
359 | elif check_result["error"]:
360 | return error_msg + "\n" + str(check_result["error"])
361 | else:
362 | return error_msg
363 |
364 | # If the model exists, run the show command with --quiet and --output json
365 | command = ["show", "-s", models, "--quiet", "--output", output_format]
366 |
367 | if limit:
368 | command.extend(["--limit", str(limit)])
369 |
370 | result = await execute_dbt_command(command, project_dir, profiles_dir)
371 |
372 | # If the command succeeded, return the JSON output
373 | if result["success"]:
374 | return json.dumps(result["output"]) if isinstance(result["output"], (dict, list)) else str(result["output"])
375 |
376 | # If the command failed, return the error message
377 | error_msg = "Error executing dbt show: "
378 | if result["output"]:
379 | return error_msg + str(result["output"])
380 | elif result["error"]:
381 | return error_msg + str(result["error"])
382 | else:
383 | return f"{error_msg}Command failed with exit code {result.get('returncode', 'unknown')}"
384 |
385 | async def run_dbt_build(models=None, selector=None, exclude=None, project_dir=".", profiles_dir=None, full_refresh=False):
386 | """Run build command."""
387 | command = ["build"]
388 |
389 | if models:
390 | command.extend(["-s", models])
391 |
392 | if selector:
393 | command.extend(["--selector", selector])
394 |
395 | if exclude:
396 | command.extend(["--exclude", exclude])
397 |
398 | if full_refresh:
399 | command.append("--full-refresh")
400 |
401 | from src.command import execute_dbt_command
402 | result = await execute_dbt_command(command, project_dir, profiles_dir)
403 |
404 | if not result["success"]:
405 | error_msg = f"Error executing dbt build: {result['error']}"
406 | if result["output"]:
407 | error_msg += f"\nOutput: {result['output']}"
408 | return error_msg
409 |
410 | return json.dumps(result["output"]) if isinstance(result["output"], (dict, list)) else str(result["output"])
411 |
412 | async def run_configure_dbt_path(path):
413 | """Configure dbt path."""
414 | import os
415 | from src.config import set_config
416 |
417 | if not os.path.isfile(path):
418 | return f"Error: File not found at {path}"
419 |
420 | set_config("dbt_path", path)
421 | return f"dbt path configured to: {path}"
422 |
423 | async def main_async() -> None:
424 | """Main entry point for the CLI."""
425 | args = parse_args()
426 |
427 | # Set environment variables from arguments
428 | os.environ["DBT_PATH"] = args.dbt_path
429 | os.environ["ENV_FILE"] = args.env_file
430 | os.environ["LOG_LEVEL"] = args.log_level
431 |
432 | # Initialize configuration
433 | initialize_config()
434 |
435 | # Map commands to functions
436 | command_map = {
437 | "run": run_dbt_run,
438 | "test": run_dbt_test,
439 | "ls": run_dbt_ls,
440 | "compile": run_dbt_compile,
441 | "debug": run_dbt_debug,
442 | "deps": run_dbt_deps,
443 | "seed": run_dbt_seed,
444 | "show": run_dbt_show,
445 | "build": run_dbt_build,
446 | "configure": run_configure_dbt_path
447 | }
448 |
449 | if args.command not in command_map:
450 | print(f"Command '{args.command}' not found. Use --help for usage information.")
451 | sys.exit(1)
452 |
453 | # Prepare arguments for the function
454 | func_args = {}
455 |
456 | if args.command == "run":
457 | func_args = {
458 | "models": args.models,
459 | "selector": args.selector,
460 | "exclude": args.exclude,
461 | "project_dir": args.project_dir,
462 | "profiles_dir": args.profiles_dir,
463 | "full_refresh": args.full_refresh
464 | }
465 | elif args.command == "test":
466 | func_args = {
467 | "models": args.models,
468 | "selector": args.selector,
469 | "exclude": args.exclude,
470 | "project_dir": args.project_dir,
471 | "profiles_dir": args.profiles_dir
472 | }
473 | elif args.command == "ls":
474 | func_args = {
475 | "models": args.models,
476 | "selector": args.selector,
477 | "exclude": args.exclude,
478 | "resource_type": args.resource_type,
479 | "project_dir": args.project_dir,
480 | "profiles_dir": args.profiles_dir,
481 | "output_format": args.output_format,
482 | "verbose": args.verbose
483 | }
484 | elif args.command == "compile":
485 | func_args = {
486 | "models": args.models,
487 | "selector": args.selector,
488 | "exclude": args.exclude,
489 | "project_dir": args.project_dir,
490 | "profiles_dir": args.profiles_dir
491 | }
492 | elif args.command == "debug":
493 | func_args = {
494 | "project_dir": args.project_dir,
495 | "profiles_dir": args.profiles_dir
496 | }
497 | elif args.command == "deps":
498 | func_args = {
499 | "project_dir": args.project_dir,
500 | "profiles_dir": args.profiles_dir
501 | }
502 | elif args.command == "seed":
503 | func_args = {
504 | "selector": args.selector,
505 | "exclude": args.exclude,
506 | "project_dir": args.project_dir,
507 | "profiles_dir": args.profiles_dir
508 | }
509 | elif args.command == "show":
510 | func_args = {
511 | "models": args.models,
512 | "project_dir": args.project_dir,
513 | "profiles_dir": args.profiles_dir,
514 | "limit": args.limit,
515 | "output_format": args.output_format
516 | }
517 | elif args.command == "build":
518 | func_args = {
519 | "models": args.models,
520 | "selector": args.selector,
521 | "exclude": args.exclude,
522 | "project_dir": args.project_dir,
523 | "profiles_dir": args.profiles_dir,
524 | "full_refresh": args.full_refresh
525 | }
526 | elif args.command == "configure":
527 | func_args = {
528 | "path": args.path
529 | }
530 |
531 | # Execute the function
532 | result = await command_map[args.command](**{k: v for k, v in func_args.items() if v is not None})
533 |
534 | # Print the result
535 | # For dbt_show command errors, print raw output regardless of format
536 |     if args.command == "show" and isinstance(result, str) and result.startswith("Error executing dbt show"):  # no trailing colon: the inline-SQL error path returns "Error executing dbt show with inline SQL"
537 | print(result)
538 | elif args.format == "json":
539 | try:
540 | # If result is already a JSON string, parse it first
541 | if isinstance(result, str) and (result.startswith("{") or result.startswith("[")):
542 | parsed = json.loads(result)
543 | print(json.dumps(parsed, indent=2))
544 | else:
545 | print(json.dumps({"output": result}, indent=2))
546 | except json.JSONDecodeError:
547 | print(json.dumps({"output": result}, indent=2))
548 | else:
549 | print(result)
550 |
551 |
552 | def main_entry() -> None:
553 | """Entry point for setuptools."""
554 | asyncio.run(main_async())
555 |
556 |
557 | if __name__ == "__main__":
558 | asyncio.run(main_async())
```
--------------------------------------------------------------------------------
/src/tools.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | MCP tool implementations for the DBT CLI MCP Server.
3 |
4 | This module defines all the MCP tools that map to dbt CLI commands.
5 | Each tool is a function decorated with @mcp.tool() that handles a specific dbt command.
6 | """
7 |
8 | import logging
9 | import json
10 | import re
11 | from typing import Optional, Dict, Any, List
12 | from functools import partial
13 |
14 | from mcp.server.fastmcp import FastMCP
15 | from pydantic import Field
16 |
17 | from src.command import execute_dbt_command, parse_dbt_list_output, process_command_result
18 | from src.config import get_config, set_config
19 | from src.formatters import default_formatter, ls_formatter, show_formatter
20 |
21 | # Logger for this module
22 | logger = logging.getLogger(__name__)
23 |
24 |
25 | def register_tools(mcp: FastMCP) -> None:
26 | """
27 | Register all tools with the MCP server.
28 |
29 | Args:
30 | mcp: The FastMCP server instance
31 | """
32 |
33 | @mcp.tool()
34 | async def dbt_run(
35 | models: Optional[str] = Field(
36 | default=None,
37 | description="Specific models to run, using the dbt selection syntax (e.g., \"model_name+\")"
38 | ),
39 | selector: Optional[str] = Field(
40 | default=None,
41 | description="Named selector to use"
42 | ),
43 | exclude: Optional[str] = Field(
44 | default=None,
45 | description="Models to exclude"
46 | ),
47 | project_dir: str = Field(
48 | default=".",
49 | description="ABSOLUTE PATH to the directory containing the dbt project (e.g. '/Users/username/projects/dbt_project' not '.')"
50 | ),
51 | profiles_dir: Optional[str] = Field(
52 | default=None,
53 | description="Directory containing the profiles.yml file (defaults to project_dir if not specified)"
54 | ),
55 | full_refresh: bool = Field(
56 | default=False,
57 | description="Whether to perform a full refresh"
58 | )
59 | ) -> str:
60 | """Run dbt models. An AI agent should use this tool when it needs to execute dbt models to transform data and build analytical tables in the data warehouse. This is essential for refreshing data or implementing new data transformations in a project.
61 |
62 | Returns:
63 | Output from the dbt run command as text (this command does not support JSON output format)
64 | """
65 | command = ["run"]
66 |
67 | if models:
68 | command.extend(["-s", models])
69 |
70 | if selector:
71 | command.extend(["--selector", selector])
72 |
73 | if exclude:
74 | command.extend(["--exclude", exclude])
75 |
76 | if full_refresh:
77 | command.append("--full-refresh")
78 |
79 | # The --no-print flag is not supported by dbt Cloud CLI
80 | # We'll rely on proper parsing to handle any print macros
81 |
82 | result = await execute_dbt_command(command, project_dir, profiles_dir)
83 |
84 | # Use the centralized result processor
85 | return await process_command_result(result, command_name="run")
86 |
87 | @mcp.tool()
88 | async def dbt_test(
89 | models: Optional[str] = Field(
90 | default=None,
91 | description="Specific models to test, using the dbt selection syntax"
92 | ),
93 | selector: Optional[str] = Field(
94 | default=None,
95 | description="Named selector to use"
96 | ),
97 | exclude: Optional[str] = Field(
98 | default=None,
99 | description="Models to exclude"
100 | ),
101 | project_dir: str = Field(
102 | default=".",
103 | description="ABSOLUTE PATH to the directory containing the dbt project (e.g. '/Users/username/projects/dbt_project' not '.')"
104 | ),
105 | profiles_dir: Optional[str] = Field(
106 | default=None,
107 | description="Directory containing the profiles.yml file (defaults to project_dir if not specified)"
108 | )
109 | ) -> str:
110 | """Run dbt tests. An AI agent should use this tool when it needs to validate data quality and integrity by running tests defined in a dbt project. This helps ensure that data transformations meet expected business rules and constraints before being used for analysis or reporting.
111 |
112 | Returns:
113 | Output from the dbt test command as text (this command does not support JSON output format)
114 | """
115 | command = ["test"]
116 |
117 | if models:
118 | command.extend(["-s", models])
119 |
120 | if selector:
121 | command.extend(["--selector", selector])
122 |
123 | if exclude:
124 | command.extend(["--exclude", exclude])
125 |
126 | # The --no-print flag is not supported by dbt Cloud CLI
127 | # We'll rely on proper parsing to handle any print macros
128 |
129 | result = await execute_dbt_command(command, project_dir, profiles_dir)
130 |
131 | # Use the centralized result processor
132 | return await process_command_result(result, command_name="test")
133 |
134 | @mcp.tool()
135 | async def dbt_ls(
136 | models: Optional[str] = Field(
137 | default=None,
138 | description="Specific models to list, using the dbt selection syntax. Note that you probably want to specify your selection here e.g. silver.fact"
139 | ),
140 | selector: Optional[str] = Field(
141 | default=None,
142 | description="Named selector to use"
143 | ),
144 | exclude: Optional[str] = Field(
145 | default=None,
146 | description="Models to exclude"
147 | ),
148 | resource_type: Optional[str] = Field(
149 | default=None,
150 | description="Type of resource to list (model, test, source, etc.)"
151 | ),
152 | project_dir: str = Field(
153 | default=".",
154 | description="ABSOLUTE PATH to the directory containing the dbt project (e.g. '/Users/username/projects/dbt_project' not '.')"
155 | ),
156 | profiles_dir: Optional[str] = Field(
157 | default=None,
158 | description="Directory containing the profiles.yml file (defaults to project_dir if not specified)"
159 | ),
160 | output_format: str = Field(
161 | default="json",
162 | description="Output format (json, name, path, or selector)"
163 | ),
164 | verbose: bool = Field(
165 | default=False,
166 | description="Return full JSON output instead of simplified version"
167 | )
168 | ) -> str:
169 | """List dbt resources. An AI agent should use this tool when it needs to discover available models, tests, sources, and other resources within a dbt project. This helps the agent understand the project structure, identify dependencies, and select specific resources for other operations like running or testing.
170 |
171 | Returns:
172 | When output_format is 'json' (default):
173 | - With verbose=False (default): returns a simplified JSON with only name, resource_type, and depends_on.nodes
174 | - With verbose=True: returns a full JSON with all resource details
175 | When output_format is 'name', 'path', or 'selector', returns plain text with the respective format.
176 | """
177 | # Log diagnostic information
178 | logger.info(f"Starting dbt_ls with project_dir={project_dir}, output_format={output_format}")
179 |
180 | command = ["ls"]
181 |
182 | if models:
183 | command.extend(["-s", models])
184 |
185 | if selector:
186 | command.extend(["--selector", selector])
187 |
188 | if exclude:
189 | command.extend(["--exclude", exclude])
190 |
191 | if resource_type:
192 | command.extend(["--resource-type", resource_type])
193 |
194 | command.extend(["--output", output_format])
195 |
196 |         command.append("--quiet")
197 |
198 | logger.info(f"Executing dbt command: dbt {' '.join(command)}")
199 | result = await execute_dbt_command(command, project_dir, profiles_dir)
200 | logger.info(f"dbt command result: success={result['success']}, returncode={result.get('returncode')}")
201 |
202 | # Use the centralized result processor with ls_formatter
203 | formatter = partial(ls_formatter, output_format=output_format, verbose=verbose)
204 |
205 | return await process_command_result(
206 | result,
207 | command_name="ls",
208 | output_formatter=formatter,
209 | include_debug_info=True # Include extra debug info for this command
210 | )
211 |
212 | @mcp.tool()
213 | async def dbt_compile(
214 | models: Optional[str] = Field(
215 | default=None,
216 | description="Specific models to compile, using the dbt selection syntax"
217 | ),
218 | selector: Optional[str] = Field(
219 | default=None,
220 | description="Named selector to use"
221 | ),
222 | exclude: Optional[str] = Field(
223 | default=None,
224 | description="Models to exclude"
225 | ),
226 | project_dir: str = Field(
227 | default=".",
228 | description="ABSOLUTE PATH to the directory containing the dbt project (e.g. '/Users/username/projects/dbt_project' not '.')"
229 | ),
230 | profiles_dir: Optional[str] = Field(
231 | default=None,
232 | description="Directory containing the profiles.yml file (defaults to project_dir if not specified)"
233 | )
234 | ) -> str:
235 | """Compile dbt models. An AI agent should use this tool when it needs to generate the SQL that will be executed without actually running it against the database. This is valuable for validating SQL syntax, previewing transformations, or investigating how dbt interprets models before committing to execution.
236 |
237 | Returns:
238 | Output from the dbt compile command as text (this command does not support JSON output format)
239 | """
240 | command = ["compile"]
241 |
242 | if models:
243 | command.extend(["-s", models])
244 |
245 | if selector:
246 | command.extend(["--selector", selector])
247 |
248 | if exclude:
249 | command.extend(["--exclude", exclude])
250 |
251 | # The --no-print flag is not supported by dbt Cloud CLI
252 | # We'll rely on proper parsing to handle any print macros
253 |
254 | result = await execute_dbt_command(command, project_dir, profiles_dir)
255 |
256 | # Use the centralized result processor
257 | return await process_command_result(result, command_name="compile")
258 |
259 | @mcp.tool()
260 | async def dbt_debug(
261 | project_dir: str = Field(
262 | default=".",
263 | description="ABSOLUTE PATH to the directory containing the dbt project (e.g. '/Users/username/projects/dbt_project' not '.')"
264 | ),
265 | profiles_dir: Optional[str] = Field(
266 | default=None,
267 | description="Directory containing the profiles.yml file (defaults to project_dir if not specified)"
268 | )
269 | ) -> str:
270 | """Run dbt debug to validate the project setup. An AI agent should use this tool when it needs to troubleshoot configuration issues, check database connectivity, or verify that all project dependencies are properly installed. This is essential for diagnosing problems before attempting to run models or tests.
271 |
272 | Returns:
273 | Output from the dbt debug command as text (this command does not support JSON output format)
274 | """
275 | command = ["debug"]
276 |
277 | # The --no-print flag is not supported by dbt Cloud CLI
278 | # We'll rely on proper parsing to handle any print macros
279 |
280 | result = await execute_dbt_command(command, project_dir, profiles_dir)
281 |
282 | # Use the centralized result processor
283 | return await process_command_result(result, command_name="debug")
284 |
285 | @mcp.tool()
286 | async def dbt_deps(
287 | project_dir: str = Field(
288 | default=".",
289 | description="ABSOLUTE PATH to the directory containing the dbt project (e.g. '/Users/username/projects/dbt_project' not '.')"
290 | ),
291 | profiles_dir: Optional[str] = Field(
292 | default=None,
293 | description="Directory containing the profiles.yml file (defaults to project_dir if not specified)"
294 | )
295 | ) -> str:
296 | """Install dbt package dependencies. An AI agent should use this tool when it needs to install or update external packages that the dbt project depends on. This ensures that all required modules, macros, and models from other packages are available before running the project.
297 |
298 | Returns:
299 | Output from the dbt deps command as text (this command does not support JSON output format)
300 | """
301 | command = ["deps"]
302 |
303 | # The --no-print flag is not supported by dbt Cloud CLI
304 | # We'll rely on proper parsing to handle any print macros
305 |
306 | result = await execute_dbt_command(command, project_dir, profiles_dir)
307 |
308 | # Use the centralized result processor
309 | return await process_command_result(result, command_name="deps")
310 |
311 | @mcp.tool()
312 | async def dbt_seed(
313 | selector: Optional[str] = Field(
314 | default=None,
315 | description="Named selector to use"
316 | ),
317 | exclude: Optional[str] = Field(
318 | default=None,
319 | description="Seeds to exclude"
320 | ),
321 | project_dir: str = Field(
322 | default=".",
323 | description="ABSOLUTE PATH to the directory containing the dbt project (e.g. '/Users/username/projects/dbt_project' not '.')"
324 | ),
325 | profiles_dir: Optional[str] = Field(
326 | default=None,
327 | description="Directory containing the profiles.yml file (defaults to project_dir if not specified)"
328 | )
329 | ) -> str:
330 | """Load CSV files as seed data. An AI agent should use this tool when it needs to load initial data from CSV files into the database. This is essential for creating reference tables, test datasets, or any static data that models will depend on.
331 |
332 | Returns:
333 | Output from the dbt seed command as text (this command does not support JSON output format)
334 | """
335 | command = ["seed"]
336 |
337 | # The --no-print flag is not supported by dbt Cloud CLI
338 | # We'll rely on proper parsing to handle any print macros
339 |
340 | if selector:
341 | command.extend(["--selector", selector])
342 |
343 | if exclude:
344 | command.extend(["--exclude", exclude])
345 |
346 | result = await execute_dbt_command(command, project_dir, profiles_dir)
347 |
348 | # Use the centralized result processor
349 | return await process_command_result(result, command_name="seed")
350 |
351 | @mcp.tool()
352 | async def dbt_show(
353 | models: str = Field(
354 | description="Specific model to show. For model references, use standard dbt syntax like 'model_name'. For inline SQL, use the format 'select * from {{ ref(\"model_name\") }}' to reference other models."
355 | ),
356 | project_dir: str = Field(
357 | default=".",
358 | description="ABSOLUTE PATH to the directory containing the dbt project (e.g. '/Users/username/projects/dbt_project' not '.')"
359 | ),
360 | profiles_dir: Optional[str] = Field(
361 | default=None,
362 | description="Directory containing the profiles.yml file (defaults to project_dir if not specified)"
363 | ),
364 | limit: Optional[int] = Field(
365 | default=None,
366 | description="Limit the number of rows returned"
367 | ),
368 | output: Optional[str] = Field(
369 | default="json",
370 | description="Output format (json, table, etc.)"
371 | )
372 | ) -> str:
373 | """Preview the results of a model. An AI agent should use this tool when it needs to preview data from a specific model without materializing it. This helps inspect transformation results, debug issues, or demonstrate how data looks after processing without modifying the target database.
374 |
375 | Returns:
376 | Output from the dbt show command, defaulting to JSON format if not specified
377 | """
378 | # Use enhanced SQL detection
379 | is_inline_sql, sql_type = is_inline_sql_query(models)
380 |
381 | # If it's SQL, check for security risks
382 | if is_inline_sql:
383 | has_risk, risk_reason = contains_mutation_risk(models)
384 | if has_risk:
385 | logger.warning(f"Security risk detected in SQL: {risk_reason}")
386 | error_result = {
387 | "success": False,
388 | "output": f"Security validation failed: {risk_reason}. For security reasons, mutation operations are not allowed.",
389 | "error": "SecurityValidationError",
390 | "returncode": 1
391 | }
392 | return await process_command_result(
393 | error_result,
394 | command_name="show",
395 | include_debug_info=True
396 | )
397 |
398 | logger.info(f"dbt_show called with models={models}, is_inline_sql={is_inline_sql}")
399 |
400 | # If it's inline SQL, strip out any LIMIT clause as we'll handle it with the --limit parameter
401 | if is_inline_sql:
402 | # Use regex to remove LIMIT clause from the SQL
403 | original_models = models
404 | models = re.sub(r'\bLIMIT\s+\d+\b', '', models, flags=re.IGNORECASE)
405 | logger.info(f"Stripped LIMIT clause: {original_models} -> {models}")
406 |
407 | # For inline SQL, use the --inline flag with the SQL as its value
408 | command = ["show", f"--inline={models}", "--output", output or "json"]
409 |
410 |         # Only add --limit for row-returning statements (WITH or SELECT); metadata statements such as SHOW or DESCRIBE do not take a row limit
411 | if limit and sql_type in ["WITH", "SELECT"]:
412 | command.extend(["--limit", str(limit)])
413 |
414 | logger.info(f"Executing dbt command: {' '.join(command)}")
415 | # Don't use --quiet for inline SQL to ensure we get error messages
416 | result = await execute_dbt_command(command, project_dir, profiles_dir)
417 |
418 | logger.info(f"Command result: success={result['success']}, returncode={result.get('returncode')}")
419 | if isinstance(result["output"], str):
420 | logger.info(f"Output (first 100 chars): {result['output'][:100]}")
421 | elif isinstance(result["output"], (dict, list)):
422 | logger.info(f"Output structure: {json.dumps(result['output'])[:100]}")
423 |
424 | # Check for specific error patterns in the output
425 | if not result["success"] or (
426 | isinstance(result["output"], str) and
427 | any(err in result["output"].lower() for err in ["error", "failed", "syntax", "exception"])
428 | ):
429 |             logger.warning(f"Error detected in output: {str(result['output'])[:200]}")  # str() first: output may be a dict/list, which cannot be sliced
430 | error_result = {
431 | "success": False,
432 | "output": f"Error executing inline SQL\n{result['output']}",
433 | "error": result["error"],
434 | "returncode": result["returncode"]
435 | }
436 | return await process_command_result(
437 | error_result,
438 | command_name="show",
439 | include_debug_info=True
440 | )
441 | else:
442 | # For regular model references, check if the model exists first
443 | check_command = ["ls", "-s", models]
444 | check_result = await execute_dbt_command(check_command, project_dir, profiles_dir)
445 |
446 | # If the model doesn't exist, return the error message
447 | if not check_result["success"] or "does not match any enabled nodes" in str(check_result["output"]):
448 | error_result = {
449 | "success": False,
450 | "output": f"Model does not exist or is not enabled\n{check_result['output']}",
451 | "error": check_result["error"],
452 | "returncode": check_result["returncode"]
453 | }
454 | return await process_command_result(
455 | error_result,
456 | command_name="show",
457 | include_debug_info=True
458 | )
459 |
460 |         # If the model exists, run the show command with --quiet and the requested output format
461 | command = ["show", "-s", models, "--quiet", "--output", output or "json"]
462 |
463 | if limit:
464 | command.extend(["--limit", str(limit)])
465 |
466 | result = await execute_dbt_command(command, project_dir, profiles_dir)
467 |
468 | # Use the centralized result processor
469 | return await process_command_result(
470 | result,
471 | command_name="show",
472 | output_formatter=show_formatter,
473 | include_debug_info=True
474 | )
475 |
476 | @mcp.tool()
477 | async def dbt_build(
478 | models: Optional[str] = Field(
479 | default=None,
480 | description="Specific models to build, using the dbt selection syntax"
481 | ),
482 | selector: Optional[str] = Field(
483 | default=None,
484 | description="Named selector to use"
485 | ),
486 | exclude: Optional[str] = Field(
487 | default=None,
488 | description="Models to exclude"
489 | ),
490 | project_dir: str = Field(
491 | default=".",
492 | description="ABSOLUTE PATH to the directory containing the dbt project (e.g. '/Users/username/projects/dbt_project' not '.')"
493 | ),
494 | profiles_dir: Optional[str] = Field(
495 | default=None,
496 | description="Directory containing the profiles.yml file (defaults to project_dir if not specified)"
497 | ),
498 | full_refresh: bool = Field(
499 | default=False,
500 | description="Whether to perform a full refresh"
501 | )
502 | ) -> str:
503 | """Run build command (seeds, tests, snapshots, and models). An AI agent should use this tool when it needs to execute a comprehensive build process that runs seeds, snapshots, models, and tests in the correct order. This is ideal for complete project deployment or ensuring all components work together.
504 |
505 | Returns:
506 | Output from the dbt build command as text (this command does not support JSON output format)
507 | """
508 | command = ["build"]
509 |
510 | if models:
511 | command.extend(["-s", models])
512 |
513 | if selector:
514 | command.extend(["--selector", selector])
515 |
516 | if exclude:
517 | command.extend(["--exclude", exclude])
518 |
519 | if full_refresh:
520 | command.append("--full-refresh")
521 |
522 | # The --no-print flag is not supported by dbt Cloud CLI
523 | # We'll rely on proper parsing to handle any print macros
524 |
525 | result = await execute_dbt_command(command, project_dir, profiles_dir)
526 |
527 | # Use the centralized result processor
528 | return await process_command_result(result, command_name="build")
529 |
530 | logger.info("Registered all dbt tools")
531 |
532 |
533 | def is_inline_sql_query(query: str) -> tuple[bool, Optional[str]]:
534 | """
535 | Determine if the given string is an inline SQL query or a model reference.
536 |
537 | This function uses multiple heuristics to determine if a string is likely
538 | an SQL query rather than a model name:
539 | 1. Checks for common SQL keywords at the beginning
540 | 2. Looks for SQL syntax patterns
541 | 3. Considers length and complexity
542 | 4. Handles SQL with comments (both single-line and multi-line)
543 | 5. Recognizes dbt templating syntax
544 |
545 | Args:
546 | query: The string to check
547 |
548 | Returns:
549 | A tuple of (is_sql, sql_type) where:
550 | - is_sql: True if the input is SQL, False otherwise
551 | - sql_type: The type of SQL statement if is_sql is True, None otherwise
552 | (e.g., "SELECT", "WITH", "SHOW", etc.)
553 | """
554 | # Normalize the query by trimming whitespace
555 | normalized_query = query.strip()
556 |
557 | # Skip empty queries
558 | if not normalized_query:
559 | return False, None
560 |
561 | # Check if the query contains SQL comments
562 | has_single_line_comment = '--' in normalized_query
563 | has_multi_line_comment = '/*' in normalized_query and '*/' in normalized_query
564 |
565 | # If the query only contains comments, it's still SQL
566 | if has_single_line_comment or has_multi_line_comment:
567 | # Check if it's only comments by removing them and seeing if anything remains
568 | # Remove /* */ style comments
569 | sql_no_comments = re.sub(r'/\*.*?\*/', ' ', normalized_query, flags=re.DOTALL)
570 | # Remove -- style comments
571 | sql_no_comments = re.sub(r'--.*?$', ' ', sql_no_comments, flags=re.MULTILINE)
572 | # Normalize whitespace
573 | sql_no_comments = ' '.join(sql_no_comments.split()).strip()
574 |
575 | if not sql_no_comments:
576 | # If nothing remains after removing comments, it's only comments
577 | return True, "COMMENT"
578 |
579 | # Convert to lowercase for case-insensitive matching
580 | normalized_query_lower = normalized_query.lower()
581 |
582 | # Check for SQL comments at the beginning and skip them for detection
583 | # This handles both single-line (--) and multi-line (/* */) comments
584 | comment_pattern = r'^(\s*(--[^\n]*\n|\s*/\*.*?\*/\s*)*\s*)'
585 | match = re.match(comment_pattern, normalized_query_lower, re.DOTALL)
586 | if match:
587 | # Skip past the comments for keyword detection
588 | start_pos = match.end()
589 | if start_pos >= len(normalized_query_lower):
590 | # If the query is only comments, it's still SQL
591 | return True, "COMMENT"
592 | normalized_query_lower = normalized_query_lower[start_pos:]
593 |
594 | # Common SQL statement starting keywords
595 | sql_starters = {
596 | 'select': 'SELECT',
597 | 'with': 'WITH',
598 | 'show': 'SHOW',
599 | 'describe': 'DESCRIBE',
600 | 'explain': 'EXPLAIN',
601 | 'analyze': 'ANALYZE',
602 | 'use': 'USE',
603 | 'set': 'SET'
604 | }
605 |
606 | # Check if the query starts with a common SQL keyword
607 | for keyword, sql_type in sql_starters.items():
608 | if normalized_query_lower.startswith(keyword + ' '):
609 | return True, sql_type
610 |
611 | # Check for more complex patterns like CTEs
612 | # WITH clause followed by identifier and AS
613 | cte_pattern = r'^\s*with\s+[a-z0-9_]+\s+as\s*\('
614 | if re.search(cte_pattern, normalized_query_lower, re.IGNORECASE):
615 | return True, "WITH"
616 |
617 | # Check for Jinja templating with SQL inside
618 | jinja_sql_pattern = r'{{\s*sql\s*}}'
619 | if re.search(jinja_sql_pattern, normalized_query_lower):
620 | return True, "JINJA"
621 |
622 | # Check for dbt ref or source macros which indicate SQL
623 | dbt_macro_pattern = r'{{\s*(ref|source)\s*\(\s*[\'"]'
624 | if re.search(dbt_macro_pattern, normalized_query_lower):
625 | return True, "DBT_MACRO"
626 |
627 | # If the query contains certain SQL syntax elements, it's likely SQL
628 | sql_syntax_elements = [
629 | r'\bfrom\s+[a-z0-9_]+', # FROM clause
630 | r'\bjoin\s+[a-z0-9_]+', # JOIN clause
631 | r'\bwhere\s+', # WHERE clause
632 | r'\bgroup\s+by\s+', # GROUP BY clause
633 | r'\border\s+by\s+', # ORDER BY clause
634 | r'\bhaving\s+', # HAVING clause
635 | r'\bunion\s+', # UNION operator
636 | r'\bcase\s+when\s+' # CASE expression
637 | ]
638 |
639 | for pattern in sql_syntax_elements:
640 | if re.search(pattern, normalized_query_lower, re.IGNORECASE):
641 | return True, "SQL_SYNTAX"
642 |
643 | # If the query is long and contains spaces, it's more likely to be SQL than a model name
644 | if len(normalized_query_lower) > 30 and ' ' in normalized_query_lower:
645 | return True, "COMPLEX"
646 |
647 | # If none of the above conditions are met, it's likely a model name
648 | return False, None
649 |
650 |
651 | def contains_mutation_risk(sql: str) -> tuple[bool, str]:
652 | """
653 | Check if the SQL query contains potentially dangerous operations.
654 |
655 | This function scans SQL for operations that could modify or delete data,
656 | which should be prohibited in a read-only context like dbt show.
657 |
658 | Args:
659 | sql: The SQL query to check
660 |
661 | Returns:
662 | A tuple of (has_risk, reason) where:
663 | - has_risk: True if the query contains risky operations, False otherwise
664 | - reason: A description of the risk if has_risk is True, empty string otherwise
665 | """
666 | # Normalize the SQL by removing comments and extra whitespace
667 | # This helps prevent comment-based evasion techniques
668 |
669 | # Remove /* */ style comments
670 | sql_no_comments = re.sub(r'/\*.*?\*/', ' ', sql, flags=re.DOTALL)
671 |
672 | # Remove -- style comments
673 | sql_no_comments = re.sub(r'--.*?$', ' ', sql_no_comments, flags=re.MULTILINE)
674 |
675 | # Normalize whitespace
676 | normalized_sql = ' '.join(sql_no_comments.split()).lower()
677 |
678 | # Check for multiple SQL statements (potential SQL injection)
679 | # This needs to be checked first to ensure proper error message
680 | if ';' in normalized_sql:
681 | # Check if there's actual SQL after the semicolon
682 | statements = normalized_sql.split(';')
683 | if len(statements) > 1:
684 | for stmt in statements[1:]:
685 | if stmt.strip():
686 | return True, "Multiple SQL statements detected - potential SQL injection risk"
687 |
688 | # Dangerous SQL operations patterns
689 | dangerous_patterns = [
690 | # Data modification operations
691 | (r'\bdelete\s+from\b', "DELETE operation detected"),
692 | (r'\btruncate\s+table\b', "TRUNCATE operation detected"),
693 | (r'\bdrop\s+table\b', "DROP TABLE operation detected"),
694 | (r'\bdrop\s+database\b', "DROP DATABASE operation detected"),
695 | (r'\bdrop\s+schema\b', "DROP SCHEMA operation detected"),
696 | (r'\balter\s+table\b', "ALTER TABLE operation detected"),
697 | (r'\bcreate\s+table\b', "CREATE TABLE operation detected"),
698 | (r'\bcreate\s+or\s+replace\b', "CREATE OR REPLACE operation detected"),
699 | (r'\binsert\s+into\b', "INSERT operation detected"),
700 | (r'\bupdate\s+.*?\bset\b', "UPDATE operation detected"),
701 | (r'\bmerge\s+into\b', "MERGE operation detected"),
702 |
703 | # Database administration operations
704 | (r'\bgrant\b', "GRANT operation detected"),
705 | (r'\brevoke\b', "REVOKE operation detected"),
706 | (r'\bcreate\s+user\b', "CREATE USER operation detected"),
707 | (r'\balter\s+user\b', "ALTER USER operation detected"),
708 | (r'\bdrop\s+user\b', "DROP USER operation detected"),
709 |
710 | # Execution of arbitrary code
711 | (r'\bexec\b', "EXEC operation detected"),
712 | (r'\bexecute\s+immediate\b', "EXECUTE IMMEDIATE detected"),
713 | (r'\bcall\b', "CALL procedure detected")
714 | ]
715 |
716 | # Check for each dangerous pattern
717 | for pattern, reason in dangerous_patterns:
718 | if re.search(pattern, normalized_sql, re.IGNORECASE):
719 | return True, reason
720 |
721 |     # Check for specific Snowflake commands that could be risky (word-boundary matches are broad and may flag benign queries containing e.g. 'remove')
722 | snowflake_patterns = [
723 | (r'\bcopy\s+into\b', "Snowflake COPY INTO operation detected"),
724 | (r'\bunload\s+to\b', "Snowflake UNLOAD operation detected"),
725 | (r'\bput\b', "Snowflake PUT operation detected"),
726 | (r'\bremove\b', "Snowflake REMOVE operation detected"),
727 | (r'\bmodify\b', "Snowflake MODIFY operation detected")
728 | ]
729 |
730 | for pattern, reason in snowflake_patterns:
731 | if re.search(pattern, normalized_sql, re.IGNORECASE):
732 | return True, reason
733 |
734 | # No risks detected
735 | return False, ""
```