# Directory Structure
```
├── .cursor
│   └── rules
│       ├── databricks_api.mdc
│       ├── documentation.mdc
│       ├── environment.mdc
│       ├── mcp_tools.mdc
│       ├── project_structure.mdc
│       ├── python_conventions.mdc
│       └── testing.mdc
├── .cursor.json
├── .env.example
├── .gitignore
├── docs
│   └── phase1.md
├── examples
│   ├── direct_usage.py
│   ├── mcp_client_usage.py
│   └── README.md
├── project_structure.md
├── pyproject.toml
├── README.md
├── scripts
│   ├── run_client_test.ps1
│   ├── run_direct_test.ps1
│   ├── run_direct_test.sh
│   ├── run_list_tools.ps1
│   ├── run_list_tools.sh
│   ├── run_mcp_client_test.ps1
│   ├── run_mcp_client_test.sh
│   ├── run_mcp_test.ps1
│   ├── run_tests.ps1
│   ├── show_clusters.py
│   ├── show_notebooks.py
│   ├── start_mcp_server.ps1
│   ├── start_mcp_server.sh
│   └── test_mcp_server.ps1
├── src
│   ├── __init__.py
│   ├── __main__.py
│   ├── api
│   │   ├── __init__.py
│   │   ├── clusters.py
│   │   ├── dbfs.py
│   │   ├── jobs.py
│   │   ├── notebooks.py
│   │   └── sql.py
│   ├── cli
│   │   ├── __init__.py
│   │   └── commands.py
│   ├── core
│   │   ├── __init__.py
│   │   ├── auth.py
│   │   ├── config.py
│   │   └── utils.py
│   ├── main.py
│   └── server
│       ├── __init__.py
│       ├── __main__.py
│       ├── app.py
│       └── databricks_mcp_server.py
├── start_mcp_server.ps1
├── start_mcp_server.sh
├── SUMMARY.md
├── tests
│   ├── __init__.py
│   ├── README.md
│   ├── test_clusters.py
│   ├── test_direct.py
│   ├── test_mcp_client.py
│   ├── test_mcp_server.py
│   └── test_tools.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/.env.example:
--------------------------------------------------------------------------------
```
1 | # Databricks API configuration
2 | DATABRICKS_HOST=https://adb-xxxxxxxxxxxx.xx.azuredatabricks.net
3 | DATABRICKS_TOKEN=your_databricks_token_here
4 |
5 | # Server configuration
6 | SERVER_HOST=0.0.0.0
7 | SERVER_PORT=8000
8 | DEBUG=False
9 |
10 | # Logging
11 | LOG_LEVEL=INFO
```
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
1 | # Python virtual environments
2 | venv/
3 | .venv/
4 | env/
5 | ENV/
6 |
7 | # Python bytecode
8 | __pycache__/
9 | *.py[cod]
10 | *$py.class
11 |
12 | # Distribution / packaging
13 | dist/
14 | build/
15 | *.egg-info/
16 |
17 | # Local development settings
18 | .env
19 | .env.local
20 |
21 | # IDE settings
22 | .idea/
23 | .vscode/
24 | *.swp
25 | *.swo
26 |
27 | # OS specific files
28 | .DS_Store
29 | Thumbs.db
30 |
31 | # Logs
32 | *.log
33 | logs/
34 |
35 | # Temporary files
36 | tmp/
37 | temp/
38 |
39 | # uv package manager files
40 | .uv/
41 | uv.lock
42 |
43 | # Databricks-specific
44 | *.dbfs
45 |
46 | # C extensions
47 | *.so
48 |
49 | # Distribution / packaging
50 | .Python
51 | develop-eggs/
52 | downloads/
53 | eggs/
54 | .eggs/
55 | lib/
56 | lib64/
57 | parts/
58 | sdist/
59 | var/
60 | wheels/
61 | .installed.cfg
62 | *.egg
63 | MANIFEST
64 |
65 | # PyInstaller
66 | *.manifest
67 | *.spec
68 |
69 | # Installer logs
70 | pip-log.txt
71 | pip-delete-this-directory.txt
72 |
73 | # Unit test / coverage reports
74 | htmlcov/
75 | .tox/
76 | .coverage
77 | .coverage.*
78 | .cache
79 | nosetests.xml
80 | coverage.xml
81 | *.cover
82 | .hypothesis/
83 | .pytest_cache/
84 |
85 | # Environments
86 | env.bak/
87 | venv.bak/
88 |
89 | # IDEs and editors
90 | *.swp
91 | *.swo
92 | *~
93 |
94 | # OS generated files
95 | .DS_Store?
96 | ._*
97 | .Spotlight-V100
98 | .Trashes
99 | ehthumbs.db
100 | Thumbs.db
```
--------------------------------------------------------------------------------
/.cursor.json:
--------------------------------------------------------------------------------
```json
1 | {
2 | "name": "Databricks MCP Server",
3 | "description": "A Model Completion Protocol (MCP) server for interacting with Databricks services",
4 | "version": "0.1.0",
5 | "repository": "https://github.com/JustTryAI/databricks-mcp-server",
6 |
7 | "structure": {
8 | "src": {
9 | "description": "Source code for the project",
10 | "children": {
11 | "server": {
12 | "description": "MCP server implementation",
13 | "patterns": ["*_mcp_server.py", "*.py"]
14 | },
15 | "api": {
16 | "description": "API client for Databricks services",
17 | "patterns": ["*.py"]
18 | },
19 | "core": {
20 | "description": "Core functionality and utilities",
21 | "patterns": ["*.py"]
22 | },
23 | "cli": {
24 | "description": "Command-line interface",
25 | "patterns": ["*.py"]
26 | }
27 | }
28 | },
29 | "tests": {
30 | "description": "Test files for the project",
31 | "patterns": ["test_*.py"],
32 | "rules": [
33 | "Each file in src/ should have a corresponding test file in tests/"
34 | ]
35 | },
36 | "examples": {
37 | "description": "Example usage of the MCP server",
38 | "patterns": ["*.py"]
39 | },
40 | "scripts": {
41 | "description": "Helper scripts for running the server and tests",
42 | "patterns": ["*.ps1", "*.sh"]
43 | }
44 | },
45 |
46 | "conventions": {
47 | "python": {
48 | "style": {
49 | "lineLength": 100,
50 | "indentation": {
51 | "type": "spaces",
52 | "size": 4
53 | },
54 | "quotes": {
55 | "default": "double",
56 | "avoidEscape": true
57 | }
58 | },
59 | "imports": {
60 | "ordering": [
61 | "standard_library",
62 | "third_party",
63 | "first_party"
64 | ],
65 | "grouping": true,
66 | "alphabetize": true
67 | },
68 | "docstrings": {
69 | "style": "google",
70 | "required": ["classes", "methods", "functions"]
71 | },
72 | "typings": {
73 | "required": true,
74 | "ignorePatterns": ["tests/*"]
75 | }
76 | },
77 | "naming": {
78 | "variables": "snake_case",
79 | "constants": "UPPER_SNAKE_CASE",
80 | "classes": "PascalCase",
81 | "functions": "snake_case",
82 | "methods": "snake_case",
83 | "files": "snake_case"
84 | }
85 | },
86 |
87 | "patterns": {
88 | "mcp_tools": {
89 | "description": "Pattern for MCP tool definitions",
90 | "example": "async def tool_name(params: Dict[str, Any]) -> Dict[str, Any]: ...",
91 | "rules": [
92 | "Tool functions should be async",
93 | "Tool functions should have clear docstrings describing purpose and parameters",
94 | "Tool functions should have proper error handling",
95 | "Tool functions should return a dictionary that matches the MCP protocol spec"
96 | ]
97 | },
98 | "databricks_api": {
99 | "description": "Pattern for Databricks API calls",
100 | "example": "async def api_call(client, **params): ...",
101 | "rules": [
102 | "API functions should be async",
103 | "API functions should handle rate limiting and retries",
104 | "API functions should provide clear error messages",
105 | "API responses should be validated before returning"
106 | ]
107 | }
108 | },
109 |
110 | "files": {
111 | "required": [
112 | "README.md",
113 | "pyproject.toml",
114 | ".gitignore",
115 | "src/server/databricks_mcp_server.py"
116 | ],
117 | "linting": {
118 | "enabled": true,
119 | "pylint": true,
120 | "flake8": true,
121 | "mypy": true
122 | }
123 | },
124 |
125 | "mcp": {
126 | "protocol_version": "1.0",
127 | "tool_documentation": {
128 | "required_fields": ["name", "description", "parameters", "returns"],
129 | "example": {
130 | "name": "list_clusters",
131 | "description": "Lists all available Databricks clusters",
132 | "parameters": {},
133 | "returns": "List of cluster objects"
134 | }
135 | },
136 | "tool_implementation": {
137 | "error_handling": "All tool functions must return errors as part of the result object with isError: true",
138 | "timeouts": "All tool functions should implement appropriate timeouts",
139 | "progress_reporting": "Long-running operations should provide progress updates"
140 | }
141 | },
142 |
143 | "references": {
144 | "mcp_protocol": "https://modelcontextprotocol.io/llms-full.txt",
145 | "databricks_api": "https://docs.databricks.com/api/azure/workspace/clusters/edit",
146 | "python_sdk": "https://github.com/modelcontextprotocol/python-sdk",
147 | "python_style_guide": "https://peps.python.org/pep-0008/"
148 | },
149 |
150 | "testing": {
151 | "frameworks": ["pytest"],
152 | "coverage": {
153 | "minimum": 80,
154 | "exclude": ["scripts/*", "examples/*"]
155 | },
156 | "strategies": [
157 | "unit_tests",
158 | "integration_tests",
159 | "mcp_protocol_tests"
160 | ]
161 | },
162 |
163 | "documentation": {
164 | "required": [
165 | "README.md",
166 | "tests/README.md",
167 | "examples/README.md"
168 | ],
169 | "api_docs": {
170 | "style": "sphinx",
171 | "output_dir": "docs/api"
172 | }
173 | },
174 |
175 | "environment": {
176 | "python_version": ">=3.10",
177 | "package_manager": "uv",
178 | "virtual_env": ".venv"
179 | }
180 | }
```
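
The `mcp_tools` pattern above is terse, so here is a minimal sketch of a tool function that satisfies its rules (async, documented, errors folded into the result object with `isError: true`). The function name and the result's `clusters` key are illustrative assumptions, not the repository's actual implementation.

```python
from typing import Any, Dict


async def list_clusters_tool(params: Dict[str, Any]) -> Dict[str, Any]:
    """List all Databricks clusters.

    Args:
        params: Tool parameters (this tool takes none).

    Returns:
        A result dictionary; on failure, the error is part of the
        result object with isError set to true, per the rules above.
    """
    try:
        from src.api import clusters  # Real client lives in src/api/clusters.py

        result = await clusters.list_clusters()
        return {"clusters": result, "isError": False}
    except Exception as e:
        # Errors are returned, not raised, so the MCP client sees them.
        return {"error": str(e), "isError": True}
```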
--------------------------------------------------------------------------------
/tests/README.md:
--------------------------------------------------------------------------------
```markdown
1 | # Tests for Databricks MCP Server
2 |
3 | This directory contains test scripts for the Databricks MCP server.
4 |
5 | ## Test Files
6 |
7 | 1. **Direct Test (test_direct.py)**
8 |
9 | This test directly instantiates the Databricks MCP server and calls its tools
10 | without going through the MCP protocol. It's useful for testing the core
11 | functionality without the overhead of the MCP protocol.
12 |
13 | 2. **MCP Client Test (test_mcp_client.py)**
14 |
15 | This test uses the MCP client to connect to the Databricks MCP server and test
16 | its tools through the MCP protocol. It's useful for testing the server's
17 | compatibility with the MCP protocol.
18 |
19 | 3. **List Tools Test (test_tools.py)**
20 |
21 | This test connects to the Databricks MCP server using the MCP client and lists
22 | all available tools. It's a simple test to verify that the server is running
23 | and properly responding to the MCP protocol.
24 |
25 | ## Running Tests
26 |
27 | You can run the tests using the provided shell scripts in the `scripts` directory:
28 |
29 | ### Windows (PowerShell)
30 |
31 | ```powershell
32 | .\scripts\run_direct_test.ps1      # Run the direct test
33 | .\scripts\run_list_tools.ps1       # Run the list tools test
34 | .\scripts\run_mcp_client_test.ps1  # Run the MCP client test
35 | ```
36 |
37 | ### Linux/Mac
38 |
39 | ```bash
40 | ./scripts/run_direct_test.sh      # Run the direct test
41 | ./scripts/run_list_tools.sh       # Run the list tools test
42 | ./scripts/run_mcp_client_test.sh  # Run the MCP client test
43 | ```
44 |
45 | ## Running Tests Manually
46 |
47 | If you want to run the tests manually:
48 |
49 | ```bash
50 | # Activate the environment
51 | source .venv/bin/activate # Linux/Mac
52 | # or
53 | .\.venv\Scripts\activate # Windows
54 |
55 | # Run the tests
56 | uv run -m tests.test_direct
57 | uv run -m tests.test_tools
58 | uv run -m tests.test_mcp_client
59 | ```
60 |
61 | ## Adding New Tests
62 |
63 | When adding new tests, please follow these guidelines:
64 |
65 | 1. Create a new Python file in the `tests` directory.
66 | 2. Import the necessary modules from the `src` directory.
67 | 3. Create a shell script in the project root to run the test.
68 | 4. Document the test in this README.
```
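
To illustrate guideline 2 above, a minimal direct-style test could look like the sketch below. It assumes the `FastMCP` base class exposes async `list_tools()` and `call_tool()` helpers, which is the case in recent `mcp` SDK releases but should be checked against the pinned version.

```python
"""Minimal sketch of a direct test: instantiate the server, call a tool."""
import asyncio

from src.server.databricks_mcp_server import DatabricksMCPServer


async def run_direct_test() -> None:
    server = DatabricksMCPServer()
    # list_tools()/call_tool() are FastMCP helpers (assumed available here).
    tools = await server.list_tools()
    print(f"Found {len(tools)} tools")
    result = await server.call_tool("list_clusters", {})
    print(result)


if __name__ == "__main__":
    asyncio.run(run_direct_test())
```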
--------------------------------------------------------------------------------
/examples/README.md:
--------------------------------------------------------------------------------
```markdown
1 | # Databricks MCP Server Examples
2 |
3 | This directory contains examples of how to use the Databricks MCP server.
4 |
5 | ## Example Files
6 |
7 | 1. **Direct Usage (direct_usage.py)**
8 |
9 | This example shows how to directly instantiate and use the Databricks MCP server
10 | without going through the MCP protocol. It demonstrates:
11 |
12 | - Creating a server instance
13 | - Calling tools directly
14 | - Processing the results
15 |
16 | To run this example:
17 | ```bash
18 | uv run examples/direct_usage.py
19 | ```
20 |
21 | 2. **MCP Client Usage (mcp_client_usage.py)**
22 |
23 | This example shows how to use the MCP client to connect to the Databricks MCP server
24 | and call its tools through the MCP protocol. It demonstrates:
25 |
26 | - Connecting to the server using the MCP protocol
27 | - Listing available tools
28 | - Calling tools through the MCP protocol
29 | - Processing the results
30 |
31 | To run this example:
32 | ```bash
33 | uv run examples/mcp_client_usage.py
34 | ```
35 |
36 | ## Running Examples
37 |
38 | Make sure you have the following prerequisites:
39 |
40 | 1. Python 3.10+ installed
41 | 2. `uv` package manager installed (see project README for installation instructions)
42 | 3. Project environment set up with `uv venv`
43 | 4. Dependencies installed with `uv pip install -e .`
44 | 5. Environment variables set (DATABRICKS_HOST, DATABRICKS_TOKEN)
45 |
46 | First, make sure you're in the project root directory and the virtual environment is activated:
47 |
48 | ```bash
49 | # Windows
50 | .\.venv\Scripts\activate
51 |
52 | # Linux/Mac
53 | source .venv/bin/activate
54 | ```
55 |
56 | Then you can run the examples as shown above.
57 |
58 | ## Example Outputs
59 |
60 | ### Direct Usage Example Output
61 |
62 | ```
63 | Databricks MCP Server - Direct Usage Example
64 | ===========================================
65 |
66 | Databricks Clusters:
67 | ====================
68 |
69 | Cluster 1:
70 | ID: 0220-221815-kzacbcps
71 | Name: Lloyd Burley's Cluster LTS
72 | State: TERMINATED
73 | Spark Version: 15.4.x-scala2.12
74 | Node Type: Standard_DS3_v2
75 |
76 | Databricks Notebooks in /:
77 | ================================
78 |
79 | Notebook: /Shared/example_notebook
80 | Directory: /Users/
81 |
82 | Databricks Jobs:
83 | ================
84 |
85 | Job 1:
86 | ID: 12345
87 | Name: Daily ETL Job
88 | Created: 1740089895875
89 | ```
90 |
91 | ### MCP Client Usage Example Output
92 |
93 | ```
94 | Databricks MCP Server - MCP Client Usage Example
95 | =============================================
96 | 2025-03-13 10:05:23,456 - __main__ - INFO - Connecting to Databricks MCP server...
97 | 2025-03-13 10:05:23,457 - __main__ - INFO - Launching server process...
98 | 2025-03-13 10:05:23,789 - __main__ - INFO - Server launched, creating session...
99 | 2025-03-13 10:05:23,790 - __main__ - INFO - Initializing session...
100 |
101 | Available Tools:
102 | ================
103 | - list_clusters: List all Databricks clusters
104 | - create_cluster: Create a new Databricks cluster with parameters: cluster_name (required), spark_version (required), node_type_id (required), num_workers, autotermination_minutes
105 | - terminate_cluster: Terminate a Databricks cluster with parameter: cluster_id (required)
106 | - get_cluster: Get information about a specific Databricks cluster with parameter: cluster_id (required)
107 | - start_cluster: Start a terminated Databricks cluster with parameter: cluster_id (required)
108 | - list_jobs: List all Databricks jobs
109 | - run_job: Run a Databricks job with parameters: job_id (required), notebook_params (optional)
110 | - list_notebooks: List notebooks in a workspace directory with parameter: path (required)
111 | - export_notebook: Export a notebook from the workspace with parameters: path (required), format (optional, one of: SOURCE, HTML, JUPYTER, DBC)
112 | - list_files: List files and directories in a DBFS path with parameter: dbfs_path (required)
113 | - execute_sql: Execute a SQL statement with parameters: statement (required), warehouse_id (required), catalog (optional), schema (optional)
114 |
115 | Select a tool to run (or 'quit' to exit):
116 | 1. list_clusters
117 | 2. create_cluster
118 | 3. terminate_cluster
119 | 4. get_cluster
120 | 5. start_cluster
121 | 6. list_jobs
122 | 7. run_job
123 | 8. list_notebooks
124 | 9. export_notebook
125 | 10. list_files
126 | 11. execute_sql
127 | ```
```
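
The client flow shown in the output above (launch the server over stdio, initialize a session, list tools, call a tool) maps onto the `mcp` Python SDK roughly as follows. This is a sketch of the handshake, not a copy of `mcp_client_usage.py`, and the server launch command is an assumption.

```python
"""Sketch of an MCP client session against the Databricks MCP server."""
import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


async def run_client() -> None:
    # Launch the server as a stdio subprocess (command is an assumption).
    params = StdioServerParameters(
        command="uv", args=["run", "-m", "src.server.databricks_mcp_server"]
    )
    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            tools = await session.list_tools()
            for tool in tools.tools:
                print(f"- {tool.name}: {tool.description}")
            result = await session.call_tool("list_clusters", {})
            print(result)


if __name__ == "__main__":
    asyncio.run(run_client())
```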
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
1 | # Databricks MCP Server
2 |
3 | A Model Context Protocol (MCP) server for Databricks that provides access to Databricks functionality via the MCP protocol. This allows LLM-powered tools to interact with Databricks clusters, jobs, notebooks, and more.
4 |
5 | ## Features
6 |
7 | - **MCP Protocol Support**: Implements the MCP protocol to allow LLMs to interact with Databricks
8 | - **Databricks API Integration**: Provides access to Databricks REST API functionality
9 | - **Tool Registration**: Exposes Databricks functionality as MCP tools
10 | - **Async Support**: Built with asyncio for efficient operation
11 |
12 | ## Available Tools
13 |
14 | The Databricks MCP Server exposes the following tools:
15 |
16 | - **list_clusters**: List all Databricks clusters
17 | - **create_cluster**: Create a new Databricks cluster
18 | - **terminate_cluster**: Terminate a Databricks cluster
19 | - **get_cluster**: Get information about a specific Databricks cluster
20 | - **start_cluster**: Start a terminated Databricks cluster
21 | - **list_jobs**: List all Databricks jobs
22 | - **run_job**: Run a Databricks job
23 | - **list_notebooks**: List notebooks in a workspace directory
24 | - **export_notebook**: Export a notebook from the workspace
25 | - **list_files**: List files and directories in a DBFS path
26 | - **execute_sql**: Execute a SQL statement
27 |
28 | ## Installation
29 |
30 | ### Prerequisites
31 |
32 | - Python 3.10 or higher
33 | - `uv` package manager (recommended for MCP servers)
34 |
35 | ### Setup
36 |
37 | 1. Install `uv` if you don't have it already:
38 |
39 | ```bash
40 | # macOS/Linux
41 | curl -LsSf https://astral.sh/uv/install.sh | sh
42 |
43 | # Windows (in PowerShell)
44 | irm https://astral.sh/uv/install.ps1 | iex
45 | ```
46 |
47 | Restart your terminal after installation.
48 |
49 | 2. Clone the repository:
50 | ```bash
51 | git clone https://github.com/JustTryAI/databricks-mcp-server.git
52 | cd databricks-mcp-server
53 | ```
54 |
55 | 3. Set up the project with `uv`:
56 | ```bash
57 | # Create and activate virtual environment
58 | uv venv
59 |
60 | # On Windows
61 | .\.venv\Scripts\activate
62 |
63 | # On Linux/Mac
64 | source .venv/bin/activate
65 |
66 | # Install dependencies in development mode
67 | uv pip install -e .
68 |
69 | # Install development dependencies
70 | uv pip install -e ".[dev]"
71 | ```
72 |
73 | 4. Set up environment variables:
74 | ```bash
75 | # Windows
76 | set DATABRICKS_HOST=https://your-databricks-instance.azuredatabricks.net
77 | set DATABRICKS_TOKEN=your-personal-access-token
78 |
79 | # Linux/Mac
80 | export DATABRICKS_HOST=https://your-databricks-instance.azuredatabricks.net
81 | export DATABRICKS_TOKEN=your-personal-access-token
82 | ```
83 |
84 | You can also create an `.env` file based on the `.env.example` template.
85 |
86 | ## Running the MCP Server
87 |
88 | To start the MCP server, run:
89 |
90 | ```bash
91 | # Windows
92 | .\start_mcp_server.ps1
93 |
94 | # Linux/Mac
95 | ./start_mcp_server.sh
96 | ```
97 |
98 | These wrapper scripts will execute the actual server scripts located in the `scripts` directory. The server will start and be ready to accept MCP protocol connections.
99 |
100 | You can also directly run the server scripts from the scripts directory:
101 |
102 | ```bash
103 | # Windows
104 | .\scripts\start_mcp_server.ps1
105 |
106 | # Linux/Mac
107 | ./scripts/start_mcp_server.sh
108 | ```
109 |
110 | ## Querying Databricks Resources
111 |
112 | The repository includes utility scripts to quickly view Databricks resources:
113 |
114 | ```bash
115 | # View all clusters
116 | uv run scripts/show_clusters.py
117 |
118 | # View all notebooks
119 | uv run scripts/show_notebooks.py
120 | ```
121 |
122 | ## Project Structure
123 |
124 | ```
125 | databricks-mcp-server/
126 | ├── src/                              # Source code
127 | │   ├── __init__.py                   # Makes src a package
128 | │   ├── __main__.py                   # Main entry point for the package
129 | │   ├── main.py                       # Entry point for the MCP server
130 | │   ├── api/                          # Databricks API clients
131 | │   ├── core/                         # Core functionality
132 | │   ├── server/                       # Server implementation
133 | │   │   ├── databricks_mcp_server.py  # Main MCP server
134 | │   │   └── app.py                    # FastAPI app for tests
135 | │   └── cli/                          # Command-line interface
136 | ├── tests/                            # Test directory
137 | ├── scripts/                          # Helper scripts
138 | │   ├── start_mcp_server.ps1          # Server startup script (Windows)
139 | │   ├── run_tests.ps1                 # Test runner script
140 | │   ├── show_clusters.py              # Script to show clusters
141 | │   └── show_notebooks.py             # Script to show notebooks
142 | ├── examples/                         # Example usage
143 | ├── docs/                             # Documentation
144 | └── pyproject.toml                    # Project configuration
145 | ```
146 |
147 | See `project_structure.md` for a more detailed view of the project structure.
148 |
149 | ## Development
150 |
151 | ### Code Standards
152 |
153 | - Python code follows PEP 8 style guide with a maximum line length of 100 characters
154 | - Use 4 spaces for indentation (no tabs)
155 | - Use double quotes for strings
156 | - All classes, methods, and functions should have Google-style docstrings
157 | - Type hints are required for all code except tests
158 |
159 | ### Linting
160 |
161 | The project uses the following linting tools:
162 |
163 | ```bash
164 | # Run all linters
165 | uv run pylint src/ tests/
166 | uv run flake8 src/ tests/
167 | uv run mypy src/
168 | ```
169 |
170 | ## Testing
171 |
172 | The project uses pytest for testing. To run the tests:
173 |
174 | ```bash
175 | # Run all tests with our convenient script
176 | .\scripts\run_tests.ps1
177 |
178 | # Run with coverage report
179 | .\scripts\run_tests.ps1 -Coverage
180 |
181 | # Run specific tests with verbose output
182 | .\scripts\run_tests.ps1 -Verbose -Coverage tests/test_clusters.py
183 | ```
184 |
185 | You can also run the tests directly with pytest:
186 |
187 | ```bash
188 | # Run all tests
189 | uv run pytest tests/
190 |
191 | # Run with coverage report
192 | uv run pytest --cov=src tests/ --cov-report=term-missing
193 | ```
194 |
195 | A minimum code coverage of 80% is the goal for the project.
196 |
197 | ## Documentation
198 |
199 | - API documentation is generated using Sphinx and can be found in the `docs/api` directory
200 | - All code includes Google-style docstrings
201 | - See the `examples/` directory for usage examples
202 |
203 | ## Examples
204 |
205 | Check the `examples/` directory for usage examples. To run examples:
206 |
207 | ```bash
208 | # Run example scripts with uv
209 | uv run examples/direct_usage.py
210 | uv run examples/mcp_client_usage.py
211 | ```
212 |
213 | ## Contributing
214 |
215 | Contributions are welcome! Please feel free to submit a Pull Request.
216 |
217 | 1. Ensure your code follows the project's coding standards
218 | 2. Add tests for any new functionality
219 | 3. Update documentation as necessary
220 | 4. Verify all tests pass before submitting
221 |
222 | ## License
223 |
224 | This project is licensed under the MIT License - see the LICENSE file for details.
```
--------------------------------------------------------------------------------
/src/__init__.py:
--------------------------------------------------------------------------------
```python
1 |
```
--------------------------------------------------------------------------------
/src/api/__init__.py:
--------------------------------------------------------------------------------
```python
1 |
```
--------------------------------------------------------------------------------
/src/cli/__init__.py:
--------------------------------------------------------------------------------
```python
1 |
```
--------------------------------------------------------------------------------
/src/core/__init__.py:
--------------------------------------------------------------------------------
```python
1 |
```
--------------------------------------------------------------------------------
/src/server/__init__.py:
--------------------------------------------------------------------------------
```python
1 |
```
--------------------------------------------------------------------------------
/tests/__init__.py:
--------------------------------------------------------------------------------
```python
1 |
```
--------------------------------------------------------------------------------
/src/__main__.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Main entry point for running the databricks-mcp-server package.
3 | This allows the package to be run with 'python -m src' or 'uv run -m src'.
4 | """
5 |
6 | import asyncio
7 | from src.main import main
8 |
9 | if __name__ == "__main__":
10 | asyncio.run(main())
```
--------------------------------------------------------------------------------
/src/server/__main__.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Main entry point for running the server module directly.
3 | This allows the module to be run with 'python -m src.server' or 'uv run -m src.server'.
4 | """
5 |
6 | import asyncio
7 | from src.server.databricks_mcp_server import main
8 |
9 | if __name__ == "__main__":
10 | asyncio.run(main())
```
--------------------------------------------------------------------------------
/start_mcp_server.sh:
--------------------------------------------------------------------------------
```bash
1 | #!/bin/bash
2 | # Wrapper script to run the MCP server start script from scripts directory
3 |
4 | # Get the directory of this script
5 | SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
6 |
7 | # Change to the script directory
8 | cd "$SCRIPT_DIR"
9 |
10 | # Run the actual server script
11 | "$SCRIPT_DIR/scripts/start_mcp_server.sh"
```
--------------------------------------------------------------------------------
/start_mcp_server.ps1:
--------------------------------------------------------------------------------
```
1 | #!/usr/bin/env pwsh
2 | # Wrapper script to run the MCP server start script from scripts directory
3 |
4 | param(
5 | [switch]$SkipPrompt
6 | )
7 |
8 | # Get the directory of this script
9 | $scriptPath = $MyInvocation.MyCommand.Path
10 | $scriptDir = Split-Path $scriptPath -Parent
11 |
12 | # Change to the script directory
13 | Set-Location $scriptDir
14 |
15 | # Run the actual server script with any parameters passed to this script
16 | if ($SkipPrompt) {
17 | & "$scriptDir\scripts\start_mcp_server.ps1" -SkipPrompt
18 | } else {
19 | & "$scriptDir\scripts\start_mcp_server.ps1"
20 | }
```
--------------------------------------------------------------------------------
/scripts/show_clusters.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Simple script to show clusters from Databricks
3 | """
4 |
5 | import asyncio
6 | import json
7 | import sys
8 | from src.api import clusters
9 |
10 | async def show_all_clusters():
11 | """Show all clusters in the Databricks workspace."""
12 | print("Fetching clusters from Databricks...")
13 | try:
14 | result = await clusters.list_clusters()
15 | print("\nClusters found:")
16 | print(json.dumps(result, indent=2))
17 | return result
18 | except Exception as e:
19 | print(f"Error listing clusters: {e}")
20 | return None
21 |
22 | if __name__ == "__main__":
23 | asyncio.run(show_all_clusters())
```
--------------------------------------------------------------------------------
/scripts/show_notebooks.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Simple script to show notebooks from Databricks
3 | """
4 |
5 | import asyncio
6 | import json
7 | import sys
8 | from src.api import notebooks
9 |
10 | async def show_all_notebooks():
11 | """Show all notebooks in the Databricks workspace."""
12 | print("Fetching notebooks from Databricks...")
13 | try:
14 | result = await notebooks.list_notebooks(path="/")
15 | print("\nNotebooks found:")
16 | print(json.dumps(result, indent=2))
17 | return result
18 | except Exception as e:
19 | print(f"Error listing notebooks: {e}")
20 | return None
21 |
22 | if __name__ == "__main__":
23 | asyncio.run(show_all_notebooks())
```
--------------------------------------------------------------------------------
/scripts/run_direct_test.sh:
--------------------------------------------------------------------------------
```bash
1 | #!/bin/bash
2 |
3 | # Check if the virtual environment exists
4 | if [ ! -d ".venv" ]; then
5 | echo "Virtual environment not found. Please create it first:"
6 | echo "uv venv"
7 | exit 1
8 | fi
9 |
10 | # Activate the virtual environment
11 | source .venv/bin/activate
12 |
13 | # Check if environment variables are set
14 | if [ -z "$DATABRICKS_HOST" ] || [ -z "$DATABRICKS_TOKEN" ]; then
15 | echo "Warning: DATABRICKS_HOST and/or DATABRICKS_TOKEN environment variables are not set."
16 | echo "Please set them before running the test."
17 | exit 1
18 | fi
19 |
20 | # Run the direct test
21 | echo "Running direct test at $(date)"
22 | echo "Databricks Host: $DATABRICKS_HOST"
23 |
24 | uv run -m tests.test_direct
25 |
26 | echo "Test completed at $(date)"
```
--------------------------------------------------------------------------------
/scripts/run_tests.ps1:
--------------------------------------------------------------------------------
```
1 | #!/usr/bin/env pwsh
2 | # Run tests for the Databricks MCP server
3 |
4 | param(
5 | [string]$TestPath = "tests/",
6 | [switch]$Coverage,
7 | [switch]$Verbose
8 | )
9 |
10 | # Check if the virtual environment exists
11 | if (-not (Test-Path -Path ".venv")) {
12 | Write-Host "Virtual environment not found. Please create it first:"
13 | Write-Host "uv venv"
14 | exit 1
15 | }
16 |
17 | # Activate virtual environment
18 | . .\.venv\Scripts\Activate.ps1
19 |
20 | # Base command
21 | $cmd = "uv run pytest"
22 |
23 | # Add verbose flag if specified
24 | if ($Verbose) {
25 | $cmd += " -v"
26 | }
27 |
28 | # Add coverage if specified
29 | if ($Coverage) {
30 | $cmd += " --cov=src --cov-report=term-missing"
31 | }
32 |
33 | # Add test path
34 | $cmd += " $TestPath"
35 |
36 | Write-Host "Running: $cmd"
37 | Invoke-Expression $cmd
38 |
39 | # Print summary
40 | Write-Host "`nTest run completed at $(Get-Date)"
```
--------------------------------------------------------------------------------
/scripts/run_direct_test.ps1:
--------------------------------------------------------------------------------
```
1 | #!/usr/bin/env pwsh
2 | # PowerShell script to run the direct test
3 |
4 | # Check if the virtual environment exists
5 | if (-not (Test-Path -Path ".venv")) {
6 | Write-Host "Virtual environment not found. Please create it first:"
7 | Write-Host "uv venv"
8 | exit 1
9 | }
10 |
11 | # Activate virtual environment
12 | . .\.venv\Scripts\Activate.ps1
13 |
14 | # Check if environment variables are set
15 | if (-not (Get-Item -Path Env:DATABRICKS_HOST -ErrorAction SilentlyContinue) -or
16 | -not (Get-Item -Path Env:DATABRICKS_TOKEN -ErrorAction SilentlyContinue)) {
17 | Write-Host "Warning: DATABRICKS_HOST and/or DATABRICKS_TOKEN environment variables are not set."
18 | Write-Host "Please set them before running the test."
19 | exit 1
20 | }
21 |
22 | # Run the direct test
23 | Write-Host "Running direct test at $(Get-Date)"
24 | Write-Host "Databricks Host: $env:DATABRICKS_HOST"
25 |
26 | uv run -m tests.test_direct
27 |
28 | Write-Host "Test completed at $(Get-Date)"
```
--------------------------------------------------------------------------------
/scripts/start_mcp_server.sh:
--------------------------------------------------------------------------------
```bash
1 | #!/bin/bash
2 |
3 | # Check if the virtual environment exists
4 | if [ ! -d ".venv" ]; then
5 | echo "Virtual environment not found. Please create it first:"
6 | echo "uv venv"
7 | exit 1
8 | fi
9 |
10 | # Activate the virtual environment
11 | source .venv/bin/activate
12 |
13 | # Check if environment variables are set
14 | if [ -z "$DATABRICKS_HOST" ] || [ -z "$DATABRICKS_TOKEN" ]; then
15 | echo "Warning: DATABRICKS_HOST and/or DATABRICKS_TOKEN environment variables are not set."
16 | echo "You can set them now or the server will look for them in other sources."
17 | read -p "Do you want to continue? (y/n) " -n 1 -r
18 | echo
19 | if [[ ! $REPLY =~ ^[Yy]$ ]]; then
20 | exit 1
21 | fi
22 | fi
23 |
24 | # Start the server by running the module directly
25 | echo "Starting Databricks MCP server at $(date)"
26 | if [ -n "$DATABRICKS_HOST" ]; then
27 | echo "Databricks Host: $DATABRICKS_HOST"
28 | fi
29 |
30 | uv run -m src.server.databricks_mcp_server
31 |
32 | echo "Server stopped at $(date)"
```
--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------
```toml
1 | [build-system]
2 | requires = ["hatchling"]
3 | build-backend = "hatchling.build"
4 |
5 | [project]
6 | name = "databricks-mcp-server"
7 | version = "0.1.0"
8 | description = "A Model Context Protocol (MCP) server for Databricks"
9 | authors = [
10 | {name = "MCP Server Team", email = "[email protected]"}
11 | ]
12 | requires-python = ">=3.10"
13 | readme = "README.md"
14 | license = {text = "MIT"}
15 | classifiers = [
16 | "Programming Language :: Python :: 3",
17 | "Programming Language :: Python :: 3.10",
18 | "Programming Language :: Python :: 3.11",
19 | "License :: OSI Approved :: MIT License",
20 | "Operating System :: OS Independent",
21 | ]
22 | dependencies = [
23 | "mcp[cli]>=1.2.0",
24 | "httpx",
25 | "databricks-sdk",
26 | ]
27 |
28 | [project.optional-dependencies]
29 | cli = [
30 | "click",
31 | ]
32 | dev = [
33 |     "black",
34 |     "flake8",
35 |     "mypy",
36 |     "pylint",
37 |     "pytest",
38 |     "pytest-asyncio",
39 |     "pytest-cov",
40 | ]
41 |
42 | [project.scripts]
43 | databricks-mcp = "src.cli.commands:main"
44 |
45 | [tool.hatch.build.targets.wheel]
46 | packages = ["src"]
47 |
```
--------------------------------------------------------------------------------
/scripts/run_list_tools.sh:
--------------------------------------------------------------------------------
```bash
1 | #!/bin/bash
2 |
3 | # Check if the virtual environment exists
4 | if [ ! -d ".venv" ]; then
5 | echo "Virtual environment not found. Please create it first:"
6 | echo "uv venv"
7 | exit 1
8 | fi
9 |
10 | # Activate the virtual environment
11 | source .venv/bin/activate
12 |
13 | # Run the list tools test
14 | echo "Running list tools test at $(date)"
15 |
16 | # Check if the server is already running
17 | if ! pgrep -f "uv run.*src.server.databricks_mcp_server" > /dev/null; then
18 | echo "Starting MCP server in the background..."
19 | # Start the server in the background
20 | uv run -m src.server.databricks_mcp_server > server.log 2>&1 &
21 | SERVER_PID=$!
22 | echo "Server started with PID $SERVER_PID"
23 | # Give the server a moment to start
24 | sleep 2
25 | SERVER_STARTED=true
26 | else
27 | echo "MCP server is already running"
28 | SERVER_STARTED=false
29 | fi
30 |
31 | # Run the list tools test
32 | echo "Running list tools test..."
33 | uv run -m tests.test_tools
34 |
35 | # If we started the server, stop it
36 | if [ "$SERVER_STARTED" = true ]; then
37 | echo "Stopping MCP server (PID $SERVER_PID)..."
38 | kill $SERVER_PID
39 | echo "Server stopped"
40 | fi
41 |
42 | echo "Test completed at $(date)"
```
--------------------------------------------------------------------------------
/scripts/run_mcp_client_test.sh:
--------------------------------------------------------------------------------
```bash
1 | #!/bin/bash
2 |
3 | # Check if the virtual environment exists
4 | if [ ! -d ".venv" ]; then
5 | echo "Virtual environment not found. Please create it first:"
6 | echo "uv venv"
7 | exit 1
8 | fi
9 |
10 | # Activate the virtual environment
11 | source .venv/bin/activate
12 |
13 | # Run the MCP client test
14 | echo "Running MCP client test at $(date)"
15 |
16 | # Check if the server is already running
17 | if ! pgrep -f "uv run.*src.server.databricks_mcp_server" > /dev/null; then
18 | echo "Starting MCP server in the background..."
19 | # Start the server in the background
20 | uv run -m src.server.databricks_mcp_server > server.log 2>&1 &
21 | SERVER_PID=$!
22 | echo "Server started with PID $SERVER_PID"
23 | # Give the server a moment to start
24 | sleep 2
25 | SERVER_STARTED=true
26 | else
27 | echo "MCP server is already running"
28 | SERVER_STARTED=false
29 | fi
30 |
31 | # Run the MCP client test
32 | echo "Running MCP client test..."
33 | uv run -m tests.test_mcp_client
34 |
35 | # If we started the server, stop it
36 | if [ "$SERVER_STARTED" = true ]; then
37 | echo "Stopping MCP server (PID $SERVER_PID)..."
38 | kill $SERVER_PID
39 | echo "Server stopped"
40 | fi
41 |
42 | echo "Test completed at $(date)"
```
--------------------------------------------------------------------------------
/scripts/test_mcp_server.ps1:
--------------------------------------------------------------------------------
```
1 | #!/usr/bin/env pwsh
2 | # PowerShell script to run the MCP server test
3 |
4 | # Check if the virtual environment exists
5 | if (-not (Test-Path -Path ".venv")) {
6 | Write-Host "Virtual environment not found. Please create it first:"
7 | Write-Host "uv venv"
8 | exit 1
9 | }
10 |
11 | # Activate virtual environment
12 | . .\.venv\Scripts\Activate.ps1
13 |
14 | # Ensure no MCP servers are already running
15 | $serverProcesses = Get-Process -Name pwsh | Where-Object { $_.CommandLine -like "*start_mcp_server.ps1*" }
16 | if ($serverProcesses) {
17 | Write-Host "Found existing MCP server processes, stopping them first..."
18 | $serverProcesses | ForEach-Object {
19 | Stop-Process -Id $_.Id -Force
20 | Write-Host "Stopped process $($_.Id)"
21 | }
22 | }
23 |
24 | # Run the test
25 | Write-Host "Running MCP server tests..."
26 | uv run tests/test_mcp_server.py
27 |
28 | # When done, clean up any leftover processes
29 | $serverProcesses = Get-Process -Name pwsh | Where-Object { $_.CommandLine -like "*start_mcp_server.ps1*" }
30 | if ($serverProcesses) {
31 | Write-Host "Cleaning up any remaining MCP server processes..."
32 | $serverProcesses | ForEach-Object {
33 | Stop-Process -Id $_.Id -Force
34 | Write-Host "Stopped process $($_.Id)"
35 | }
36 | }
```
--------------------------------------------------------------------------------
/scripts/start_mcp_server.ps1:
--------------------------------------------------------------------------------
```
1 | #!/usr/bin/env pwsh
2 | # Start script for the Databricks MCP server
3 |
4 | param(
5 | [switch]$SkipPrompt
6 | )
7 |
8 | # Check if the virtual environment exists
9 | if (-not (Test-Path -Path ".venv")) {
10 | Write-Host "Virtual environment not found. Please create it first:"
11 | Write-Host "uv venv"
12 | exit 1
13 | }
14 |
15 | # Activate virtual environment
16 | . .\.venv\Scripts\Activate.ps1
17 |
18 | # Check if environment variables are set
19 | if (-not (Get-Item -Path Env:DATABRICKS_HOST -ErrorAction SilentlyContinue) -or
20 | -not (Get-Item -Path Env:DATABRICKS_TOKEN -ErrorAction SilentlyContinue)) {
21 | Write-Host "Warning: DATABRICKS_HOST and/or DATABRICKS_TOKEN environment variables are not set."
22 | Write-Host "You can set them now or the server will look for them in other sources."
23 |
24 | # Skip prompt when called from tests
25 | if ($SkipPrompt) {
26 | Write-Host "Auto-continuing due to SkipPrompt flag..."
27 | } else {
28 | $continue = Read-Host "Do you want to continue? (y/n)"
29 | if ($continue -ne "y") {
30 | exit 1
31 | }
32 | }
33 | }
34 |
35 | # Start the server
36 | Write-Host "Starting Databricks MCP server at $(Get-Date)"
37 | if (Get-Item -Path Env:DATABRICKS_HOST -ErrorAction SilentlyContinue) {
38 | Write-Host "Databricks Host: $env:DATABRICKS_HOST"
39 | }
40 |
41 | # Try to run the module using python -m
42 | Write-Host "Attempting to start server using module path..."
43 | python -m src.main
44 |
45 | # If the above fails, fallback to direct script execution
46 | if ($LASTEXITCODE -ne 0) {
47 | Write-Host "Module execution failed, trying direct script execution..."
48 | python "$PSScriptRoot\..\src\main.py"
49 | }
50 |
51 | Write-Host "Server stopped at $(Get-Date)"
```
--------------------------------------------------------------------------------
/scripts/run_mcp_test.ps1:
--------------------------------------------------------------------------------
```
1 | #!/usr/bin/env pwsh
2 | # Start MCP server and run tests
3 |
4 | # Check if the virtual environment exists
5 | if (-not (Test-Path -Path ".venv")) {
6 | Write-Host "Virtual environment not found. Please create it first:"
7 | Write-Host "uv venv"
8 | exit 1
9 | }
10 |
11 | # Activate virtual environment
12 | . .\.venv\Scripts\Activate.ps1
13 |
14 | # Make sure there are no existing MCP server processes
15 | $serverProcesses = Get-Process -Name pwsh | Where-Object { $_.CommandLine -like "*start_mcp_server.ps1*" }
16 | if ($serverProcesses) {
17 | Write-Host "Found existing MCP server processes, stopping them first..."
18 | $serverProcesses | ForEach-Object {
19 | Stop-Process -Id $_.Id -Force
20 | Write-Host "Stopped process $($_.Id)"
21 | }
22 | }
23 |
24 | # Start the MCP server in a new PowerShell window
25 | $serverProcess = Start-Process pwsh -ArgumentList "-File", "scripts\start_mcp_server.ps1" -PassThru -WindowStyle Minimized
26 |
27 | # Give it time to initialize
28 | Write-Host "Waiting for MCP server to initialize..."
29 | Start-Sleep -Seconds 5
30 |
31 | try {
32 | # Run the test
33 | Write-Host "Running test against the MCP server..."
34 | uv run tests/test_mcp_server.py
35 | }
36 | finally {
37 | # Clean up: stop the server
38 | if ($serverProcess -and !$serverProcess.HasExited) {
39 | Write-Host "Stopping MCP server..."
40 | Stop-Process -Id $serverProcess.Id -Force
41 | }
42 |
43 | # Make sure all MCP server processes are stopped
44 | $serverProcesses = Get-Process -Name pwsh | Where-Object { $_.CommandLine -like "*start_mcp_server.ps1*" }
45 | if ($serverProcesses) {
46 | $serverProcesses | ForEach-Object {
47 | Stop-Process -Id $_.Id -Force
48 | Write-Host "Stopped process $($_.Id)"
49 | }
50 | }
51 | }
```
--------------------------------------------------------------------------------
/src/main.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Main entry point for the Databricks MCP server.
3 | """
4 |
5 | import asyncio
6 | import logging
7 | import os
8 | import sys
9 | from typing import Optional
10 |
11 | from src.core.config import settings
12 | from src.server.databricks_mcp_server import DatabricksMCPServer
13 |
14 | # Function to start the server - extracted from the server file
15 | async def start_mcp_server():
16 | """Start the MCP server."""
17 | server = DatabricksMCPServer()
18 | await server.run_stdio_async()
19 |
20 |
21 | def setup_logging(log_level: Optional[str] = None):
22 | """
23 | Set up logging configuration.
24 |
25 | Args:
26 | log_level: Optional log level to override the default
27 | """
28 | level = getattr(logging, log_level or settings.LOG_LEVEL)
29 |
30 | logging.basicConfig(
31 | level=level,
32 | format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
33 | handlers=[
34 | logging.StreamHandler(sys.stdout),
35 | ],
36 | )
37 |
38 |
39 | async def main():
40 | """Main entry point."""
41 | # Set up logging
42 | setup_logging()
43 |
44 | # Log startup information
45 | logger = logging.getLogger(__name__)
46 | logger.info(f"Starting Databricks MCP server v{settings.VERSION}")
47 | logger.info(f"Databricks host: {settings.DATABRICKS_HOST}")
48 |
49 | # Start the MCP server
50 | await start_mcp_server()
51 |
52 |
53 | if __name__ == "__main__":
54 | # Parse command line arguments
55 | import argparse
56 |
57 | parser = argparse.ArgumentParser(description="Databricks MCP Server")
58 | parser.add_argument(
59 | "--log-level",
60 | choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
61 | help="Set the log level",
62 | )
63 |
64 | args = parser.parse_args()
65 |
66 | # Set up logging with command line arguments
67 | setup_logging(args.log_level)
68 |
69 | # Run the main function
70 | asyncio.run(main())
```
--------------------------------------------------------------------------------
/scripts/run_client_test.ps1:
--------------------------------------------------------------------------------
```
1 | #!/usr/bin/env pwsh
2 | # PowerShell script to run the MCP client test
3 |
4 | # Check if the virtual environment exists
5 | if (-not (Test-Path -Path ".venv")) {
6 | Write-Host "Virtual environment not found. Please create it first:"
7 | Write-Host "uv venv"
8 | exit 1
9 | }
10 |
11 | # Activate virtual environment
12 | . .\.venv\Scripts\Activate.ps1
13 |
14 | # Make sure there are no existing MCP server processes
15 | $serverProcesses = Get-Process -Name pwsh | Where-Object { $_.CommandLine -like "*start_mcp_server.ps1*" }
16 | if ($serverProcesses) {
17 | Write-Host "Found existing MCP server processes, stopping them first..."
18 | $serverProcesses | ForEach-Object {
19 | Stop-Process -Id $_.Id -Force
20 | Write-Host "Stopped process $($_.Id)"
21 | }
22 | }
23 |
24 | # Set timeout in seconds
25 | $timeout = 60
26 |
27 | # Run the test with a timeout
28 | Write-Host "Running MCP client test with a $timeout second timeout..."
29 | $job = Start-Job -ScriptBlock {
30 | cd $using:PWD
31 | uv run -m tests.test_mcp_client
32 | }
33 |
34 | # Monitor the job and output in real-time
35 | $start = Get-Date
36 | while ($job.State -eq "Running") {
37 | # Get any new output
38 | $output = Receive-Job -Job $job
39 | if ($output) {
40 | Write-Host $output
41 | }
42 |
43 | # Check if we've hit the timeout
44 | $elapsed = (Get-Date) - $start
45 | if ($elapsed.TotalSeconds -gt $timeout) {
46 | Write-Host "Test is taking too long, terminating..."
47 | Stop-Job -Job $job
48 | break
49 | }
50 |
51 | # Sleep briefly
52 | Start-Sleep -Seconds 1
53 | }
54 |
55 | # Output final results
56 | $output = Receive-Job -Job $job
57 | if ($output) {
58 | Write-Host $output
59 | }
60 |
61 | Remove-Job -Job $job -Force
62 |
63 | # Clean up any leftover processes
64 | $serverProcesses = Get-Process -Name pwsh | Where-Object { $_.CommandLine -like "*start_mcp_server.ps1*" }
65 | if ($serverProcesses) {
66 | Write-Host "Cleaning up any remaining MCP server processes..."
67 | $serverProcesses | ForEach-Object {
68 | Stop-Process -Id $_.Id -Force
69 | Write-Host "Stopped process $($_.Id)"
70 | }
71 | }
```
--------------------------------------------------------------------------------
/scripts/run_list_tools.ps1:
--------------------------------------------------------------------------------
```
1 | #!/usr/bin/env pwsh
2 | # PowerShell script to run the simple tool lister
3 |
4 | # Check if virtual environment exists
5 | if (-not (Test-Path -Path ".venv")) {
6 | Write-Host "Virtual environment not found. Please create it first:"
7 | Write-Host "uv venv"
8 | exit 1
9 | }
10 |
11 | # Activate virtual environment
12 | . .\.venv\Scripts\Activate.ps1
13 |
14 | # Make sure there are no existing MCP server processes
15 | $serverProcesses = Get-Process -Name pwsh | Where-Object { $_.CommandLine -like "*start_mcp_server.ps1*" }
16 | if ($serverProcesses) {
17 | Write-Host "Found existing MCP server processes, stopping them first..."
18 | $serverProcesses | ForEach-Object {
19 | Stop-Process -Id $_.Id -Force
20 | Write-Host "Stopped process $($_.Id)"
21 | }
22 | }
23 |
24 | # Set timeout in seconds
25 | $timeout = 20
26 |
27 | # Run the CLI command with a timeout
28 | Write-Host "Running CLI tool listing with a $timeout second timeout..."
29 | $job = Start-Job -ScriptBlock {
30 | cd $using:PWD
31 | # Stay in the project root so the src package is importable
32 | uv run -m src.cli.commands list-tools
33 | }
34 |
35 | # Monitor the job and output in real-time
36 | $start = Get-Date
37 | while ($job.State -eq "Running") {
38 | # Get any new output
39 | $output = Receive-Job -Job $job
40 | if ($output) {
41 | Write-Host $output
42 | }
43 |
44 | # Check if we've hit the timeout
45 | $elapsed = (Get-Date) - $start
46 | if ($elapsed.TotalSeconds -gt $timeout) {
47 | Write-Host "Command is taking too long, terminating..."
48 | Stop-Job -Job $job
49 | break
50 | }
51 |
52 | # Sleep briefly
53 | Start-Sleep -Milliseconds 500
54 | }
55 |
56 | # Output final results
57 | $output = Receive-Job -Job $job
58 | if ($output) {
59 | Write-Host $output
60 | }
61 |
62 | Remove-Job -Job $job -Force
63 |
64 | # Clean up any leftover processes
65 | $serverProcesses = Get-Process -Name pwsh | Where-Object { $_.CommandLine -like "*start_mcp_server.ps1*" }
66 | if ($serverProcesses) {
67 | Write-Host "Cleaning up any remaining MCP server processes..."
68 | $serverProcesses | ForEach-Object {
69 | Stop-Process -Id $_.Id -Force
70 | Write-Host "Stopped process $($_.Id)"
71 | }
72 | }
```
--------------------------------------------------------------------------------
/scripts/run_mcp_client_test.ps1:
--------------------------------------------------------------------------------
```
1 | #!/usr/bin/env pwsh
2 | # PowerShell script to run the MCP client test
3 |
4 | # Check if the virtual environment exists
5 | if (-not (Test-Path -Path ".venv")) {
6 | Write-Host "Virtual environment not found. Please create it first:"
7 | Write-Host "uv venv"
8 | exit 1
9 | }
10 |
11 | # Activate virtual environment
12 | . .\.venv\Scripts\Activate.ps1
13 |
14 | # Make sure there are no existing MCP server processes
15 | $serverProcesses = Get-Process -Name pwsh | Where-Object { $_.CommandLine -like "*databricks_mcp_server*" }
16 | if ($serverProcesses) {
17 | Write-Host "Found existing MCP server processes, stopping them first..."
18 | $serverProcesses | ForEach-Object {
19 | Stop-Process -Id $_.Id -Force
20 | Write-Host "Stopped process $($_.Id)"
21 | }
22 | }
23 |
24 | # Set timeout in seconds
25 | $timeout = 60
26 |
27 | # Run the test with a timeout
28 | Write-Host "Running MCP client test with a $timeout second timeout..."
29 | $job = Start-Job -ScriptBlock {
30 | cd $using:PWD
31 |
32 | # Run the MCP client test
33 | uv run -m tests.mcp_client_test
34 | }
35 |
36 | # Monitor the job and output in real-time
37 | $start = Get-Date
38 | while ($job.State -eq "Running") {
39 | # Get any new output
40 | $output = Receive-Job -Job $job
41 | if ($output) {
42 | Write-Host $output
43 | }
44 |
45 | # Check if we've hit the timeout
46 | $elapsed = (Get-Date) - $start
47 | if ($elapsed.TotalSeconds -gt $timeout) {
48 | Write-Host "Test is taking too long, terminating..."
49 | Stop-Job -Job $job
50 | break
51 | }
52 |
53 | # Sleep briefly
54 | Start-Sleep -Seconds 1
55 | }
56 |
57 | # Output final results
58 | $output = Receive-Job -Job $job
59 | if ($output) {
60 | Write-Host $output
61 | }
62 |
63 | Remove-Job -Job $job -Force
64 |
65 | # Clean up any leftover processes
66 | $serverProcesses = Get-Process -Name pwsh | Where-Object { $_.CommandLine -like "*databricks_mcp_server*" }
67 | if ($serverProcesses) {
68 | Write-Host "Cleaning up any remaining MCP server processes..."
69 | $serverProcesses | ForEach-Object {
70 | Stop-Process -Id $_.Id -Force
71 | Write-Host "Stopped process $($_.Id)"
72 | }
73 | }
```
--------------------------------------------------------------------------------
/SUMMARY.md:
--------------------------------------------------------------------------------
```markdown
1 | # Databricks MCP Server - Project Summary
2 |
3 | ## Overview
4 |
5 | We've successfully created a Databricks MCP (Model Context Protocol) server that provides tools for interacting with Databricks APIs. The server follows the MCP standard, which allows AI models to interact with external tools and services in a standardized way.
6 |
7 | ## Key Accomplishments
8 |
9 | 1. **Server Implementation**:
10 | - Created a `DatabricksMCPServer` class that inherits from `FastMCP`
11 | - Implemented the MCP protocol for communication with clients
12 | - Set up proper error handling and logging
13 |
14 | 2. **Tool Registration**:
15 | - Registered tools for managing Databricks resources
16 | - Implemented proper parameter validation and error handling
17 | - Added detailed descriptions for each tool
18 |
19 | 3. **API Integration**:
20 | - Implemented functions for interacting with Databricks APIs
21 | - Set up proper authentication using Databricks tokens
22 | - Added error handling for API requests
23 |
24 | 4. **Testing**:
25 | - Created a direct test script to verify server functionality
26 | - Successfully tested the `list_clusters` tool
27 | - Verified that the server can connect to Databricks and retrieve data
28 |
29 | 5. **Documentation**:
30 | - Created a README file with installation and usage instructions
31 | - Documented available tools and their parameters
32 | - Declared the necessary dependencies in `pyproject.toml`
33 |
34 | ## Next Steps
35 |
36 | 1. **Additional Tools**:
37 | - Implement more tools for managing Databricks resources
38 | - Add support for Unity Catalog management
39 | - Add support for Delta Live Tables pipelines
40 |
41 | 2. **Enhanced Testing**:
42 | - Create more comprehensive test scripts
43 | - Add unit tests for individual components
44 | - Set up continuous integration
45 |
46 | 3. **Deployment**:
47 | - Create Docker container for easy deployment
48 | - Add support for running as a service
49 | - Implement authentication for the MCP server
50 |
51 | 4. **Client Integration**:
52 | - Create example clients for different AI models
53 | - Add support for popular AI platforms
54 | - Create documentation for client integration
```
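
The "Server Implementation" and "Tool Registration" points above boil down to a `FastMCP` subclass registering async tools through the SDK's decorator. A stripped-down sketch (the server name string and the tool body are assumptions; the real class is in `src/server/databricks_mcp_server.py`):

```python
"""Sketch of the FastMCP subclass pattern described in this summary."""
import json

from mcp.server.fastmcp import FastMCP

from src.api import clusters


class DatabricksMCPServer(FastMCP):
    """MCP server exposing Databricks functionality as tools."""

    def __init__(self) -> None:
        super().__init__("databricks-mcp")  # Server name is an assumption
        self._register_tools()

    def _register_tools(self) -> None:
        @self.tool(name="list_clusters", description="List all Databricks clusters")
        async def list_clusters() -> str:
            """Return all clusters as a JSON string."""
            result = await clusters.list_clusters()
            return json.dumps(result)
```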
--------------------------------------------------------------------------------
/src/server/app.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | FastAPI application for Databricks API.
3 |
4 | This is a stub module that provides compatibility with existing tests.
5 | The actual implementation uses the MCP protocol directly.
6 | """
7 |
8 | from fastapi import FastAPI
9 |
10 | from src.api import clusters, dbfs, jobs, notebooks, sql
11 | from src.core.config import settings
12 |
13 |
14 | def create_app() -> FastAPI:
15 | """
16 | Create and configure the FastAPI application.
17 |
18 | Returns:
19 | FastAPI: The configured FastAPI application
20 | """
21 | app = FastAPI(
22 | title="Databricks API",
23 | description="API for interacting with Databricks services",
24 | version=settings.VERSION,
25 | )
26 |
27 | # Add routes
28 | @app.get("/api/2.0/clusters/list")
29 | async def list_clusters():
30 | """List all clusters."""
31 | result = await clusters.list_clusters()
32 | return result
33 |
34 | @app.get("/api/2.0/clusters/get/{cluster_id}")
35 | async def get_cluster(cluster_id: str):
36 | """Get cluster details."""
37 | result = await clusters.get_cluster(cluster_id)
38 | return result
39 |
40 | @app.post("/api/2.0/clusters/create")
41 | async def create_cluster(request_data: dict):
42 | """Create a new cluster."""
43 | result = await clusters.create_cluster(request_data)
44 | return result
45 |
46 | @app.post("/api/2.0/clusters/delete")
47 | async def terminate_cluster(request_data: dict):
48 | """Terminate a cluster."""
49 | result = await clusters.terminate_cluster(request_data.get("cluster_id"))
50 | return result
51 |
52 | @app.post("/api/2.0/clusters/start")
53 | async def start_cluster(request_data: dict):
54 | """Start a cluster."""
55 | result = await clusters.start_cluster(request_data.get("cluster_id"))
56 | return result
57 |
58 | @app.post("/api/2.0/clusters/resize")
59 | async def resize_cluster(request_data: dict):
60 | """Resize a cluster."""
61 | result = await clusters.resize_cluster(
62 | request_data.get("cluster_id"),
63 | request_data.get("num_workers")
64 | )
65 | return result
66 |
67 | @app.post("/api/2.0/clusters/restart")
68 | async def restart_cluster(request_data: dict):
69 | """Restart a cluster."""
70 | result = await clusters.restart_cluster(request_data.get("cluster_id"))
71 | return result
72 |
73 | return app
```
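
Since `app.py` exists purely for test compatibility, the natural way to exercise it is FastAPI's `TestClient`, which drives the async routes synchronously. A minimal sketch; it needs a reachable workspace (or a monkeypatched `clusters.list_clusters`), so the assertion is deliberately loose:

```python
"""Sketch: exercising the stub FastAPI app with TestClient."""
from fastapi.testclient import TestClient

from src.server.app import create_app


def test_list_clusters_route() -> None:
    client = TestClient(create_app())
    response = client.get("/api/2.0/clusters/list")
    # Payload shape depends on the Databricks response; check status only.
    assert response.status_code == 200
```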
--------------------------------------------------------------------------------
/src/core/auth.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Authentication functionality for the Databricks MCP server.
3 | """
4 |
5 | import logging
6 | from typing import Dict, Optional
7 |
8 | from fastapi import Depends, HTTPException, Security, status
9 | from fastapi.security import APIKeyHeader
10 |
11 | from src.core.config import settings
12 |
13 | # Configure logging
14 | logger = logging.getLogger(__name__)
15 |
16 | # API key header scheme
17 | API_KEY_HEADER = APIKeyHeader(name="X-API-Key", auto_error=False)
18 |
19 |
20 | async def validate_api_key(api_key: Optional[str] = Security(API_KEY_HEADER)) -> Dict[str, str]:
21 | """
22 | Validate API key for protected endpoints.
23 |
24 | Args:
25 | api_key: The API key from the request header
26 |
27 | Returns:
28 | Dictionary with authentication info
29 |
30 | Raises:
31 | HTTPException: If authentication fails
32 | """
33 | # For now, we're using a simple token comparison
34 | # In a production environment, you might want to use a database or more secure method
35 |
36 | # Check if API key is required in the current environment
37 | if not settings.DEBUG:
38 | if not api_key:
39 | logger.warning("Authentication failed: Missing API key")
40 | raise HTTPException(
41 | status_code=status.HTTP_401_UNAUTHORIZED,
42 | detail="Missing API key",
43 | headers={"WWW-Authenticate": "ApiKey"},
44 | )
45 |
46 | # In a real scenario, you would validate against a secure storage
47 | # For demo purposes, we'll just check against an environment variable
48 | # NEVER do this in production - use a proper authentication system!
49 | valid_keys = ["test-api-key"] # Replace with actual implementation
50 |
51 | if api_key not in valid_keys:
52 | logger.warning("Authentication failed: Invalid API key")
53 | raise HTTPException(
54 | status_code=status.HTTP_401_UNAUTHORIZED,
55 | detail="Invalid API key",
56 | headers={"WWW-Authenticate": "ApiKey"},
57 | )
58 |
59 | # Return authentication info
60 | return {"authenticated": True}
61 |
62 |
63 | def get_current_user():
64 | """
65 | Dependency to get current user.
66 |
67 | For future implementation of user-specific functionality.
68 | Currently returns a placeholder.
69 | """
70 | # This would be expanded in a real application with actual user information
71 | return {"username": "admin"}
```
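To show how `validate_api_key` is meant to be consumed, here is a minimal sketch of guarding a route with it as a FastAPI dependency (the route itself is hypothetical):

```python
from fastapi import Depends, FastAPI

from src.core.auth import validate_api_key

app = FastAPI()

@app.get("/protected")  # hypothetical endpoint for illustration
async def protected(auth: dict = Depends(validate_api_key)):
    # Only reached if validate_api_key did not raise an HTTPException.
    return {"authenticated": auth["authenticated"]}
```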
--------------------------------------------------------------------------------
/src/core/config.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Configuration settings for the Databricks MCP server.
3 | """
4 |
5 | import os
6 | from typing import Any, Dict, Optional
7 |
8 | # Import dotenv if available, but don't require it
 9 | try:
10 |     from dotenv import load_dotenv
11 |     load_dotenv()  # Load .env file if it exists
12 | except ImportError:
13 |     # python-dotenv is optional; fall back to OS environment variables set manually.
14 |     # Note: avoid printing to stdout at import time, since stdout carries the MCP
15 |     # stdio protocol stream and stray output can corrupt it.
16 |     pass
17 |
18 | from pydantic import field_validator
19 | from pydantic_settings import BaseSettings
20 |
21 | # Version
22 | VERSION = "0.1.0"
23 |
24 |
25 | class Settings(BaseSettings):
26 | """Base settings for the application."""
27 |
28 | # Databricks API configuration
29 | DATABRICKS_HOST: str = os.environ.get("DATABRICKS_HOST", "https://example.databricks.net")
30 | DATABRICKS_TOKEN: str = os.environ.get("DATABRICKS_TOKEN", "dapi_token_placeholder")
31 |
32 | # Server configuration
33 | SERVER_HOST: str = os.environ.get("SERVER_HOST", "0.0.0.0")
34 | SERVER_PORT: int = int(os.environ.get("SERVER_PORT", "8000"))
35 | DEBUG: bool = os.environ.get("DEBUG", "False").lower() == "true"
36 |
37 | # Logging
38 | LOG_LEVEL: str = os.environ.get("LOG_LEVEL", "INFO")
39 |
40 | # Version
41 | VERSION: str = VERSION
42 |
43 | @field_validator("DATABRICKS_HOST")
44 | def validate_databricks_host(cls, v: str) -> str:
45 | """Validate Databricks host URL."""
46 | if not v.startswith(("https://", "http://")):
47 | raise ValueError("DATABRICKS_HOST must start with http:// or https://")
48 | return v
49 |
50 | class Config:
51 | """Pydantic configuration."""
52 |
53 | env_file = ".env"
54 | case_sensitive = True
55 |
56 |
57 | # Create global settings instance
58 | settings = Settings()
59 |
60 |
61 | def get_api_headers() -> Dict[str, str]:
62 | """Get headers for Databricks API requests."""
63 | return {
64 | "Authorization": f"Bearer {settings.DATABRICKS_TOKEN}",
65 | "Content-Type": "application/json",
66 | }
67 |
68 |
69 | def get_databricks_api_url(endpoint: str) -> str:
70 | """
71 | Construct the full Databricks API URL.
72 |
73 | Args:
74 | endpoint: The API endpoint path, e.g., "/api/2.0/clusters/list"
75 |
76 | Returns:
77 | Full URL to the Databricks API endpoint
78 | """
79 | # Ensure endpoint starts with a slash
80 | if not endpoint.startswith("/"):
81 | endpoint = f"/{endpoint}"
82 |
83 | # Remove trailing slash from host if present
84 | host = settings.DATABRICKS_HOST.rstrip("/")
85 |
86 | return f"{host}{endpoint}"
```
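A short sketch of how the two helpers compose; the resulting URL depends on whatever `DATABRICKS_HOST` resolves to:

```python
from src.core.config import get_api_headers, get_databricks_api_url

# The leading slash is added and any trailing slash on the host is stripped,
# so both of these produce the same URL.
url = get_databricks_api_url("api/2.0/clusters/list")
url = get_databricks_api_url("/api/2.0/clusters/list")

headers = get_api_headers()  # {"Authorization": "Bearer ...", "Content-Type": "application/json"}
```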
--------------------------------------------------------------------------------
/src/cli/commands.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Command-line interface for the Databricks MCP server.
3 |
4 | This module provides command-line functionality for interacting with the Databricks MCP server.
5 | """
6 |
7 | import argparse
8 | import asyncio
9 | import logging
10 | import sys
11 | from typing import List, Optional
12 |
13 | from src.server.databricks_mcp_server import DatabricksMCPServer, main as server_main
14 |
15 | # Configure logging
16 | logging.basicConfig(
17 | level=logging.INFO,
18 | format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
19 | )
20 | logger = logging.getLogger(__name__)
21 |
22 |
23 | def parse_args(args: Optional[List[str]] = None) -> argparse.Namespace:
24 | """Parse command-line arguments."""
25 | parser = argparse.ArgumentParser(description="Databricks MCP Server CLI")
26 |
27 | # Create subparsers for different commands
28 | subparsers = parser.add_subparsers(dest="command", help="Command to run")
29 |
30 | # Start server command
31 | start_parser = subparsers.add_parser("start", help="Start the MCP server")
32 | start_parser.add_argument(
33 | "--debug", action="store_true", help="Enable debug logging"
34 | )
35 |
36 | # List tools command
37 | list_parser = subparsers.add_parser("list-tools", help="List available tools")
38 |
39 | # Version command
40 | subparsers.add_parser("version", help="Show server version")
41 |
42 | return parser.parse_args(args)
43 |
44 |
45 | async def list_tools() -> None:
46 | """List all available tools in the server."""
47 | server = DatabricksMCPServer()
48 | tools = await server.list_tools()
49 |
50 | print("\nAvailable tools:")
51 | for tool in tools:
52 | print(f" - {tool.name}: {tool.description}")
53 |
54 |
55 | def show_version() -> None:
56 | """Show the server version."""
57 | server = DatabricksMCPServer()
58 | print(f"\nDatabricks MCP Server v{server.version}")
59 |
60 |
61 | def main(args: Optional[List[str]] = None) -> int:
62 | """Main entry point for the CLI."""
63 | parsed_args = parse_args(args)
64 |
65 | # Set log level
66 | if hasattr(parsed_args, "debug") and parsed_args.debug:
67 | logging.getLogger().setLevel(logging.DEBUG)
68 |
69 | # Execute the appropriate command
70 | if parsed_args.command == "start":
71 | logger.info("Starting Databricks MCP server")
72 | asyncio.run(server_main())
73 | elif parsed_args.command == "list-tools":
74 | asyncio.run(list_tools())
75 | elif parsed_args.command == "version":
76 | show_version()
77 | else:
78 | # If no command is provided, show help
79 | parse_args(["--help"])
80 | return 1
81 |
82 | return 0
83 |
84 |
85 | if __name__ == "__main__":
86 | sys.exit(main())
```
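Because `main()` accepts an argument list, the CLI can be exercised programmatically as well as from the shell. A minimal sketch:

```python
# Equivalent to: python -m src.cli.commands list-tools
import sys

from src.cli.commands import main

sys.exit(main(["list-tools"]))
```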
--------------------------------------------------------------------------------
/src/api/clusters.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | API for managing Databricks clusters.
3 | """
4 |
5 | import logging
6 | from typing import Any, Dict, List, Optional
7 |
8 | from src.core.utils import DatabricksAPIError, make_api_request
9 |
10 | # Configure logging
11 | logger = logging.getLogger(__name__)
12 |
13 |
14 | async def create_cluster(cluster_config: Dict[str, Any]) -> Dict[str, Any]:
15 | """
16 | Create a new Databricks cluster.
17 |
18 | Args:
19 | cluster_config: Cluster configuration
20 |
21 | Returns:
22 | Response containing the cluster ID
23 |
24 | Raises:
25 | DatabricksAPIError: If the API request fails
26 | """
27 | logger.info("Creating new cluster")
28 | return make_api_request("POST", "/api/2.0/clusters/create", data=cluster_config)
29 |
30 |
31 | async def terminate_cluster(cluster_id: str) -> Dict[str, Any]:
32 | """
33 | Terminate a Databricks cluster.
34 |
35 | Args:
36 | cluster_id: ID of the cluster to terminate
37 |
38 | Returns:
39 | Empty response on success
40 |
41 | Raises:
42 | DatabricksAPIError: If the API request fails
43 | """
44 | logger.info(f"Terminating cluster: {cluster_id}")
45 | return make_api_request("POST", "/api/2.0/clusters/delete", data={"cluster_id": cluster_id})
46 |
47 |
48 | async def list_clusters() -> Dict[str, Any]:
49 | """
50 | List all Databricks clusters.
51 |
52 | Returns:
53 | Response containing a list of clusters
54 |
55 | Raises:
56 | DatabricksAPIError: If the API request fails
57 | """
58 | logger.info("Listing all clusters")
59 | return make_api_request("GET", "/api/2.0/clusters/list")
60 |
61 |
62 | async def get_cluster(cluster_id: str) -> Dict[str, Any]:
63 | """
64 | Get information about a specific cluster.
65 |
66 | Args:
67 | cluster_id: ID of the cluster
68 |
69 | Returns:
70 | Response containing cluster information
71 |
72 | Raises:
73 | DatabricksAPIError: If the API request fails
74 | """
75 | logger.info(f"Getting information for cluster: {cluster_id}")
76 | return make_api_request("GET", "/api/2.0/clusters/get", params={"cluster_id": cluster_id})
77 |
78 |
79 | async def start_cluster(cluster_id: str) -> Dict[str, Any]:
80 | """
81 | Start a terminated Databricks cluster.
82 |
83 | Args:
84 | cluster_id: ID of the cluster to start
85 |
86 | Returns:
87 | Empty response on success
88 |
89 | Raises:
90 | DatabricksAPIError: If the API request fails
91 | """
92 | logger.info(f"Starting cluster: {cluster_id}")
93 | return make_api_request("POST", "/api/2.0/clusters/start", data={"cluster_id": cluster_id})
94 |
95 |
96 | async def resize_cluster(cluster_id: str, num_workers: int) -> Dict[str, Any]:
97 | """
98 | Resize a cluster by changing the number of workers.
99 |
100 | Args:
101 | cluster_id: ID of the cluster to resize
102 | num_workers: New number of workers
103 |
104 | Returns:
105 | Empty response on success
106 |
107 | Raises:
108 | DatabricksAPIError: If the API request fails
109 | """
110 | logger.info(f"Resizing cluster {cluster_id} to {num_workers} workers")
111 | return make_api_request(
112 | "POST",
113 | "/api/2.0/clusters/resize",
114 | data={"cluster_id": cluster_id, "num_workers": num_workers}
115 | )
116 |
117 |
118 | async def restart_cluster(cluster_id: str) -> Dict[str, Any]:
119 | """
120 | Restart a Databricks cluster.
121 |
122 | Args:
123 | cluster_id: ID of the cluster to restart
124 |
125 | Returns:
126 | Empty response on success
127 |
128 | Raises:
129 | DatabricksAPIError: If the API request fails
130 | """
131 | logger.info(f"Restarting cluster: {cluster_id}")
132 | return make_api_request("POST", "/api/2.0/clusters/restart", data={"cluster_id": cluster_id})
```
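A minimal sketch of driving these coroutines from a script; the cluster ID in the commented call is a placeholder, not a real cluster:

```python
import asyncio

from src.api import clusters

async def demo() -> None:
    listing = await clusters.list_clusters()
    for cluster in listing.get("clusters", []):
        print(cluster.get("cluster_id"), cluster.get("state"))
    # Resize a specific cluster (placeholder ID for illustration):
    # await clusters.resize_cluster("1234-567890-abcdef", num_workers=4)

asyncio.run(demo())
```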
--------------------------------------------------------------------------------
/docs/phase1.md:
--------------------------------------------------------------------------------
```markdown
1 | Develop a Model Context Protocol (MCP) server for Azure Databricks, utilising the following REST API endpoints:
2 |
3 | **1. Cluster Management:**
4 |
5 | - **Create Cluster:** `POST /api/2.0/clusters/create`
6 | - **Terminate Cluster:** `POST /api/2.0/clusters/delete`
7 | - **List Clusters:** `GET /api/2.0/clusters/list`
8 |
9 | **2. Job Management:**
10 |
11 | - **Create Job:** `POST /api/2.0/jobs/create`
12 | - **Run Job:** `POST /api/2.0/jobs/run-now`
13 | - **List Jobs:** `GET /api/2.0/jobs/list`
14 |
15 | **3. Notebook Operations:**
16 |
17 | - **Import Notebook:** `POST /api/2.0/workspace/import`
18 | - **Export Notebook:** `GET /api/2.0/workspace/export`
19 | - **List Notebooks:** `GET /api/2.0/workspace/list`
20 |
21 | **4. Databricks File System (DBFS):**
22 |
23 | - **Upload File:** `POST /api/2.0/dbfs/put`
24 | - **List Files:** `GET /api/2.0/dbfs/list`
25 | - **Delete File:** `POST /api/2.0/dbfs/delete`
26 |
27 | **5. SQL Statement Execution:**
28 |
29 | - **Execute SQL Statement:** `POST /api/2.0/sql/statements/execute`
30 |
31 | **6. Unity Catalog Management:**
32 |
33 | - **Catalog Operations:**
34 | - **Create Catalog:** `POST /api/2.0/unity-catalog/catalogs`
35 | - **List Catalogs:** `GET /api/2.0/unity-catalog/catalogs`
36 | - **Delete Catalog:** `DELETE /api/2.0/unity-catalog/catalogs/{name}`
37 |
38 | - **Schema Operations:**
39 | - **Create Schema:** `POST /api/2.0/unity-catalog/schemas`
40 | - **List Schemas:** `GET /api/2.0/unity-catalog/schemas`
41 | - **Delete Schema:** `DELETE /api/2.0/unity-catalog/schemas/{full_name}`
42 |
43 | - **Table Operations:**
44 | - **Create Table:** `POST /api/2.0/unity-catalog/tables`
45 | - **List Tables:** `GET /api/2.0/unity-catalog/tables`
46 | - **Delete Table:** `DELETE /api/2.0/unity-catalog/tables/{full_name}`
47 |
48 | - **Data Lineage:**
49 | - **Get Table Lineage:** `GET /api/2.0/unity-catalog/lineage-tracking/table-lineage/{table_name}`
50 | - **Get Column Lineage:** `GET /api/2.0/unity-catalog/lineage-tracking/column-lineage/{column_name}`
51 |
52 | **7. Delta Live Tables Pipelines:**
53 |
54 | - **Pipeline Management:**
55 | - **Create Pipeline:** `POST /api/2.0/pipelines`
56 | - **List Pipelines:** `GET /api/2.0/pipelines`
57 | - **Get Pipeline:** `GET /api/2.0/pipelines/{pipeline_id}`
58 | - **Update Pipeline:** `PUT /api/2.0/pipelines/{pipeline_id}`
59 | - **Delete Pipeline:** `DELETE /api/2.0/pipelines/{pipeline_id}`
60 |
61 | - **Pipeline Execution:**
62 | - **Start Update:** `POST /api/2.0/pipelines/{pipeline_id}/updates`
63 | - **List Updates:** `GET /api/2.0/pipelines/{pipeline_id}/updates`
64 | - **Get Update:** `GET /api/2.0/pipelines/{pipeline_id}/updates/{update_id}`
65 |
66 | **8. Databricks SQL Queries:**
67 |
68 | - **Query Management:**
69 | - **Create Query:** `POST /api/2.0/preview/sql/queries`
70 | - **List Queries:** `GET /api/2.0/preview/sql/queries`
71 | - **Get Query:** `GET /api/2.0/preview/sql/queries/{query_id}`
72 | - **Update Query:** `POST /api/2.0/preview/sql/queries/{query_id}`
73 | - **Delete Query:** `DELETE /api/2.0/preview/sql/queries/{query_id}`
74 |
75 | **9. Model Serving Endpoints:**
76 |
77 | - **Serving Endpoint Management:**
78 | - **Create Serving Endpoint:** `POST /api/2.0/serving-endpoints`
79 | - **Get Serving Endpoint:** `GET /api/2.0/serving-endpoints/{name}`
80 | - **Update Serving Endpoint Config:** `PUT /api/2.0/serving-endpoints/{name}/config`
81 | - **Delete Serving Endpoint:** `DELETE /api/2.0/serving-endpoints/{name}`
82 |
83 | - **Querying Serving Endpoints:**
84 | - **Query Serving Endpoint:** `POST /serving-endpoints/{name}/invocations`
85 |
86 | Integrating these API endpoints into our MCP server will enable comprehensive management of our Azure Databricks environment, covering clusters, jobs, notebooks, file systems, SQL execution, Unity Catalog, Delta Live Tables, SQL queries, and model serving. It will also give us a platform to which new features can be added as needed.
```
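For a concrete sense of the shape of these calls, here is a minimal sketch of hitting the SQL execution endpoint above through the shared request helper in `src/core/utils` (the warehouse ID is a placeholder):

```python
from src.core.utils import make_api_request

# POST to the statement-execution endpoint listed in section 5 above.
result = make_api_request(
    "POST",
    "/api/2.0/sql/statements/execute",
    data={"statement": "SELECT 1", "warehouse_id": "<your-warehouse-id>"},  # placeholder ID
)
print(result)
```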
--------------------------------------------------------------------------------
/tests/test_direct.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Direct tests for the Databricks MCP server.
3 |
4 | This module contains tests that directly instantiate and test the server without using MCP protocol.
5 | """
6 |
7 | import asyncio
8 | import json
9 | import logging
10 | import sys
11 | from typing import Dict, Any, List
12 |
13 | from src.server.databricks_mcp_server import DatabricksMCPServer
14 |
15 | # Configure logging
16 | logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
17 | logger = logging.getLogger(__name__)
18 |
19 | async def test_list_clusters():
20 | """Test the list_clusters tool directly."""
21 | try:
22 | logger.info("Creating Databricks MCP server instance")
23 | server = DatabricksMCPServer()
24 |
25 | # Test the list_clusters tool
26 | tool_name = "list_clusters"
27 | logger.info(f"Testing tool: {tool_name}")
28 |
29 | # Call the tool with the required params parameter
30 | params: Dict[str, Any] = {"params": {}}
31 | result = await server.call_tool(tool_name, params)
32 |
33 | # Extract text content from the result
34 | if isinstance(result, List) and len(result) > 0:
35 | # Get the first item in the list
36 | item = result[0]
37 |
38 | # Check if the item has a 'text' attribute
39 | if hasattr(item, 'text'):
40 | text = item.text
41 | logger.info(f"Text content: {text[:100]}...") # Show first 100 chars
42 |
43 | # Parse the JSON from the text
44 | try:
45 | # First level of parsing (the text is a JSON string)
46 | parsed_json = json.loads(text)
47 |
48 | # Check if the parsed JSON has a 'text' field (double JSON encoding)
49 | if 'text' in parsed_json:
50 | # Second level of parsing (the text field is also a JSON string)
51 | inner_json = json.loads(parsed_json['text'])
52 | logger.info(f"Parsed clusters data: {json.dumps(inner_json, indent=2)}")
53 |
54 | # Extract cluster information
55 | if 'clusters' in inner_json:
56 | clusters = inner_json['clusters']
57 | logger.info(f"Found {len(clusters)} clusters")
58 |
59 | # Print information about each cluster
60 | for i, cluster in enumerate(clusters):
61 | logger.info(f"Cluster {i+1}:")
62 | logger.info(f" ID: {cluster.get('cluster_id')}")
63 | logger.info(f" Name: {cluster.get('cluster_name')}")
64 | logger.info(f" State: {cluster.get('state')}")
65 |
66 | return True
67 | else:
68 | logger.info(f"Parsed JSON: {json.dumps(parsed_json, indent=2)}")
69 |
70 | except json.JSONDecodeError as e:
71 | logger.error(f"Error parsing JSON: {e}")
72 |
73 | logger.error("Test failed: Could not parse cluster data")
74 | return False
75 |
76 | except Exception as e:
77 | logger.error(f"Error: {e}", exc_info=True)
78 | return False
79 |
80 |
81 | async def main():
82 | """Run all tests."""
83 | logger.info("Running direct tests for Databricks MCP server")
84 |
85 | # Run tests
86 | success = await test_list_clusters()
87 |
88 | if success:
89 | logger.info("All tests passed!")
90 | return 0
91 | else:
92 | logger.error("Tests failed")
93 | return 1
94 |
95 |
96 | if __name__ == "__main__":
97 | sys.exit(asyncio.run(main()))
```
--------------------------------------------------------------------------------
/src/core/utils.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Utility functions for the Databricks MCP server.
3 | """
4 |
5 | import json
6 | import logging
7 | from typing import Any, Dict, List, Optional, Union
8 |
9 | import requests
10 | from requests.exceptions import RequestException
11 |
12 | from src.core.config import get_api_headers, get_databricks_api_url
13 |
14 | # Configure logging
15 | logging.basicConfig(
16 | level=logging.INFO,
17 | format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
18 | )
19 | logger = logging.getLogger(__name__)
20 |
21 |
22 | class DatabricksAPIError(Exception):
23 | """Exception raised for errors in the Databricks API."""
24 |
25 | def __init__(self, message: str, status_code: Optional[int] = None, response: Optional[Any] = None):
26 | self.message = message
27 | self.status_code = status_code
28 | self.response = response
29 | super().__init__(self.message)
30 |
31 |
32 | def make_api_request(
33 | method: str,
34 | endpoint: str,
35 | data: Optional[Dict[str, Any]] = None,
36 | params: Optional[Dict[str, Any]] = None,
37 | files: Optional[Dict[str, Any]] = None,
38 | ) -> Dict[str, Any]:
39 | """
40 | Make a request to the Databricks API.
41 |
42 | Args:
43 | method: HTTP method ("GET", "POST", "PUT", "DELETE")
44 | endpoint: API endpoint path
45 | data: Request body data
46 | params: Query parameters
47 | files: Files to upload
48 |
49 | Returns:
50 | Response data as a dictionary
51 |
52 | Raises:
53 | DatabricksAPIError: If the API request fails
54 | """
55 | url = get_databricks_api_url(endpoint)
56 | headers = get_api_headers()
57 |
58 | try:
59 | # Log the request (omit sensitive information)
60 | safe_data = "**REDACTED**" if data else None
61 | logger.debug(f"API Request: {method} {url} Params: {params} Data: {safe_data}")
62 |
63 | # Convert data to JSON string if provided
64 | json_data = json.dumps(data) if data and not files else data
65 |
66 | # Make the request
67 | response = requests.request(
68 | method=method,
69 | url=url,
70 | headers=headers,
71 | params=params,
72 | data=json_data if not files else data,
73 | files=files,
74 | )
75 |
76 | # Check for HTTP errors
77 | response.raise_for_status()
78 |
79 | # Parse response
80 | if response.content:
81 | return response.json()
82 | return {}
83 |
84 | except RequestException as e:
85 | # Handle request exceptions
86 | status_code = getattr(e.response, "status_code", None) if hasattr(e, "response") else None
87 | error_msg = f"API request failed: {str(e)}"
88 |
89 | # Try to extract error details from response
90 | error_response = None
91 | if hasattr(e, "response") and e.response is not None:
92 | try:
93 | error_response = e.response.json()
94 | error_msg = f"{error_msg} - {error_response.get('error', '')}"
95 | except ValueError:
96 | error_response = e.response.text
97 |
98 | # Log the error
99 | logger.error(f"API Error: {error_msg}", exc_info=True)
100 |
101 | # Raise custom exception
102 | raise DatabricksAPIError(error_msg, status_code, error_response) from e
103 |
104 |
105 | def format_response(
106 | success: bool,
107 | data: Optional[Union[Dict[str, Any], List[Any]]] = None,
108 | error: Optional[str] = None,
109 | status_code: int = 200
110 | ) -> Dict[str, Any]:
111 | """
112 | Format a standardized response.
113 |
114 | Args:
115 | success: Whether the operation was successful
116 | data: Response data
117 | error: Error message if not successful
118 | status_code: HTTP status code
119 |
120 | Returns:
121 | Formatted response dictionary
122 | """
123 | response = {
124 | "success": success,
125 | "status_code": status_code,
126 | }
127 |
128 | if data is not None:
129 | response["data"] = data
130 |
131 | if error:
132 | response["error"] = error
133 |
134 | return response
```
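The helper and the exception are designed to be used together; a minimal sketch of wrapping a call and normalising the outcome with `format_response`:

```python
from src.core.utils import DatabricksAPIError, format_response, make_api_request

try:
    data = make_api_request("GET", "/api/2.0/clusters/list")
    response = format_response(True, data=data)
except DatabricksAPIError as e:
    # Carry the API error message and status code through to the caller.
    response = format_response(False, error=e.message, status_code=e.status_code or 500)

print(response)
```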
--------------------------------------------------------------------------------
/examples/mcp_client_usage.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Example of using the MCP client with the Databricks MCP server.
3 |
4 | This example shows how to use the MCP client to connect to the Databricks MCP server
5 | and call its tools through the MCP protocol.
6 | """
7 |
8 | import asyncio
9 | import json
10 | import logging
11 | import os
12 | import sys
13 | from typing import Any, Dict, List, Optional
14 |
15 | from mcp.client.stdio import StdioServerParameters, stdio_client
16 | from mcp.client.session import ClientSession
17 |
18 | # Configure logging
19 | logging.basicConfig(
20 | level=logging.INFO,
21 | format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
22 | )
23 | logger = logging.getLogger(__name__)
24 |
25 |
26 | async def connect_and_list_tools():
27 | """Connect to the Databricks MCP server and list its tools."""
28 | logger.info("Connecting to Databricks MCP server...")
29 |
30 | # Define the environment variables the server needs
31 | env = os.environ.copy()
32 |
33 | # Create parameters for connecting to the server
34 | params = StdioServerParameters(
35 | command="pwsh", # Use PowerShell
36 |         args=["-File", "./scripts/start_mcp_server.ps1"],  # Run the startup script
37 | env=env # Pass environment variables
38 | )
39 |
40 | # Use the client to start the server and connect to it
41 | logger.info("Launching server process...")
42 |
43 | async with stdio_client(params) as (recv, send):
44 | logger.info("Server launched, creating session...")
45 |         async with ClientSession(recv, send) as session:
46 |             # Entering the session context starts its message loop.
47 |             logger.info("Initializing session...")
48 |             await session.initialize()
49 |
50 |             # List available tools
51 |             tools_response = await session.list_tools()
52 |             tools = tools_response.tools
53 |
54 |             print("\nAvailable Tools:")
55 |             print("================")
56 |             for tool in tools:
57 |                 print(f"- {tool.name}: {tool.description}")
58 |
59 |             # Let the user select a tool to run
60 |             if tools:
61 |                 while True:
62 |                     print("\nSelect a tool to run (or 'quit' to exit):")
63 |                     for i, tool in enumerate(tools):
64 |                         print(f"{i+1}. {tool.name}")
65 |
66 |                     choice = input("Enter choice (number or name): ")
67 |
68 |                     if choice.lower() == 'quit':
69 |                         break
70 |
71 |                     # Find the selected tool
72 |                     selected_tool = None
73 |                     if choice.isdigit():
74 |                         idx = int(choice) - 1
75 |                         if 0 <= idx < len(tools):
76 |                             selected_tool = tools[idx]
77 |                     else:
78 |                         for tool in tools:
79 |                             if tool.name == choice:
80 |                                 selected_tool = tool
81 |                                 break
82 |
83 |                     if not selected_tool:
84 |                         print("Invalid choice. Please try again.")
85 |                         continue
86 |
87 |                     # Call the selected tool
88 |                     print(f"\nRunning tool: {selected_tool.name}")
89 |                     print("Enter parameters as JSON (empty for no parameters):")
90 |                     params_str = input("> ")
91 |
92 |                     try:
93 |                         params = json.loads(params_str) if params_str else {}
94 |                         result = await session.call_tool(selected_tool.name, params)
95 |                         print("\nResult:")
96 |                         print(result)  # CallToolResult is not directly JSON-serializable
97 |                     except Exception as e:
98 |                         print(f"Error calling tool: {e}")
99 |
100 |
101 | async def main():
102 | """Run the example."""
103 | print("Databricks MCP Server - MCP Client Usage Example")
104 | print("=============================================")
105 |
106 | try:
107 | await connect_and_list_tools()
108 | return 0
109 | except Exception as e:
110 | logger.error(f"Error: {e}", exc_info=True)
111 | return 1
112 |
113 |
114 | if __name__ == "__main__":
115 | sys.exit(asyncio.run(main()))
```
--------------------------------------------------------------------------------
/src/api/jobs.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | API for managing Databricks jobs.
3 | """
4 |
5 | import logging
6 | from typing import Any, Dict, List, Optional
7 |
8 | from src.core.utils import DatabricksAPIError, make_api_request
9 |
10 | # Configure logging
11 | logger = logging.getLogger(__name__)
12 |
13 |
14 | async def create_job(job_config: Dict[str, Any]) -> Dict[str, Any]:
15 | """
16 | Create a new Databricks job.
17 |
18 | Args:
19 | job_config: Job configuration
20 |
21 | Returns:
22 | Response containing the job ID
23 |
24 | Raises:
25 | DatabricksAPIError: If the API request fails
26 | """
27 | logger.info("Creating new job")
28 | return make_api_request("POST", "/api/2.0/jobs/create", data=job_config)
29 |
30 |
31 | async def run_job(job_id: int, notebook_params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
32 | """
33 | Run a job now.
34 |
35 | Args:
36 | job_id: ID of the job to run
37 | notebook_params: Optional parameters for the notebook
38 |
39 | Returns:
40 | Response containing the run ID
41 |
42 | Raises:
43 | DatabricksAPIError: If the API request fails
44 | """
45 | logger.info(f"Running job: {job_id}")
46 |
47 | run_params = {"job_id": job_id}
48 | if notebook_params:
49 | run_params["notebook_params"] = notebook_params
50 |
51 | return make_api_request("POST", "/api/2.0/jobs/run-now", data=run_params)
52 |
53 |
54 | async def list_jobs() -> Dict[str, Any]:
55 | """
56 | List all jobs.
57 |
58 | Returns:
59 | Response containing a list of jobs
60 |
61 | Raises:
62 | DatabricksAPIError: If the API request fails
63 | """
64 | logger.info("Listing all jobs")
65 | return make_api_request("GET", "/api/2.0/jobs/list")
66 |
67 |
68 | async def get_job(job_id: int) -> Dict[str, Any]:
69 | """
70 | Get information about a specific job.
71 |
72 | Args:
73 | job_id: ID of the job
74 |
75 | Returns:
76 | Response containing job information
77 |
78 | Raises:
79 | DatabricksAPIError: If the API request fails
80 | """
81 | logger.info(f"Getting information for job: {job_id}")
82 | return make_api_request("GET", "/api/2.0/jobs/get", params={"job_id": job_id})
83 |
84 |
85 | async def update_job(job_id: int, new_settings: Dict[str, Any]) -> Dict[str, Any]:
86 | """
87 | Update an existing job.
88 |
89 | Args:
90 | job_id: ID of the job to update
91 | new_settings: New job settings
92 |
93 | Returns:
94 | Empty response on success
95 |
96 | Raises:
97 | DatabricksAPIError: If the API request fails
98 | """
99 | logger.info(f"Updating job: {job_id}")
100 |
101 | update_data = {
102 | "job_id": job_id,
103 | "new_settings": new_settings
104 | }
105 |
106 | return make_api_request("POST", "/api/2.0/jobs/update", data=update_data)
107 |
108 |
109 | async def delete_job(job_id: int) -> Dict[str, Any]:
110 | """
111 | Delete a job.
112 |
113 | Args:
114 | job_id: ID of the job to delete
115 |
116 | Returns:
117 | Empty response on success
118 |
119 | Raises:
120 | DatabricksAPIError: If the API request fails
121 | """
122 | logger.info(f"Deleting job: {job_id}")
123 | return make_api_request("POST", "/api/2.0/jobs/delete", data={"job_id": job_id})
124 |
125 |
126 | async def get_run(run_id: int) -> Dict[str, Any]:
127 | """
128 | Get information about a specific job run.
129 |
130 | Args:
131 | run_id: ID of the run
132 |
133 | Returns:
134 | Response containing run information
135 |
136 | Raises:
137 | DatabricksAPIError: If the API request fails
138 | """
139 | logger.info(f"Getting information for run: {run_id}")
140 | return make_api_request("GET", "/api/2.0/jobs/runs/get", params={"run_id": run_id})
141 |
142 |
143 | async def cancel_run(run_id: int) -> Dict[str, Any]:
144 | """
145 | Cancel a job run.
146 |
147 | Args:
148 | run_id: ID of the run to cancel
149 |
150 | Returns:
151 | Empty response on success
152 |
153 | Raises:
154 | DatabricksAPIError: If the API request fails
155 | """
156 | logger.info(f"Cancelling run: {run_id}")
157 | return make_api_request("POST", "/api/2.0/jobs/runs/cancel", data={"run_id": run_id})
```
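A minimal sketch of triggering a run and then inspecting it; the job ID is a placeholder, and the `run_id` key assumes the usual Databricks run-now response shape:

```python
import asyncio

from src.api import jobs

async def demo() -> None:
    # Trigger a run for a hypothetical job, then fetch that run's details.
    run = await jobs.run_job(123, notebook_params={"date": "2024-01-01"})  # placeholder job ID
    run_info = await jobs.get_run(run["run_id"])
    print(run_info.get("state"))

asyncio.run(demo())
```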
--------------------------------------------------------------------------------
/examples/direct_usage.py:
--------------------------------------------------------------------------------
```python
1 | #!/usr/bin/env python
2 | """
3 | Databricks MCP Server - Direct Usage Example
4 |
5 | This example demonstrates how to directly use the Databricks MCP server
6 | without going through the MCP protocol. It shows how to instantiate the
7 | server class and call its methods directly.
8 | """
9 |
10 | import json
11 | import logging
12 | import os
13 | import sys
14 | from typing import Any, Dict, List, Optional
15 |
16 | # Add the parent directory to the Python path
17 | sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
18 |
19 | from src.server.databricks_mcp_server import DatabricksMCPServer
20 |
21 | # Set up logging
22 | logging.basicConfig(
23 | level=logging.INFO,
24 | format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
25 | )
26 | logger = logging.getLogger(__name__)
27 |
28 | def print_section_header(title: str) -> None:
29 | """Print a section header with the given title."""
30 | print(f"\n{title}")
31 | print("=" * len(title))
32 |
33 | def print_clusters(clusters: List[Dict[str, Any]]) -> None:
34 | """Print information about Databricks clusters."""
35 | print_section_header("Databricks Clusters")
36 |
37 | for i, cluster in enumerate(clusters, 1):
38 | print(f"\nCluster {i}:")
39 | print(f" ID: {cluster.get('cluster_id')}")
40 | print(f" Name: {cluster.get('cluster_name')}")
41 | print(f" State: {cluster.get('state')}")
42 | print(f" Spark Version: {cluster.get('spark_version')}")
43 | print(f" Node Type: {cluster.get('node_type_id')}")
44 |
45 | def print_notebooks(notebooks: List[Dict[str, Any]], path: str) -> None:
46 | """Print information about Databricks notebooks."""
47 | print_section_header(f"Databricks Notebooks in {path}")
48 |
49 | for notebook in notebooks:
50 | if notebook.get('object_type') == 'NOTEBOOK':
51 | print(f"\nNotebook: {notebook.get('path')}")
52 | elif notebook.get('object_type') == 'DIRECTORY':
53 | print(f"Directory: {notebook.get('path')}")
54 |
55 | def print_jobs(jobs: List[Dict[str, Any]]) -> None:
56 | """Print information about Databricks jobs."""
57 | print_section_header("Databricks Jobs")
58 |
59 | for i, job in enumerate(jobs, 1):
60 | print(f"\nJob {i}:")
61 | print(f" ID: {job.get('job_id')}")
62 | print(f" Name: {job.get('settings', {}).get('name')}")
63 | print(f" Created: {job.get('created_time')}")
64 |
65 | def main() -> None:
66 | """Main function for the direct usage example."""
67 | print("\nDatabricks MCP Server - Direct Usage Example")
68 | print("===========================================")
69 |
70 | # Check for Databricks credentials
71 | if not os.environ.get("DATABRICKS_HOST") or not os.environ.get("DATABRICKS_TOKEN"):
72 | logger.error("Please set DATABRICKS_HOST and DATABRICKS_TOKEN environment variables")
73 | sys.exit(1)
74 |
75 | # Create the Databricks MCP server
76 | server = DatabricksMCPServer()
77 |
78 | try:
79 | # List clusters
80 | logger.info("Listing Databricks clusters...")
81 | clusters_result = server.list_clusters()
82 | clusters_data = json.loads(clusters_result)
83 | if 'error' in clusters_data:
84 | logger.error(f"Error listing clusters: {clusters_data['error']}")
85 | else:
86 | print_clusters(clusters_data.get('clusters', []))
87 |
88 | # List notebooks in root path
89 | logger.info("Listing Databricks notebooks...")
90 | notebooks_result = server.list_notebooks({"path": "/"})
91 | notebooks_data = json.loads(notebooks_result)
92 | if 'error' in notebooks_data:
93 | logger.error(f"Error listing notebooks: {notebooks_data['error']}")
94 | else:
95 | print_notebooks(notebooks_data.get('objects', []), "/")
96 |
97 | # List jobs
98 | logger.info("Listing Databricks jobs...")
99 | jobs_result = server.list_jobs()
100 | jobs_data = json.loads(jobs_result)
101 | if 'error' in jobs_data:
102 | logger.error(f"Error listing jobs: {jobs_data['error']}")
103 | else:
104 | print_jobs(jobs_data.get('jobs', []))
105 |
106 | except Exception as e:
107 | logger.error(f"An error occurred: {str(e)}")
108 | sys.exit(1)
109 |
110 | if __name__ == "__main__":
111 | main()
```
--------------------------------------------------------------------------------
/src/api/notebooks.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | API for managing Databricks notebooks.
3 | """
4 |
5 | import base64
6 | import logging
7 | from typing import Any, Dict, List, Optional
8 |
9 | from src.core.utils import DatabricksAPIError, make_api_request
10 |
11 | # Configure logging
12 | logger = logging.getLogger(__name__)
13 |
14 |
15 | async def import_notebook(
16 | path: str,
17 | content: str,
18 | format: str = "SOURCE",
19 | language: Optional[str] = None,
20 | overwrite: bool = False,
21 | ) -> Dict[str, Any]:
22 | """
23 | Import a notebook into the workspace.
24 |
25 | Args:
26 | path: The path where the notebook should be stored
27 | content: The content of the notebook (base64 encoded)
28 | format: The format of the notebook (SOURCE, HTML, JUPYTER, DBC)
29 | language: The language of the notebook (SCALA, PYTHON, SQL, R)
30 | overwrite: Whether to overwrite an existing notebook
31 |
32 | Returns:
33 | Empty response on success
34 |
35 | Raises:
36 | DatabricksAPIError: If the API request fails
37 | """
38 | logger.info(f"Importing notebook to path: {path}")
39 |
40 | # Ensure content is base64 encoded
41 | if not is_base64(content):
42 | content = base64.b64encode(content.encode("utf-8")).decode("utf-8")
43 |
44 | import_data = {
45 | "path": path,
46 | "format": format,
47 | "content": content,
48 | "overwrite": overwrite,
49 | }
50 |
51 | if language:
52 | import_data["language"] = language
53 |
54 | return make_api_request("POST", "/api/2.0/workspace/import", data=import_data)
55 |
56 |
57 | async def export_notebook(
58 | path: str,
59 | format: str = "SOURCE",
60 | ) -> Dict[str, Any]:
61 | """
62 | Export a notebook from the workspace.
63 |
64 | Args:
65 | path: The path of the notebook to export
66 | format: The format to export (SOURCE, HTML, JUPYTER, DBC)
67 |
68 | Returns:
69 | Response containing the notebook content
70 |
71 | Raises:
72 | DatabricksAPIError: If the API request fails
73 | """
74 | logger.info(f"Exporting notebook from path: {path}")
75 |
76 | params = {
77 | "path": path,
78 | "format": format,
79 | }
80 |
81 | response = make_api_request("GET", "/api/2.0/workspace/export", params=params)
82 |
83 | # Optionally decode base64 content
84 | if "content" in response and format in ["SOURCE", "JUPYTER"]:
85 | try:
86 | response["decoded_content"] = base64.b64decode(response["content"]).decode("utf-8")
87 | except Exception as e:
88 | logger.warning(f"Failed to decode notebook content: {str(e)}")
89 |
90 | return response
91 |
92 |
93 | async def list_notebooks(path: str) -> Dict[str, Any]:
94 | """
95 | List notebooks in a workspace directory.
96 |
97 | Args:
98 | path: The path to list
99 |
100 | Returns:
101 | Response containing the directory listing
102 |
103 | Raises:
104 | DatabricksAPIError: If the API request fails
105 | """
106 | logger.info(f"Listing notebooks in path: {path}")
107 | return make_api_request("GET", "/api/2.0/workspace/list", params={"path": path})
108 |
109 |
110 | async def delete_notebook(path: str, recursive: bool = False) -> Dict[str, Any]:
111 | """
112 | Delete a notebook or directory.
113 |
114 | Args:
115 | path: The path to delete
116 | recursive: Whether to recursively delete directories
117 |
118 | Returns:
119 | Empty response on success
120 |
121 | Raises:
122 | DatabricksAPIError: If the API request fails
123 | """
124 | logger.info(f"Deleting path: {path}")
125 | return make_api_request(
126 | "POST",
127 | "/api/2.0/workspace/delete",
128 | data={"path": path, "recursive": recursive}
129 | )
130 |
131 |
132 | async def create_directory(path: str) -> Dict[str, Any]:
133 | """
134 | Create a directory in the workspace.
135 |
136 | Args:
137 | path: The path to create
138 |
139 | Returns:
140 | Empty response on success
141 |
142 | Raises:
143 | DatabricksAPIError: If the API request fails
144 | """
145 | logger.info(f"Creating directory: {path}")
146 | return make_api_request("POST", "/api/2.0/workspace/mkdirs", data={"path": path})
147 |
148 |
149 | def is_base64(content: str) -> bool:
150 | """
151 | Check if a string is already base64 encoded.
152 |
153 | Args:
154 | content: The string to check
155 |
156 | Returns:
157 | True if the string is base64 encoded, False otherwise
158 | """
159 | try:
160 | return base64.b64encode(base64.b64decode(content)) == content.encode('utf-8')
161 | except Exception:
162 | return False
```
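Note that `import_notebook` accepts plain source and base64-encodes it on your behalf (via the `is_base64` check above). A minimal round-trip sketch, using a placeholder workspace path:

```python
import asyncio

from src.api import notebooks

async def demo() -> None:
    path = "/Shared/example"  # placeholder workspace path
    await notebooks.import_notebook(
        path=path,
        content="print('hello')",  # plain source; encoded automatically
        language="PYTHON",
        overwrite=True,
    )
    exported = await notebooks.export_notebook(path)
    print(exported.get("decoded_content"))

asyncio.run(demo())
```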
--------------------------------------------------------------------------------
/src/api/sql.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | API for executing SQL statements on Databricks.
3 | """
4 |
5 | import logging
6 | from typing import Any, Dict, List, Optional
7 |
8 | from src.core.utils import DatabricksAPIError, make_api_request
9 |
10 | # Configure logging
11 | logger = logging.getLogger(__name__)
12 |
13 |
14 | async def execute_statement(
15 | statement: str,
16 | warehouse_id: str,
17 | catalog: Optional[str] = None,
18 | schema: Optional[str] = None,
19 | parameters: Optional[Dict[str, Any]] = None,
20 | row_limit: int = 10000,
21 | byte_limit: int = 100000000, # 100MB
22 | ) -> Dict[str, Any]:
23 | """
24 | Execute a SQL statement.
25 |
26 | Args:
27 | statement: The SQL statement to execute
28 | warehouse_id: ID of the SQL warehouse to use
29 | catalog: Optional catalog to use
30 | schema: Optional schema to use
31 | parameters: Optional statement parameters
32 | row_limit: Maximum number of rows to return
33 | byte_limit: Maximum number of bytes to return
34 |
35 | Returns:
36 | Response containing query results
37 |
38 | Raises:
39 | DatabricksAPIError: If the API request fails
40 | """
41 | logger.info(f"Executing SQL statement: {statement[:100]}...")
42 |
43 | request_data = {
44 | "statement": statement,
45 | "warehouse_id": warehouse_id,
46 | "wait_timeout": "0s", # Wait indefinitely
47 | "row_limit": row_limit,
48 | "byte_limit": byte_limit,
49 | }
50 |
51 | if catalog:
52 | request_data["catalog"] = catalog
53 |
54 | if schema:
55 | request_data["schema"] = schema
56 |
57 | if parameters:
58 | request_data["parameters"] = parameters
59 |
60 | return make_api_request("POST", "/api/2.0/sql/statements/execute", data=request_data)
61 |
62 |
63 | async def execute_and_wait(
64 | statement: str,
65 | warehouse_id: str,
66 | catalog: Optional[str] = None,
67 | schema: Optional[str] = None,
68 | parameters: Optional[Dict[str, Any]] = None,
69 | timeout_seconds: int = 300, # 5 minutes
70 | poll_interval_seconds: int = 1,
71 | ) -> Dict[str, Any]:
72 | """
73 | Execute a SQL statement and wait for completion.
74 |
75 | Args:
76 | statement: The SQL statement to execute
77 | warehouse_id: ID of the SQL warehouse to use
78 | catalog: Optional catalog to use
79 | schema: Optional schema to use
80 | parameters: Optional statement parameters
81 | timeout_seconds: Maximum time to wait for completion
82 | poll_interval_seconds: How often to poll for status
83 |
84 | Returns:
85 | Response containing query results
86 |
87 | Raises:
88 | DatabricksAPIError: If the API request fails
89 | TimeoutError: If query execution times out
90 | """
91 | import asyncio
92 | import time
93 |
94 | logger.info(f"Executing SQL statement with waiting: {statement[:100]}...")
95 |
96 | # Start execution
97 | response = await execute_statement(
98 | statement=statement,
99 | warehouse_id=warehouse_id,
100 | catalog=catalog,
101 | schema=schema,
102 | parameters=parameters,
103 | )
104 |
105 | statement_id = response.get("statement_id")
106 | if not statement_id:
107 | raise ValueError("No statement_id returned from execution")
108 |
109 | # Poll for completion
110 | start_time = time.time()
111 | status = response.get("status", {}).get("state", "")
112 |
113 | while status in ["PENDING", "RUNNING"]:
114 | # Check timeout
115 | if time.time() - start_time > timeout_seconds:
116 | raise TimeoutError(f"Query execution timed out after {timeout_seconds} seconds")
117 |
118 | # Wait before polling again
119 | await asyncio.sleep(poll_interval_seconds)
120 |
121 | # Check status
122 | status_response = await get_statement_status(statement_id)
123 | status = status_response.get("status", {}).get("state", "")
124 |
125 | if status == "SUCCEEDED":
126 | return status_response
127 | elif status in ["FAILED", "CANCELED", "CLOSED"]:
128 | error_message = status_response.get("status", {}).get("error", {}).get("message", "Unknown error")
129 | raise DatabricksAPIError(f"Query execution failed: {error_message}", response=status_response)
130 |
131 | return response
132 |
133 |
134 | async def get_statement_status(statement_id: str) -> Dict[str, Any]:
135 | """
136 | Get the status of a SQL statement.
137 |
138 | Args:
139 | statement_id: ID of the statement to check
140 |
141 | Returns:
142 | Response containing statement status
143 |
144 | Raises:
145 | DatabricksAPIError: If the API request fails
146 | """
147 | logger.info(f"Getting status of SQL statement: {statement_id}")
148 | return make_api_request("GET", f"/api/2.0/sql/statements/{statement_id}", params={})
149 |
150 |
151 | async def cancel_statement(statement_id: str) -> Dict[str, Any]:
152 | """
153 | Cancel a running SQL statement.
154 |
155 | Args:
156 | statement_id: ID of the statement to cancel
157 |
158 | Returns:
159 | Empty response on success
160 |
161 | Raises:
162 | DatabricksAPIError: If the API request fails
163 | """
164 | logger.info(f"Cancelling SQL statement: {statement_id}")
165 | return make_api_request("POST", f"/api/2.0/sql/statements/{statement_id}/cancel", data={})
```
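In practice `execute_and_wait` is the convenient entry point, since `execute_statement` returns immediately with `wait_timeout` set to `"0s"`. A minimal sketch with a placeholder warehouse ID:

```python
import asyncio

from src.api.sql import execute_and_wait

async def demo() -> None:
    result = await execute_and_wait(
        statement="SELECT 1 AS answer",
        warehouse_id="<your-warehouse-id>",  # placeholder
        timeout_seconds=60,
    )
    print(result.get("status", {}).get("state"))  # expected: SUCCEEDED

asyncio.run(demo())
```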
--------------------------------------------------------------------------------
/tests/test_tools.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Tests for individual tools in the Databricks MCP server.
3 |
4 | This module contains tests for each individual tool in the Databricks MCP server.
5 | """
6 |
7 | import asyncio
8 | import json
9 | import logging
10 | import sys
11 | from typing import Dict, Any, List
12 |
13 | from src.server.databricks_mcp_server import DatabricksMCPServer
14 |
15 | # Configure logging
16 | logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
17 | logger = logging.getLogger(__name__)
18 |
19 |
20 | async def test_list_clusters():
21 | """Test the list_clusters tool."""
22 | logger.info("Testing list_clusters tool")
23 | server = DatabricksMCPServer()
24 |
25 | result = await server.call_tool("list_clusters", {"params": {}})
26 |
27 | # Check if result is valid
28 | assert isinstance(result, List), "Result should be a List"
29 | assert len(result) > 0, "Result should not be empty"
30 | assert hasattr(result[0], 'text'), "Result item should have 'text' attribute"
31 |
32 | # Parse the JSON data
33 | text = result[0].text
34 | data = json.loads(text)
35 |
36 | assert 'text' in data, "Result should contain 'text' field"
37 | inner_data = json.loads(data['text'])
38 |
39 | assert 'clusters' in inner_data, "Result should contain 'clusters' field"
40 | logger.info(f"Found {len(inner_data['clusters'])} clusters")
41 |
42 | return True
43 |
44 |
45 | async def test_list_notebooks():
46 | """Test the list_notebooks tool."""
47 | logger.info("Testing list_notebooks tool")
48 | server = DatabricksMCPServer()
49 |
50 | result = await server.call_tool("list_notebooks", {"params": {"path": "/"}})
51 |
52 | # Check if result is valid
53 | assert isinstance(result, List), "Result should be a List"
54 | assert len(result) > 0, "Result should not be empty"
55 | assert hasattr(result[0], 'text'), "Result item should have 'text' attribute"
56 |
57 | # Parse the JSON data
58 | text = result[0].text
59 | data = json.loads(text)
60 |
61 | assert 'text' in data, "Result should contain 'text' field"
62 | inner_data = json.loads(data['text'])
63 |
64 | assert 'objects' in inner_data, "Result should contain 'objects' field"
65 | logger.info(f"Found {len(inner_data['objects'])} objects")
66 |
67 | return True
68 |
69 |
70 | async def test_list_jobs():
71 | """Test the list_jobs tool."""
72 | logger.info("Testing list_jobs tool")
73 | server = DatabricksMCPServer()
74 |
75 | result = await server.call_tool("list_jobs", {"params": {}})
76 |
77 | # Check if result is valid
78 | assert isinstance(result, List), "Result should be a List"
79 | assert len(result) > 0, "Result should not be empty"
80 | assert hasattr(result[0], 'text'), "Result item should have 'text' attribute"
81 |
82 | # Parse the JSON data
83 | text = result[0].text
84 | data = json.loads(text)
85 |
86 | assert 'text' in data, "Result should contain 'text' field"
87 | inner_data = json.loads(data['text'])
88 |
89 | assert 'jobs' in inner_data, "Result should contain 'jobs' field"
90 | logger.info(f"Found {len(inner_data['jobs'])} jobs")
91 |
92 | return True
93 |
94 |
95 | async def test_list_files():
96 | """Test the list_files tool."""
97 | logger.info("Testing list_files tool")
98 | server = DatabricksMCPServer()
99 |
100 | result = await server.call_tool("list_files", {"params": {"dbfs_path": "/"}})
101 |
102 | # Check if result is valid
103 | assert isinstance(result, List), "Result should be a List"
104 | assert len(result) > 0, "Result should not be empty"
105 | assert hasattr(result[0], 'text'), "Result item should have 'text' attribute"
106 |
107 | # Parse the JSON data
108 | text = result[0].text
109 | data = json.loads(text)
110 |
111 | assert 'text' in data, "Result should contain 'text' field"
112 | inner_data = json.loads(data['text'])
113 |
114 | assert 'files' in inner_data, "Result should contain 'files' field"
115 | logger.info(f"Found {len(inner_data['files'])} files")
116 |
117 | return True
118 |
119 |
120 | async def main():
121 | """Run all tool tests."""
122 | logger.info("Running tool tests for Databricks MCP server")
123 |
124 | try:
125 | # Run tests
126 | tests = [
127 | ("list_clusters", test_list_clusters),
128 | ("list_notebooks", test_list_notebooks),
129 | ("list_jobs", test_list_jobs),
130 | ("list_files", test_list_files),
131 | ]
132 |
133 | success = True
134 | for name, test_func in tests:
135 | try:
136 | logger.info(f"Running test for {name}")
137 | result = await test_func()
138 | if result:
139 | logger.info(f"Test for {name} passed")
140 | else:
141 | logger.error(f"Test for {name} failed")
142 | success = False
143 | except Exception as e:
144 | logger.error(f"Error in test for {name}: {e}", exc_info=True)
145 | success = False
146 |
147 | if success:
148 | logger.info("All tool tests passed!")
149 | return 0
150 | else:
151 | logger.error("Some tool tests failed")
152 | return 1
153 | except Exception as e:
154 | logger.error(f"Error in tests: {e}", exc_info=True)
155 | return 1
156 |
157 |
158 | if __name__ == "__main__":
159 | sys.exit(asyncio.run(main()))
```
--------------------------------------------------------------------------------
/tests/test_clusters.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Tests for the clusters API.
3 | """
4 |
5 | import json
6 | import os
7 | from unittest.mock import AsyncMock, MagicMock, patch
8 |
9 | import pytest
10 | from fastapi import status
11 | from fastapi.testclient import TestClient
12 |
13 | from src.api import clusters
14 | from src.server.app import create_app
15 |
16 |
17 | @pytest.fixture
18 | def client():
19 | """Create a test client for the API."""
20 | app = create_app()
21 | return TestClient(app)
22 |
23 |
24 | @pytest.fixture
25 | def mock_cluster_response():
26 | """Mock response for cluster operations."""
27 | return {
28 | "cluster_id": "1234-567890-abcdef",
29 | "cluster_name": "Test Cluster",
30 | "spark_version": "10.4.x-scala2.12",
31 | "node_type_id": "Standard_D3_v2",
32 | "num_workers": 2,
33 | "state": "RUNNING",
34 | "creator_user_name": "[email protected]",
35 | }
36 |
37 |
38 | @pytest.mark.asyncio
39 | async def test_create_cluster():
40 | """Test creating a cluster."""
41 | # Mock the API call
42 | clusters.create_cluster = AsyncMock(return_value={"cluster_id": "1234-567890-abcdef"})
43 |
44 | # Create cluster config
45 | cluster_config = {
46 | "cluster_name": "Test Cluster",
47 | "spark_version": "10.4.x-scala2.12",
48 | "node_type_id": "Standard_D3_v2",
49 | "num_workers": 2,
50 | }
51 |
52 | # Call the function
53 | response = await clusters.create_cluster(cluster_config)
54 |
55 | # Check the response
56 | assert response["cluster_id"] == "1234-567890-abcdef"
57 |
58 | # Verify the mock was called with the correct arguments
59 | clusters.create_cluster.assert_called_once_with(cluster_config)
60 |
61 |
62 | @pytest.mark.asyncio
63 | async def test_list_clusters():
64 | """Test listing clusters."""
65 | # Mock the API call
66 | mock_response = {
67 | "clusters": [
68 | {
69 | "cluster_id": "1234-567890-abcdef",
70 | "cluster_name": "Test Cluster 1",
71 | "state": "RUNNING",
72 | },
73 | {
74 | "cluster_id": "9876-543210-fedcba",
75 | "cluster_name": "Test Cluster 2",
76 | "state": "TERMINATED",
77 | },
78 | ]
79 | }
80 | clusters.list_clusters = AsyncMock(return_value=mock_response)
81 |
82 | # Call the function
83 | response = await clusters.list_clusters()
84 |
85 | # Check the response
86 | assert len(response["clusters"]) == 2
87 | assert response["clusters"][0]["cluster_id"] == "1234-567890-abcdef"
88 | assert response["clusters"][1]["cluster_id"] == "9876-543210-fedcba"
89 |
90 | # Verify the mock was called
91 | clusters.list_clusters.assert_called_once()
92 |
93 |
94 | @pytest.mark.asyncio
95 | async def test_get_cluster():
96 | """Test getting cluster information."""
97 | # Mock the API call
98 | mock_response = {
99 | "cluster_id": "1234-567890-abcdef",
100 | "cluster_name": "Test Cluster",
101 | "state": "RUNNING",
102 | }
103 | clusters.get_cluster = AsyncMock(return_value=mock_response)
104 |
105 | # Call the function
106 | response = await clusters.get_cluster("1234-567890-abcdef")
107 |
108 | # Check the response
109 | assert response["cluster_id"] == "1234-567890-abcdef"
110 | assert response["state"] == "RUNNING"
111 |
112 | # Verify the mock was called with the correct arguments
113 | clusters.get_cluster.assert_called_once_with("1234-567890-abcdef")
114 |
115 |
116 | @pytest.mark.asyncio
117 | async def test_terminate_cluster():
118 | """Test terminating a cluster."""
119 | # Mock the API call
120 | clusters.terminate_cluster = AsyncMock(return_value={})
121 |
122 | # Call the function
123 | response = await clusters.terminate_cluster("1234-567890-abcdef")
124 |
125 | # Check the response
126 | assert response == {}
127 |
128 | # Verify the mock was called with the correct arguments
129 | clusters.terminate_cluster.assert_called_once_with("1234-567890-abcdef")
130 |
131 |
132 | @pytest.mark.asyncio
133 | async def test_start_cluster():
134 | """Test starting a cluster."""
135 | # Mock the API call
136 | clusters.start_cluster = AsyncMock(return_value={})
137 |
138 | # Call the function
139 | response = await clusters.start_cluster("1234-567890-abcdef")
140 |
141 | # Check the response
142 | assert response == {}
143 |
144 | # Verify the mock was called with the correct arguments
145 | clusters.start_cluster.assert_called_once_with("1234-567890-abcdef")
146 |
147 |
148 | @pytest.mark.asyncio
149 | async def test_resize_cluster():
150 | """Test resizing a cluster."""
151 | # Mock the API call
152 | clusters.resize_cluster = AsyncMock(return_value={})
153 |
154 | # Call the function
155 | response = await clusters.resize_cluster("1234-567890-abcdef", 4)
156 |
157 | # Check the response
158 | assert response == {}
159 |
160 | # Verify the mock was called with the correct arguments
161 | clusters.resize_cluster.assert_called_once_with("1234-567890-abcdef", 4)
162 |
163 |
164 | @pytest.mark.asyncio
165 | async def test_restart_cluster():
166 | """Test restarting a cluster."""
167 | # Mock the API call
168 | clusters.restart_cluster = AsyncMock(return_value={})
169 |
170 | # Call the function
171 | response = await clusters.restart_cluster("1234-567890-abcdef")
172 |
173 | # Check the response
174 | assert response == {}
175 |
176 | # Verify the mock was called with the correct arguments
177 | clusters.restart_cluster.assert_called_once_with("1234-567890-abcdef")
```
--------------------------------------------------------------------------------
/tests/test_mcp_client.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | MCP client tests for the Databricks MCP server.
3 |
4 | This module contains tests that use the MCP client to connect to and test the server.
5 | """
6 |
7 | import asyncio
8 | import json
9 | import logging
10 | import os
11 | import sys
12 | from typing import Any, Dict, List, Optional
13 |
14 | import pytest
15 | from mcp.client.stdio import StdioServerParameters, stdio_client
16 | from mcp.client.session import ClientSession
17 |
18 | # Configure logging
19 | logging.basicConfig(
20 | level=logging.INFO,
21 | format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
22 | )
23 | logger = logging.getLogger(__name__)
24 |
25 |
26 | async def run_tests():
27 | """Connect to and test the Databricks MCP server."""
28 | logger.info("Connecting to Databricks MCP server...")
29 |
30 | # IMPORTANT: In MCP, the client launches the server process
31 | # We don't connect to an already running server!
32 |
33 | # Define the environment variables the server needs
34 | env = os.environ.copy()
35 |
36 | # Create parameters for connecting to the server
37 | # This will launch the server using the PowerShell script
38 | params = StdioServerParameters(
39 | command="pwsh", # Use PowerShell
40 |         args=["-File", "./scripts/start_mcp_server.ps1"],  # Run the startup script
41 | env=env # Pass environment variables
42 | )
43 |
44 | # Use the client to start the server and connect to it
45 | logger.info("Launching server process...")
46 |
47 | try:
48 | async with stdio_client(params) as (recv, send):
49 | logger.info("Server launched, creating session...")
50 |             async with ClientSession(recv, send) as session:
51 |                 # Entering the session context starts its message loop.
52 |                 logger.info("Initializing session...")
53 |                 await session.initialize()
54 |
55 |                 # List available tools
56 |                 tools_response = await session.list_tools()
57 |                 tool_names = [t.name for t in tools_response.tools]
58 |                 logger.info(f"Available tools: {tool_names}")
59 |
60 |                 # Run tests for clusters
61 |                 if "list_clusters" in tool_names:
62 |                     await test_list_clusters(session)
63 |                     await test_get_cluster(session)
64 |                 else:
65 |                     logger.warning("Cluster tools not available")
66 |
67 |                 # Run tests for notebooks
68 |                 if "list_notebooks" in tool_names:
69 |                     await test_list_notebooks(session)
70 |                     await test_export_notebook(session)
71 |                 else:
72 |                     logger.warning("Notebook tools not available")
73 |
74 |                 logger.info("All tests completed successfully!")
75 | return True
76 | except Exception as e:
77 | logger.error(f"Error during tests: {e}", exc_info=True)
78 | return False
79 |
80 |
81 | # Skip all these tests until we fix the hanging issues
82 | @pytest.mark.skip(reason="Test causes hanging issues - needs further investigation")
83 | @pytest.mark.asyncio
84 | async def test_list_clusters(session):
85 | """Test listing clusters."""
86 | logger.info("Testing list_clusters...")
87 |     result = await session.call_tool("list_clusters", {})
88 |     response = json.loads(result.content[0].text)  # JSON payload from the first text item
89 |     assert "clusters" in response, "Response should contain 'clusters' key"
90 |     return response
91 |
92 |
93 | @pytest.mark.skip(reason="Test causes hanging issues - needs further investigation")
94 | @pytest.mark.asyncio
95 | async def test_get_cluster(session):
96 | """Test getting cluster details."""
97 | logger.info("Testing get_cluster...")
98 |
99 | # First list clusters to get a cluster_id
100 | clusters_response = await test_list_clusters(session)
101 | if not clusters_response.get("clusters"):
102 | logger.warning("No clusters found to test get_cluster")
103 | return
104 |
105 | # Get the first cluster ID
106 | cluster_id = clusters_response["clusters"][0]["cluster_id"]
107 |
108 | # Get cluster details
109 |     result = await session.call_tool("get_cluster", {"cluster_id": cluster_id})
110 |     response = json.loads(result.content[0].text)  # JSON payload from the first text item
111 |     assert "cluster_id" in response, "Response should contain 'cluster_id' key"
112 |     assert response["cluster_id"] == cluster_id, "Returned cluster ID should match requested ID"
113 |
114 |
115 | @pytest.mark.skip(reason="Test causes hanging issues - needs further investigation")
116 | @pytest.mark.asyncio
117 | async def test_list_notebooks(session):
118 | """Test listing notebooks."""
119 | logger.info("Testing list_notebooks...")
120 |     result = await session.call_tool("list_notebooks", {"path": "/"})
121 |     response = json.loads(result.content[0].text)  # JSON payload from the first text item
122 |     assert "objects" in response, "Response should contain 'objects' key"
123 |     return response
124 |
125 |
126 | @pytest.mark.skip(reason="Test causes hanging issues - needs further investigation")
127 | @pytest.mark.asyncio
128 | async def test_export_notebook(session):
129 | """Test exporting a notebook."""
130 | logger.info("Testing export_notebook...")
131 |
132 | # First list notebooks to get a notebook path
133 | notebooks_response = await test_list_notebooks(session)
134 | if not notebooks_response.get("objects"):
135 | logger.warning("No notebooks found to test export_notebook")
136 | return
137 |
138 | # Find the first notebook (not a directory)
139 | notebook = None
140 | for obj in notebooks_response["objects"]:
141 | if obj.get("object_type") == "NOTEBOOK":
142 | notebook = obj
143 | break
144 |
145 | if not notebook:
146 | logger.warning("No notebooks found to test export_notebook")
147 | return
148 |
149 | # Get notebook path
150 | notebook_path = notebook["path"]
151 |
152 | # Export notebook
153 |     result = await session.call_tool(
154 |         "export_notebook",
155 |         {"path": notebook_path, "format": "SOURCE"}
156 |     )
157 |     response = json.loads(result.content[0].text)  # JSON payload from the first text item
158 |     assert "content" in response, "Response should contain 'content' key"
159 |
160 |
161 | async def main():
162 | """Run the tests."""
163 | success = await run_tests()
164 | return 0 if success else 1
165 |
166 |
167 | if __name__ == "__main__":
168 | """Run the tests directly."""
169 | sys.exit(asyncio.run(main()))
```
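The test functions above take a `session` argument that pytest cannot supply on its own, which is part of why they are skipped and only reachable through `run_tests()`. A sketch of a `pytest-asyncio` fixture that would provide the session per test once the tests are de-skipped (assuming `pytest-asyncio` is installed and the startup script path matches):

```python
import os

import pytest_asyncio
from mcp.client.session import ClientSession
from mcp.client.stdio import StdioServerParameters, stdio_client


@pytest_asyncio.fixture
async def session():
    # Launch the server for each test and tear it down afterwards
    params = StdioServerParameters(
        command="pwsh",
        args=["-File", "./scripts/start_mcp_server.ps1"],
        env=os.environ.copy(),
    )
    async with stdio_client(params) as (read_stream, write_stream):
        async with ClientSession(read_stream, write_stream) as mcp_session:
            await mcp_session.initialize()
            yield mcp_session
```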
--------------------------------------------------------------------------------
/tests/test_mcp_server.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Tests for the Databricks MCP server.
3 |
4 | This test file connects to the MCP server using the MCP client library
5 | and tests the cluster and notebook operations.
6 | """
7 |
8 | import asyncio
9 | import json
10 | import logging
11 | import os
12 | import subprocess
13 | import sys
14 | import time
15 | from typing import Any, Dict, List, Optional, Tuple
16 |
17 | import anyio
18 | import pytest
19 | from mcp.client.session import ClientSession
20 | from mcp.client.stdio import StdioServerParameters, stdio_client
21 |
22 | # Configure logging
23 | logging.basicConfig(
24 | level=logging.INFO,
25 | format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
26 | handlers=[logging.StreamHandler(sys.stdout)],
27 | )
28 | logger = logging.getLogger(__name__)
29 |
30 |
31 | class DatabricksMCPClient:
32 | """Client for testing the Databricks MCP server."""
33 |
34 | def __init__(self):
35 | self.session: Optional[ClientSession] = None
36 | self.stdio_transport: Optional[Tuple[Any, Any]] = None
37 | self.server_process: Optional[subprocess.Popen] = None
38 |
39 |     async def connect(self):
40 |         """Launch the MCP server and connect to it.
41 | 
42 |         In MCP the client launches the server process: stdio_client spawns
43 |         the server from the parameters below and wires up its stdio streams,
44 |         so pre-starting the server with subprocess is unnecessary (and would
45 |         leave a second, orphaned server process behind).
46 |         """
47 |         logger.info("Starting Databricks MCP server...")
48 | 
49 |         # Set up environment variables if needed
50 |         # os.environ["DATABRICKS_HOST"] = "..."
51 |         # os.environ["DATABRICKS_TOKEN"] = "..."
52 | 
53 |         # Start the server via the startup script; the -SkipPrompt flag
54 |         # avoids interactive prompts so the process can run unattended
55 |         params = StdioServerParameters(
56 |             command="pwsh",
57 |             args=["-File", "start_mcp_server.ps1", "-SkipPrompt"],
58 |             env=None
59 |         )
60 | 
61 |         logger.info("Connecting to MCP server...")
62 | 
63 |         # stdio_client starts the server subprocess and yields its streams
64 |         async with stdio_client(params) as stdio_transport:
65 |             self.stdio_transport = stdio_transport
66 |             read_stream, write_stream = stdio_transport
67 | 
68 |             # The session must be entered as an async context manager so its
69 |             # background message loop starts; without this, initialize()
70 |             # hangs waiting for responses that are never read.
71 |             async with ClientSession(read_stream, write_stream) as session:
72 |                 self.session = session
73 |                 await session.initialize()
74 | 
75 |                 # Log available tools
76 |                 tools_response = await session.list_tools()
77 |                 logger.info(f"Available tools: {[t.name for t in tools_response.tools]}")
78 | 
79 |                 # Run the tests while the session and server are still open
80 |                 await self.run_tests()
81 |
82 | async def run_tests(self):
83 | """Run the tests for the Databricks MCP server."""
84 | try:
85 | await self.test_list_clusters()
86 | await self.test_get_cluster()
87 | await self.test_list_notebooks()
88 | await self.test_export_notebook()
89 | logger.info("All tests completed successfully!")
90 | except Exception as e:
91 | logger.error(f"Test failed: {e}")
92 | raise
93 | finally:
94 | if self.server_process:
95 | self.server_process.terminate()
96 |
97 | async def test_list_clusters(self):
98 | """Test listing clusters."""
99 | logger.info("Testing list_clusters...")
100 |         result = await self.session.call_tool("list_clusters", {})
101 |         response = json.loads(result.content[0].text)  # JSON payload from the first text item
102 |         assert "clusters" in response, "Response should contain 'clusters' key"
103 |         return response
104 |
105 | async def test_get_cluster(self):
106 | """Test getting cluster details."""
107 | logger.info("Testing get_cluster...")
108 |
109 | # First list clusters to get a cluster_id
110 | clusters_response = await self.test_list_clusters()
111 | if not clusters_response.get("clusters"):
112 | logger.warning("No clusters found to test get_cluster")
113 | return
114 |
115 | # Get the first cluster ID
116 | cluster_id = clusters_response["clusters"][0]["cluster_id"]
117 |
118 | # Get cluster details
119 |         result = await self.session.call_tool("get_cluster", {"cluster_id": cluster_id})
120 |         response = json.loads(result.content[0].text)  # JSON payload from the first text item
121 |         assert "cluster_id" in response, "Response should contain 'cluster_id' key"
122 |         assert response["cluster_id"] == cluster_id, "Returned cluster ID should match requested ID"
123 |
124 | async def test_list_notebooks(self):
125 | """Test listing notebooks."""
126 | logger.info("Testing list_notebooks...")
127 |         result = await self.session.call_tool("list_notebooks", {"path": "/"})
128 |         response = json.loads(result.content[0].text)  # JSON payload from the first text item
129 |         assert "objects" in response, "Response should contain 'objects' key"
130 |         return response
131 |
132 | async def test_export_notebook(self):
133 | """Test exporting a notebook."""
134 | logger.info("Testing export_notebook...")
135 |
136 | # First list notebooks to get a notebook path
137 | notebooks_response = await self.test_list_notebooks()
138 | if not notebooks_response.get("objects"):
139 | logger.warning("No notebooks found to test export_notebook")
140 | return
141 |
142 | # Find the first notebook (not a directory)
143 | notebook = None
144 | for obj in notebooks_response["objects"]:
145 | if obj.get("object_type") == "NOTEBOOK":
146 | notebook = obj
147 | break
148 |
149 | if not notebook:
150 | logger.warning("No notebooks found to test export_notebook")
151 | return
152 |
153 | # Get notebook path
154 | notebook_path = notebook["path"]
155 |
156 | # Export notebook
157 |         result = await self.session.call_tool(
158 |             "export_notebook",
159 |             {"path": notebook_path, "format": "SOURCE"}
160 |         )
161 |         response = json.loads(result.content[0].text)  # JSON payload from the first text item
162 |         assert "content" in response, "Response should contain 'content' key"
163 |
164 |
165 | # Skip this test for now as it causes hanging issues
166 | @pytest.mark.skip(reason="Test causes hanging issues - needs further investigation")
167 | @pytest.mark.asyncio
168 | async def test_databricks_mcp_server():
169 | """Test the Databricks MCP server."""
170 | client = DatabricksMCPClient()
171 | await client.connect()
172 |
173 |
174 | if __name__ == "__main__":
175 | """Run the tests directly."""
176 | asyncio.run(DatabricksMCPClient().connect())
```
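Every skip reason in the two test modules above cites hanging, so when debugging it helps to run the client under a hard timeout: a stalled handshake then fails fast with `TimeoutError` instead of blocking the run indefinitely. A minimal sketch wrapping `connect()` from the class above:

```python
import asyncio


async def run_with_timeout(seconds: float = 60.0) -> None:
    """Run the MCP client tests, failing fast if anything stalls."""
    client = DatabricksMCPClient()
    # wait_for cancels connect() and raises TimeoutError on a hang
    await asyncio.wait_for(client.connect(), timeout=seconds)


if __name__ == "__main__":
    asyncio.run(run_with_timeout())
```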
--------------------------------------------------------------------------------
/src/api/dbfs.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | API for managing Databricks File System (DBFS).
3 | """
4 |
5 | import base64
6 | import logging
7 | import os
8 | from typing import Any, Dict, List, Optional, BinaryIO
9 |
10 | from src.core.utils import DatabricksAPIError, make_api_request
11 |
12 | # Configure logging
13 | logger = logging.getLogger(__name__)
14 |
15 |
16 | async def put_file(
17 | dbfs_path: str,
18 | file_content: bytes,
19 | overwrite: bool = True,
20 | ) -> Dict[str, Any]:
21 | """
22 | Upload a file to DBFS.
23 |
24 | Args:
25 | dbfs_path: The path where the file should be stored in DBFS
26 | file_content: The content of the file as bytes
27 | overwrite: Whether to overwrite an existing file
28 |
29 | Returns:
30 | Empty response on success
31 |
32 | Raises:
33 | DatabricksAPIError: If the API request fails
34 | """
35 | logger.info(f"Uploading file to DBFS path: {dbfs_path}")
36 |
37 | # Convert bytes to base64
38 | content_base64 = base64.b64encode(file_content).decode("utf-8")
39 |
40 | return make_api_request(
41 | "POST",
42 | "/api/2.0/dbfs/put",
43 | data={
44 | "path": dbfs_path,
45 | "contents": content_base64,
46 | "overwrite": overwrite,
47 | },
48 | )
49 |
50 |
51 | async def upload_large_file(
52 | dbfs_path: str,
53 | local_file_path: str,
54 | overwrite: bool = True,
55 | buffer_size: int = 1024 * 1024, # 1MB chunks
56 | ) -> Dict[str, Any]:
57 | """
58 | Upload a large file to DBFS in chunks.
59 |
60 | Args:
61 | dbfs_path: The path where the file should be stored in DBFS
62 | local_file_path: Local path to the file to upload
63 | overwrite: Whether to overwrite an existing file
64 | buffer_size: Size of chunks to upload
65 |
66 | Returns:
67 | Empty response on success
68 |
69 | Raises:
70 | DatabricksAPIError: If the API request fails
71 | FileNotFoundError: If the local file does not exist
72 | """
73 | logger.info(f"Uploading large file from {local_file_path} to DBFS path: {dbfs_path}")
74 |
75 | if not os.path.exists(local_file_path):
76 | raise FileNotFoundError(f"Local file not found: {local_file_path}")
77 |
78 | # Create a handle for the upload
79 | create_response = make_api_request(
80 | "POST",
81 | "/api/2.0/dbfs/create",
82 | data={
83 | "path": dbfs_path,
84 | "overwrite": overwrite,
85 | },
86 | )
87 |
88 | handle = create_response.get("handle")
89 |
90 | try:
91 | with open(local_file_path, "rb") as f:
92 | chunk_index = 0
93 | while True:
94 | chunk = f.read(buffer_size)
95 | if not chunk:
96 | break
97 |
98 | # Convert chunk to base64
99 | chunk_base64 = base64.b64encode(chunk).decode("utf-8")
100 |
101 | # Add to handle
102 | make_api_request(
103 | "POST",
104 | "/api/2.0/dbfs/add-block",
105 | data={
106 | "handle": handle,
107 | "data": chunk_base64,
108 | },
109 | )
110 |
111 | chunk_index += 1
112 | logger.debug(f"Uploaded chunk {chunk_index}")
113 |
114 | # Close the handle
115 | return make_api_request(
116 | "POST",
117 | "/api/2.0/dbfs/close",
118 | data={"handle": handle},
119 | )
120 |
121 | except Exception as e:
122 |         # DBFS has no abort endpoint, so close the handle to avoid leaking it
123 | try:
124 | make_api_request(
125 | "POST",
126 | "/api/2.0/dbfs/close",
127 | data={"handle": handle},
128 | )
129 | except Exception:
130 | pass
131 |
132 | logger.error(f"Error uploading file: {str(e)}")
133 | raise
134 |
135 |
136 | async def get_file(
137 | dbfs_path: str,
138 | offset: int = 0,
139 | length: int = 1024 * 1024, # Default to 1MB
140 | ) -> Dict[str, Any]:
141 | """
142 | Get the contents of a file from DBFS.
143 |
144 | Args:
145 | dbfs_path: The path of the file in DBFS
146 | offset: Starting byte position
147 | length: Number of bytes to read
148 |
149 | Returns:
150 | Response containing the file content
151 |
152 | Raises:
153 | DatabricksAPIError: If the API request fails
154 | """
155 | logger.info(f"Reading file from DBFS path: {dbfs_path}")
156 |
157 | response = make_api_request(
158 | "GET",
159 | "/api/2.0/dbfs/read",
160 | params={
161 | "path": dbfs_path,
162 | "offset": offset,
163 | "length": length,
164 | },
165 | )
166 |
167 | # Decode base64 content
168 | if "data" in response:
169 | try:
170 | response["decoded_data"] = base64.b64decode(response["data"])
171 | except Exception as e:
172 | logger.warning(f"Failed to decode file content: {str(e)}")
173 |
174 | return response
175 |
176 |
177 | async def list_files(dbfs_path: str) -> Dict[str, Any]:
178 | """
179 | List files and directories in a DBFS path.
180 |
181 | Args:
182 | dbfs_path: The path to list
183 |
184 | Returns:
185 | Response containing the directory listing
186 |
187 | Raises:
188 | DatabricksAPIError: If the API request fails
189 | """
190 | logger.info(f"Listing files in DBFS path: {dbfs_path}")
191 | return make_api_request("GET", "/api/2.0/dbfs/list", params={"path": dbfs_path})
192 |
193 |
194 | async def delete_file(
195 | dbfs_path: str,
196 | recursive: bool = False,
197 | ) -> Dict[str, Any]:
198 | """
199 | Delete a file or directory from DBFS.
200 |
201 | Args:
202 | dbfs_path: The path to delete
203 | recursive: Whether to recursively delete directories
204 |
205 | Returns:
206 | Empty response on success
207 |
208 | Raises:
209 | DatabricksAPIError: If the API request fails
210 | """
211 | logger.info(f"Deleting DBFS path: {dbfs_path}")
212 | return make_api_request(
213 | "POST",
214 | "/api/2.0/dbfs/delete",
215 | data={
216 | "path": dbfs_path,
217 | "recursive": recursive,
218 | },
219 | )
220 |
221 |
222 | async def get_status(dbfs_path: str) -> Dict[str, Any]:
223 | """
224 | Get the status of a file or directory.
225 |
226 | Args:
227 | dbfs_path: The path to check
228 |
229 | Returns:
230 | Response containing file status
231 |
232 | Raises:
233 | DatabricksAPIError: If the API request fails
234 | """
235 | logger.info(f"Getting status of DBFS path: {dbfs_path}")
236 | return make_api_request("GET", "/api/2.0/dbfs/get-status", params={"path": dbfs_path})
237 |
238 |
239 | async def create_directory(dbfs_path: str) -> Dict[str, Any]:
240 | """
241 | Create a directory in DBFS.
242 |
243 | Args:
244 | dbfs_path: The path to create
245 |
246 | Returns:
247 | Empty response on success
248 |
249 | Raises:
250 | DatabricksAPIError: If the API request fails
251 | """
252 | logger.info(f"Creating DBFS directory: {dbfs_path}")
253 | return make_api_request("POST", "/api/2.0/dbfs/mkdirs", data={"path": dbfs_path})
```
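`put_file` sends the whole payload in one request, while `upload_large_file` walks the DBFS streaming sequence (`create` for a handle, repeated `add-block` calls, then `close`). A minimal usage sketch with illustrative paths, assuming Databricks credentials are already configured in the environment:

```python
import asyncio

from src.api import dbfs


async def demo():
    # Small payloads fit in a single put call
    await dbfs.put_file("/tmp/hello.txt", b"hello databricks")

    # Larger files stream through create -> add-block -> close
    await dbfs.upload_large_file("/tmp/big.bin", "./local/big.bin")

    # DBFS list responses carry a "files" array of FileInfo entries
    listing = await dbfs.list_files("/tmp")
    for entry in listing.get("files", []):
        print(entry["path"], entry.get("file_size"))


asyncio.run(demo())
```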
--------------------------------------------------------------------------------
/src/server/databricks_mcp_server.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Databricks MCP Server
3 |
4 | This module implements a standalone MCP server that provides tools for interacting
5 | with Databricks APIs. It follows the Model Context Protocol standard, communicating
6 | via stdio and directly connecting to Databricks when tools are invoked.
7 | """
8 |
9 | import asyncio
10 | import json
11 | import logging
12 | import sys
13 | import os
14 | from typing import Any, Dict, List, Optional, Union, cast
15 |
16 | from mcp.server import FastMCP
17 | from mcp.types import TextContent
18 | from mcp.server.stdio import stdio_server
19 |
20 | from src.api import clusters, dbfs, jobs, notebooks, sql
21 | from src.core.config import settings
22 |
23 | # Configure logging
24 | logging.basicConfig(
25 | level=getattr(logging, settings.LOG_LEVEL),
26 | filename="databricks_mcp.log",
27 | format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
28 | )
29 | logger = logging.getLogger(__name__)
30 |
31 |
32 | class DatabricksMCPServer(FastMCP):
33 | """An MCP server for Databricks APIs."""
34 |
35 | def __init__(self):
36 | """Initialize the Databricks MCP server."""
37 | super().__init__(name="databricks-mcp",
38 | version="1.0.0",
39 | instructions="Use this server to manage Databricks resources")
40 | logger.info("Initializing Databricks MCP server")
41 | logger.info(f"Databricks host: {settings.DATABRICKS_HOST}")
42 |
43 | # Register tools
44 | self._register_tools()
45 |
46 | def _register_tools(self):
47 | """Register all Databricks MCP tools."""
48 |
49 | # Cluster management tools
50 | @self.tool(
51 | name="list_clusters",
52 | description="List all Databricks clusters",
53 | )
54 | async def list_clusters(params: Dict[str, Any]) -> List[TextContent]:
55 | logger.info(f"Listing clusters with params: {params}")
56 | try:
57 | result = await clusters.list_clusters()
58 |                 return [TextContent(type="text", text=json.dumps(result))]
59 | except Exception as e:
60 | logger.error(f"Error listing clusters: {str(e)}")
61 |                 return [TextContent(type="text", text=json.dumps({"error": str(e)}))]
62 |
63 | @self.tool(
64 | name="create_cluster",
65 | description="Create a new Databricks cluster with parameters: cluster_name (required), spark_version (required), node_type_id (required), num_workers, autotermination_minutes",
66 | )
67 | async def create_cluster(params: Dict[str, Any]) -> List[TextContent]:
68 | logger.info(f"Creating cluster with params: {params}")
69 | try:
70 | result = await clusters.create_cluster(params)
71 |                 return [TextContent(type="text", text=json.dumps(result))]
72 | except Exception as e:
73 | logger.error(f"Error creating cluster: {str(e)}")
74 |                 return [TextContent(type="text", text=json.dumps({"error": str(e)}))]
75 |
76 | @self.tool(
77 | name="terminate_cluster",
78 | description="Terminate a Databricks cluster with parameter: cluster_id (required)",
79 | )
80 | async def terminate_cluster(params: Dict[str, Any]) -> List[TextContent]:
81 | logger.info(f"Terminating cluster with params: {params}")
82 | try:
83 | result = await clusters.terminate_cluster(params.get("cluster_id"))
84 |                 return [TextContent(type="text", text=json.dumps(result))]
85 | except Exception as e:
86 | logger.error(f"Error terminating cluster: {str(e)}")
87 |                 return [TextContent(type="text", text=json.dumps({"error": str(e)}))]
88 |
89 | @self.tool(
90 | name="get_cluster",
91 | description="Get information about a specific Databricks cluster with parameter: cluster_id (required)",
92 | )
93 | async def get_cluster(params: Dict[str, Any]) -> List[TextContent]:
94 | logger.info(f"Getting cluster info with params: {params}")
95 | try:
96 | result = await clusters.get_cluster(params.get("cluster_id"))
97 |                 return [TextContent(type="text", text=json.dumps(result))]
98 | except Exception as e:
99 | logger.error(f"Error getting cluster info: {str(e)}")
100 |                 return [TextContent(type="text", text=json.dumps({"error": str(e)}))]
101 |
102 | @self.tool(
103 | name="start_cluster",
104 | description="Start a terminated Databricks cluster with parameter: cluster_id (required)",
105 | )
106 | async def start_cluster(params: Dict[str, Any]) -> List[TextContent]:
107 | logger.info(f"Starting cluster with params: {params}")
108 | try:
109 | result = await clusters.start_cluster(params.get("cluster_id"))
110 |                 return [TextContent(type="text", text=json.dumps(result))]
111 | except Exception as e:
112 | logger.error(f"Error starting cluster: {str(e)}")
113 |                 return [TextContent(type="text", text=json.dumps({"error": str(e)}))]
114 |
115 | # Job management tools
116 | @self.tool(
117 | name="list_jobs",
118 | description="List all Databricks jobs",
119 | )
120 | async def list_jobs(params: Dict[str, Any]) -> List[TextContent]:
121 | logger.info(f"Listing jobs with params: {params}")
122 | try:
123 | result = await jobs.list_jobs()
124 |                 return [TextContent(type="text", text=json.dumps(result))]
125 | except Exception as e:
126 | logger.error(f"Error listing jobs: {str(e)}")
127 |                 return [TextContent(type="text", text=json.dumps({"error": str(e)}))]
128 |
129 | @self.tool(
130 | name="run_job",
131 | description="Run a Databricks job with parameters: job_id (required), notebook_params (optional)",
132 | )
133 | async def run_job(params: Dict[str, Any]) -> List[TextContent]:
134 | logger.info(f"Running job with params: {params}")
135 | try:
136 | notebook_params = params.get("notebook_params", {})
137 | result = await jobs.run_job(params.get("job_id"), notebook_params)
138 |                 return [TextContent(type="text", text=json.dumps(result))]
139 | except Exception as e:
140 | logger.error(f"Error running job: {str(e)}")
141 |                 return [TextContent(type="text", text=json.dumps({"error": str(e)}))]
142 |
143 | # Notebook management tools
144 | @self.tool(
145 | name="list_notebooks",
146 | description="List notebooks in a workspace directory with parameter: path (required)",
147 | )
148 | async def list_notebooks(params: Dict[str, Any]) -> List[TextContent]:
149 | logger.info(f"Listing notebooks with params: {params}")
150 | try:
151 | result = await notebooks.list_notebooks(params.get("path"))
152 |                 return [TextContent(type="text", text=json.dumps(result))]
153 | except Exception as e:
154 | logger.error(f"Error listing notebooks: {str(e)}")
155 |                 return [TextContent(type="text", text=json.dumps({"error": str(e)}))]
156 |
157 | @self.tool(
158 | name="export_notebook",
159 | description="Export a notebook from the workspace with parameters: path (required), format (optional, one of: SOURCE, HTML, JUPYTER, DBC)",
160 | )
161 | async def export_notebook(params: Dict[str, Any]) -> List[TextContent]:
162 | logger.info(f"Exporting notebook with params: {params}")
163 | try:
164 | format_type = params.get("format", "SOURCE")
165 | result = await notebooks.export_notebook(params.get("path"), format_type)
166 |
167 |                 # Trim large notebook content so responses stay readable
168 | content = result.get("content", "")
169 | if len(content) > 1000:
170 | summary = f"{content[:1000]}... [content truncated, total length: {len(content)} characters]"
171 | result["content"] = summary
172 |
173 |                 return [TextContent(type="text", text=json.dumps(result))]
174 | except Exception as e:
175 | logger.error(f"Error exporting notebook: {str(e)}")
176 |                 return [TextContent(type="text", text=json.dumps({"error": str(e)}))]
177 |
178 | # DBFS tools
179 | @self.tool(
180 | name="list_files",
181 | description="List files and directories in a DBFS path with parameter: dbfs_path (required)",
182 | )
183 | async def list_files(params: Dict[str, Any]) -> List[TextContent]:
184 | logger.info(f"Listing files with params: {params}")
185 | try:
186 | result = await dbfs.list_files(params.get("dbfs_path"))
187 |                 return [TextContent(type="text", text=json.dumps(result))]
188 | except Exception as e:
189 | logger.error(f"Error listing files: {str(e)}")
190 |                 return [TextContent(type="text", text=json.dumps({"error": str(e)}))]
191 |
192 | # SQL tools
193 | @self.tool(
194 | name="execute_sql",
195 | description="Execute a SQL statement with parameters: statement (required), warehouse_id (required), catalog (optional), schema (optional)",
196 | )
197 | async def execute_sql(params: Dict[str, Any]) -> List[TextContent]:
198 | logger.info(f"Executing SQL with params: {params}")
199 | try:
200 | statement = params.get("statement")
201 | warehouse_id = params.get("warehouse_id")
202 | catalog = params.get("catalog")
203 | schema = params.get("schema")
204 |
205 | result = await sql.execute_sql(statement, warehouse_id, catalog, schema)
206 |                 return [TextContent(type="text", text=json.dumps(result))]
207 | except Exception as e:
208 | logger.error(f"Error executing SQL: {str(e)}")
209 |                 return [TextContent(type="text", text=json.dumps({"error": str(e)}))]
210 |
211 |
212 | async def main():
213 | """Main entry point for the MCP server."""
214 | try:
215 | logger.info("Starting Databricks MCP server")
216 | server = DatabricksMCPServer()
217 |
218 | # Use the built-in method for stdio servers
219 | # This is the recommended approach for MCP servers
220 | await server.run_stdio_async()
221 |
222 | except Exception as e:
223 | logger.error(f"Error in Databricks MCP server: {str(e)}", exc_info=True)
224 | raise
225 |
226 |
227 | if __name__ == "__main__":
228 | # Turn off buffering in stdout
229 | if hasattr(sys.stdout, 'reconfigure'):
230 | sys.stdout.reconfigure(line_buffering=True)
231 |
232 | asyncio.run(main())
```
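For a quick smoke test outside the test suite, a minimal client can launch this server module directly over stdio and invoke one tool. A sketch assuming the package is importable as `src.server.databricks_mcp_server` and `DATABRICKS_HOST`/`DATABRICKS_TOKEN` are set in the environment:

```python
import asyncio
import json
import os

from mcp.client.session import ClientSession
from mcp.client.stdio import StdioServerParameters, stdio_client


async def smoke_test():
    params = StdioServerParameters(
        command="python",
        args=["-m", "src.server.databricks_mcp_server"],
        env=os.environ.copy(),
    )
    async with stdio_client(params) as (read_stream, write_stream):
        async with ClientSession(read_stream, write_stream) as session:
            await session.initialize()
            result = await session.call_tool("list_clusters", {})
            # Tool results arrive as TextContent items wrapping JSON
            print(json.loads(result.content[0].text))


asyncio.run(smoke_test())
```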