#
tokens: 30367/50000 52/52 files
lines: off (toggle) GitHub
raw markdown copy
# Directory Structure

```
├── .cursor
│   └── rules
│       ├── databricks_api.mdc
│       ├── documentation.mdc
│       ├── environment.mdc
│       ├── mcp_tools.mdc
│       ├── project_structure.mdc
│       ├── python_conventions.mdc
│       └── testing.mdc
├── .cursor.json
├── .env.example
├── .gitignore
├── docs
│   └── phase1.md
├── examples
│   ├── direct_usage.py
│   ├── mcp_client_usage.py
│   └── README.md
├── project_structure.md
├── pyproject.toml
├── README.md
├── scripts
│   ├── run_client_test.ps1
│   ├── run_direct_test.ps1
│   ├── run_direct_test.sh
│   ├── run_list_tools.ps1
│   ├── run_list_tools.sh
│   ├── run_mcp_client_test.ps1
│   ├── run_mcp_client_test.sh
│   ├── run_mcp_test.ps1
│   ├── run_tests.ps1
│   ├── show_clusters.py
│   ├── show_notebooks.py
│   ├── start_mcp_server.ps1
│   ├── start_mcp_server.sh
│   └── test_mcp_server.ps1
├── src
│   ├── __init__.py
│   ├── __main__.py
│   ├── api
│   │   ├── __init__.py
│   │   ├── clusters.py
│   │   ├── dbfs.py
│   │   ├── jobs.py
│   │   ├── notebooks.py
│   │   └── sql.py
│   ├── cli
│   │   ├── __init__.py
│   │   └── commands.py
│   ├── core
│   │   ├── __init__.py
│   │   ├── auth.py
│   │   ├── config.py
│   │   └── utils.py
│   ├── main.py
│   └── server
│       ├── __init__.py
│       ├── __main__.py
│       ├── app.py
│       └── databricks_mcp_server.py
├── start_mcp_server.ps1
├── start_mcp_server.sh
├── SUMMARY.md
├── tests
│   ├── __init__.py
│   ├── README.md
│   ├── test_clusters.py
│   ├── test_direct.py
│   ├── test_mcp_client.py
│   ├── test_mcp_server.py
│   └── test_tools.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/.env.example:
--------------------------------------------------------------------------------

```
# Databricks API configuration
DATABRICKS_HOST=https://adb-xxxxxxxxxxxx.xx.azuredatabricks.net
DATABRICKS_TOKEN=your_databricks_token_here

# Server configuration
SERVER_HOST=0.0.0.0
SERVER_PORT=8000
DEBUG=False

# Logging
LOG_LEVEL=INFO 
```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
# Python virtual environments
venv/
.venv/
env/
ENV/

# Python bytecode
__pycache__/
*.py[cod]
*$py.class

# Distribution / packaging
dist/
build/
*.egg-info/

# Local development settings
.env
.env.local

# IDE settings
.idea/
.vscode/
*.swp
*.swo

# OS specific files
.DS_Store
Thumbs.db

# Logs
*.log
logs/

# Temporary files
tmp/
temp/

# uv package manager files
.uv/
uv.lock

# Databricks-specific
*.dbfs

# C extensions
*.so

# Distribution / packaging
.Python
develop-eggs/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
.pytest_cache/

# Environments
env.bak/
venv.bak/

# IDEs and editors
*.swp
*.swo
*~

# OS generated files
.DS_Store?
._*
.Spotlight-V100
.Trashes
ehthumbs.db
Thumbs.db

# Log files
*.log 
```

--------------------------------------------------------------------------------
/.cursor.json:
--------------------------------------------------------------------------------

```json
{
  "name": "Databricks MCP Server",
  "description": "A Model Context Protocol (MCP) server for interacting with Databricks services",
  "version": "0.1.0",
  "repository": "https://github.com/JustTryAI/databricks-mcp-server",
  
  "structure": {
    "src": {
      "description": "Source code for the project",
      "children": {
        "server": {
          "description": "MCP server implementation",
          "patterns": ["*_mcp_server.py", "*.py"]
        },
        "api": {
          "description": "API client for Databricks services",
          "patterns": ["*.py"]
        },
        "core": {
          "description": "Core functionality and utilities",
          "patterns": ["*.py"]
        },
        "cli": {
          "description": "Command-line interface",
          "patterns": ["*.py"]
        }
      }
    },
    "tests": {
      "description": "Test files for the project",
      "patterns": ["test_*.py"],
      "rules": [
        "Each file in src/ should have a corresponding test file in tests/"
      ]
    },
    "examples": {
      "description": "Example usage of the MCP server",
      "patterns": ["*.py"]
    },
    "scripts": {
      "description": "Helper scripts for running the server and tests",
      "patterns": ["*.ps1", "*.sh"]
    }
  },
  
  "conventions": {
    "python": {
      "style": {
        "lineLength": 100,
        "indentation": {
          "type": "spaces",
          "size": 4
        },
        "quotes": {
          "default": "double",
          "avoidEscape": true
        }
      },
      "imports": {
        "ordering": [
          "standard_library",
          "third_party",
          "first_party"
        ],
        "grouping": true,
        "alphabetize": true
      },
      "docstrings": {
        "style": "google",
        "required": ["classes", "methods", "functions"]
      },
      "typings": {
        "required": true,
        "ignorePatterns": ["tests/*"]
      }
    },
    "naming": {
      "variables": "snake_case",
      "constants": "UPPER_SNAKE_CASE",
      "classes": "PascalCase",
      "functions": "snake_case",
      "methods": "snake_case",
      "files": "snake_case"
    }
  },
  
  "patterns": {
    "mcp_tools": {
      "description": "Pattern for MCP tool definitions",
      "example": "async def tool_name(params: Dict[str, Any]) -> Dict[str, Any]: ...",
      "rules": [
        "Tool functions should be async",
        "Tool functions should have clear docstrings describing purpose and parameters",
        "Tool functions should have proper error handling",
        "Tool functions should return a dictionary that matches the MCP protocol spec"
      ]
    },
    "databricks_api": {
      "description": "Pattern for Databricks API calls",
      "example": "async def api_call(client, **params): ...",
      "rules": [
        "API functions should be async",
        "API functions should handle rate limiting and retries",
        "API functions should provide clear error messages",
        "API responses should be validated before returning"
      ]
    }
  },
  
  "files": {
    "required": [
      "README.md",
      "pyproject.toml",
      ".gitignore",
      "src/server/databricks_mcp_server.py"
    ],
    "linting": {
      "enabled": true,
      "pylint": true,
      "flake8": true,
      "mypy": true
    }
  },
  
  "mcp": {
    "protocol_version": "1.0",
    "tool_documentation": {
      "required_fields": ["name", "description", "parameters", "returns"],
      "example": {
        "name": "list_clusters",
        "description": "Lists all available Databricks clusters",
        "parameters": {},
        "returns": "List of cluster objects"
      }
    },
    "tool_implementation": {
      "error_handling": "All tool functions must return errors as part of the result object with isError: true",
      "timeouts": "All tool functions should implement appropriate timeouts",
      "progress_reporting": "Long-running operations should provide progress updates"
    }
  },
  
  "references": {
    "mcp_protocol": "https://modelcontextprotocol.io/llms-full.txt",
    "databricks_api": "https://docs.databricks.com/api/azure/workspace/clusters/edit",
    "python_sdk": "https://github.com/modelcontextprotocol/python-sdk",
    "python_style_guide": "https://peps.python.org/pep-0008/"
  },
  
  "testing": {
    "frameworks": ["pytest"],
    "coverage": {
      "minimum": 80,
      "exclude": ["scripts/*", "examples/*"]
    },
    "strategies": [
      "unit_tests",
      "integration_tests",
      "mcp_protocol_tests"
    ]
  },
  
  "documentation": {
    "required": [
      "README.md",
      "tests/README.md",
      "examples/README.md"
    ],
    "api_docs": {
      "style": "sphinx",
      "output_dir": "docs/api"
    }
  },
  
  "environment": {
    "python_version": ">=3.10",
    "package_manager": "uv",
    "virtual_env": ".venv"
  }
} 
```

--------------------------------------------------------------------------------
/tests/README.md:
--------------------------------------------------------------------------------

```markdown
# Tests for Databricks MCP Server

This directory contains test scripts for the Databricks MCP server.

## Test Files

1. **Direct Test (test_direct.py)**
   
   This test directly instantiates the Databricks MCP server and calls its tools
   without going through the MCP protocol. It's useful for testing the core
   functionality without the overhead of the MCP protocol.

2. **MCP Client Test (test_mcp_client.py)**
   
   This test uses the MCP client to connect to the Databricks MCP server and test
   its tools through the MCP protocol. It's useful for testing the server's
   compatibility with the MCP protocol.

3. **List Tools Test (list_tools_test.py)**
   
   This test connects to the Databricks MCP server using the MCP client and lists
   all available tools. It's a simple test to verify that the server is running
   and properly responding to the MCP protocol.

## Running Tests

You can run the tests using the provided scripts in the `scripts` directory. Run them from the project root, because the scripts look for `.venv` in the current directory:

### Windows (PowerShell)

```powershell
.\scripts\run_direct_test.ps1     # Run the direct test
.\scripts\run_list_tools.ps1      # Run the list tools test
.\scripts\run_mcp_client_test.ps1 # Run the MCP client test
```

### Linux/Mac

```bash
./scripts/run_direct_test.sh     # Run the direct test
./scripts/run_list_tools.sh      # Run the list tools test
./scripts/run_mcp_client_test.sh # Run the MCP client test
```

## Running Tests Manually

If you want to run the tests manually:

```bash
# Activate the environment
source .venv/bin/activate  # Linux/Mac
# or
.\.venv\Scripts\activate   # Windows

# Run the tests
uv run -m tests.test_direct
uv run -m tests.list_tools_test
uv run -m tests.test_mcp_client
```

## Adding New Tests

When adding new tests, please follow these guidelines:

1. Create a new Python file in the `tests` directory.
2. Import the necessary modules from the `src` directory.
3. Create a shell script in the `scripts` directory to run the test.
4. Document the test in this README. 
```

--------------------------------------------------------------------------------
/examples/README.md:
--------------------------------------------------------------------------------

```markdown
# Databricks MCP Server Examples

This directory contains examples of how to use the Databricks MCP server.

## Example Files

1. **Direct Usage (direct_usage.py)**
   
   This example shows how to directly instantiate and use the Databricks MCP server
   without going through the MCP protocol. It demonstrates:
   
   - Creating a server instance
   - Calling tools directly
   - Processing the results

   To run this example:
   ```bash
   uv run examples/direct_usage.py
   ```

2. **MCP Client Usage (mcp_client_usage.py)**
   
   This example shows how to use the MCP client to connect to the Databricks MCP server
   and call its tools through the MCP protocol. It demonstrates:
   
   - Connecting to the server using the MCP protocol
   - Listing available tools
   - Calling tools through the MCP protocol
   - Processing the results

   To run this example:
   ```bash
   uv run examples/mcp_client_usage.py
   ```

## Running Examples

Make sure you have the following prerequisites:

1. Python 3.10+ installed
2. `uv` package manager installed (see project README for installation instructions)
3. Project environment set up with `uv venv`
4. Dependencies installed with `uv pip install -e .`
5. Environment variables set (DATABRICKS_HOST, DATABRICKS_TOKEN)

First, make sure you're in the project root directory and the virtual environment is activated:

```bash
# Windows
.\.venv\Scripts\activate

# Linux/Mac
source .venv/bin/activate
```

Then you can run the examples as shown above.

## Example Outputs

### Direct Usage Example Output

```
Databricks MCP Server - Direct Usage Example
===========================================

Databricks Clusters:
====================

Cluster 1:
  ID: 0220-221815-kzacbcps
  Name: Lloyd Burley's Cluster LTS
  State: TERMINATED
  Spark Version: 15.4.x-scala2.12
  Node Type: Standard_DS3_v2

Databricks Notebooks in /:
================================

Notebook: /Shared/example_notebook
Directory: /Users/

Databricks Jobs:
================

Job 1:
  ID: 12345
  Name: Daily ETL Job
  Created: 1740089895875
```

### MCP Client Usage Example Output

```
Databricks MCP Server - MCP Client Usage Example
=============================================
2025-03-13 10:05:23,456 - __main__ - INFO - Connecting to Databricks MCP server...
2025-03-13 10:05:23,457 - __main__ - INFO - Launching server process...
2025-03-13 10:05:23,789 - __main__ - INFO - Server launched, creating session...
2025-03-13 10:05:23,790 - __main__ - INFO - Initializing session...

Available Tools:
================
- list_clusters: List all Databricks clusters
- create_cluster: Create a new Databricks cluster with parameters: cluster_name (required), spark_version (required), node_type_id (required), num_workers, autotermination_minutes
- terminate_cluster: Terminate a Databricks cluster with parameter: cluster_id (required)
- get_cluster: Get information about a specific Databricks cluster with parameter: cluster_id (required)
- start_cluster: Start a terminated Databricks cluster with parameter: cluster_id (required)
- list_jobs: List all Databricks jobs
- run_job: Run a Databricks job with parameters: job_id (required), notebook_params (optional)
- list_notebooks: List notebooks in a workspace directory with parameter: path (required)
- export_notebook: Export a notebook from the workspace with parameters: path (required), format (optional, one of: SOURCE, HTML, JUPYTER, DBC)
- list_files: List files and directories in a DBFS path with parameter: dbfs_path (required)
- execute_sql: Execute a SQL statement with parameters: statement (required), warehouse_id (required), catalog (optional), schema (optional)

Select a tool to run (or 'quit' to exit):
1. list_clusters
2. create_cluster
3. terminate_cluster
4. get_cluster
5. start_cluster
6. list_jobs
7. run_job
8. list_notebooks
9. export_notebook
10. list_files
11. execute_sql
``` 
```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# Databricks MCP Server

A Model Context Protocol (MCP) server for Databricks that provides access to Databricks functionality via the MCP protocol. This allows LLM-powered tools to interact with Databricks clusters, jobs, notebooks, and more.

## Features

- **MCP Protocol Support**: Implements the MCP protocol to allow LLMs to interact with Databricks
- **Databricks API Integration**: Provides access to Databricks REST API functionality
- **Tool Registration**: Exposes Databricks functionality as MCP tools
- **Async Support**: Built with asyncio for efficient operation

## Available Tools

The Databricks MCP Server exposes the following tools:

- **list_clusters**: List all Databricks clusters
- **create_cluster**: Create a new Databricks cluster
- **terminate_cluster**: Terminate a Databricks cluster
- **get_cluster**: Get information about a specific Databricks cluster
- **start_cluster**: Start a terminated Databricks cluster
- **list_jobs**: List all Databricks jobs
- **run_job**: Run a Databricks job
- **list_notebooks**: List notebooks in a workspace directory
- **export_notebook**: Export a notebook from the workspace
- **list_files**: List files and directories in a DBFS path
- **execute_sql**: Execute a SQL statement

## Installation

### Prerequisites

- Python 3.10 or higher
- `uv` package manager (recommended for MCP servers)

### Setup

1. Install `uv` if you don't have it already:

   ```bash
   # MacOS/Linux
   curl -LsSf https://astral.sh/uv/install.sh | sh
   
   # Windows (in PowerShell)
   irm https://astral.sh/uv/install.ps1 | iex
   ```

   Restart your terminal after installation.

2. Clone the repository:
   ```bash
   git clone https://github.com/JustTryAI/databricks-mcp-server.git
   cd databricks-mcp-server
   ```

3. Set up the project with `uv`:
   ```bash
   # Create and activate virtual environment
   uv venv
   
   # On Windows
   .\.venv\Scripts\activate
   
   # On Linux/Mac
   source .venv/bin/activate
   
   # Install dependencies in development mode
   uv pip install -e .
   
   # Install development dependencies
   uv pip install -e ".[dev]"
   ```

4. Set up environment variables:
   ```bash
   # Windows
   set DATABRICKS_HOST=https://your-databricks-instance.azuredatabricks.net
   set DATABRICKS_TOKEN=your-personal-access-token
   
   # Linux/Mac
   export DATABRICKS_HOST=https://your-databricks-instance.azuredatabricks.net
   export DATABRICKS_TOKEN=your-personal-access-token
   ```

   You can also create a `.env` file based on the `.env.example` template.

## Running the MCP Server

To start the MCP server, run:

```bash
# Windows
.\start_mcp_server.ps1

# Linux/Mac
./start_mcp_server.sh
```

These wrapper scripts will execute the actual server scripts located in the `scripts` directory. The server will start and be ready to accept MCP protocol connections.

You can also directly run the server scripts from the scripts directory:

```bash
# Windows
.\scripts\start_mcp_server.ps1

# Linux/Mac
./scripts/start_mcp_server.sh
```

## Querying Databricks Resources

The repository includes utility scripts to quickly view Databricks resources:

```bash
# View all clusters
uv run scripts/show_clusters.py

# View all notebooks
uv run scripts/show_notebooks.py
```

## Project Structure

```
databricks-mcp-server/
├── src/                             # Source code
│   ├── __init__.py                  # Makes src a package
│   ├── __main__.py                  # Main entry point for the package
│   ├── main.py                      # Entry point for the MCP server
│   ├── api/                         # Databricks API clients
│   ├── core/                        # Core functionality
│   ├── server/                      # Server implementation
│   │   ├── databricks_mcp_server.py # Main MCP server
│   │   └── app.py                   # FastAPI app for tests
│   └── cli/                         # Command-line interface
├── tests/                           # Test directory
├── scripts/                         # Helper scripts
│   ├── start_mcp_server.ps1         # Server startup script (Windows)
│   ├── run_tests.ps1                # Test runner script
│   ├── show_clusters.py             # Script to show clusters
│   └── show_notebooks.py            # Script to show notebooks
├── examples/                        # Example usage
├── docs/                            # Documentation
└── pyproject.toml                   # Project configuration
```

See `project_structure.md` for a more detailed view of the project structure.

## Development

### Code Standards

- Python code follows PEP 8 style guide with a maximum line length of 100 characters
- Use 4 spaces for indentation (no tabs)
- Use double quotes for strings
- All classes, methods, and functions should have Google-style docstrings
- Type hints are required for all code except tests

### Linting

The project uses the following linting tools:

```bash
# Run all linters
uv run pylint src/ tests/
uv run flake8 src/ tests/
uv run mypy src/
```

## Testing

The project uses pytest for testing. To run the tests:

```bash
# Run all tests with our convenient script
.\scripts\run_tests.ps1

# Run with coverage report
.\scripts\run_tests.ps1 -Coverage

# Run specific tests with verbose output
.\scripts\run_tests.ps1 -Verbose -Coverage tests/test_clusters.py
```

You can also run the tests directly with pytest:

```bash
# Run all tests
uv run pytest tests/

# Run with coverage report
uv run pytest --cov=src tests/ --cov-report=term-missing
```

A minimum code coverage of 80% is the goal for the project.

## Documentation

- API documentation is generated using Sphinx and can be found in the `docs/api` directory
- All code includes Google-style docstrings
- See the `examples/` directory for usage examples

## Examples

Check the `examples/` directory for usage examples. To run examples:

```bash
# Run example scripts with uv
uv run examples/direct_usage.py
uv run examples/mcp_client_usage.py
```

## Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

1. Ensure your code follows the project's coding standards
2. Add tests for any new functionality
3. Update documentation as necessary
4. Verify all tests pass before submitting

## License

This project is licensed under the MIT License - see the LICENSE file for details. 
```

--------------------------------------------------------------------------------
/src/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/src/api/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/src/cli/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/src/core/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/src/server/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/tests/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/src/__main__.py:
--------------------------------------------------------------------------------

```python
"""
Main entry point for running the databricks-mcp-server package.
This allows the package to be run with 'python -m src' or 'uv run src'.
"""

import asyncio
from src.main import main

if __name__ == "__main__":
    # Only run when this file is executed as the main module (not on import):
    # pass the result of main() to asyncio.run(), which drives it on a new event loop.
    asyncio.run(main())
```

--------------------------------------------------------------------------------
/src/server/__main__.py:
--------------------------------------------------------------------------------

```python
"""
Main entry point for running the server module directly.
This allows the module to be run with 'python -m src.server' or 'uv run src.server'.
"""

import asyncio
from src.server.databricks_mcp_server import main

if __name__ == "__main__":
    # Only run when this file is executed as the main module (not on import):
    # pass the result of the server's main() to asyncio.run(), which drives it
    # on a new event loop.
    asyncio.run(main())
```

--------------------------------------------------------------------------------
/start_mcp_server.sh:
--------------------------------------------------------------------------------

```bash
#!/bin/bash
# Wrapper script to run the MCP server start script from the scripts directory.
#
# Resolves its own location so it can be invoked from any working directory,
# switches to that directory, then hands control to scripts/start_mcp_server.sh.
# Any arguments given to this wrapper are forwarded unchanged.

# Get the directory of this script
SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"

# Change to the script directory; abort instead of launching the server from
# the wrong place if the directory cannot be entered.
cd "$SCRIPT_DIR" || exit 1

# Run the actual server script. exec replaces this shell, so the server
# script's exit status becomes the wrapper's exit status.
exec "$SCRIPT_DIR/scripts/start_mcp_server.sh" "$@"
```

--------------------------------------------------------------------------------
/start_mcp_server.ps1:
--------------------------------------------------------------------------------

```
#!/usr/bin/env pwsh
# Root-level launcher: delegates to scripts\start_mcp_server.ps1.
#
# Parameters:
#   -SkipPrompt  Passed through to the inner script when supplied.

param(
    [switch]$SkipPrompt
)

# Folder that contains this wrapper; used as the working directory and as the
# base for locating the inner script.
$wrapperDir = Split-Path -Parent -Path $MyInvocation.MyCommand.Path
Set-Location $wrapperDir

$serverScript = "$wrapperDir\scripts\start_mcp_server.ps1"

# Collect the optional switch in a hashtable and splat it, so the inner script
# receives -SkipPrompt only when this wrapper was given it.
$forwarded = @{}
if ($SkipPrompt) {
    $forwarded['SkipPrompt'] = $true
}

& $serverScript @forwarded
```

--------------------------------------------------------------------------------
/scripts/show_clusters.py:
--------------------------------------------------------------------------------

```python
"""
Simple script to show clusters from Databricks
"""

import asyncio
import json
import sys
from src.api import clusters

async def show_all_clusters():
    """Fetch the cluster listing and print it as indented JSON.

    Returns:
        The value returned by ``clusters.list_clusters()``, or ``None`` if any
        exception was raised while fetching or printing. In that case the
        error is printed rather than propagated.
    """
    print("Fetching clusters from Databricks...")
    try:
        cluster_listing = await clusters.list_clusters()
        print("\nClusters found:")
        rendered = json.dumps(cluster_listing, indent=2)
        print(rendered)
    except Exception as err:
        # Best-effort script: report the failure and signal it with None.
        print(f"Error listing clusters: {err}")
        return None
    return cluster_listing

if __name__ == "__main__":
    # Script entry point: run the async listing to completion on a new event loop.
    asyncio.run(show_all_clusters())
```

--------------------------------------------------------------------------------
/scripts/show_notebooks.py:
--------------------------------------------------------------------------------

```python
"""
Simple script to show notebooks from Databricks
"""

import asyncio
import json
import sys
from src.api import notebooks

async def show_all_notebooks():
    """Fetch the listing for workspace path "/" and print it as indented JSON.

    Returns:
        The value returned by ``notebooks.list_notebooks(path="/")``, or
        ``None`` if any exception was raised while fetching or printing. In
        that case the error is printed rather than propagated.
    """
    print("Fetching notebooks from Databricks...")
    try:
        notebook_listing = await notebooks.list_notebooks(path="/")
        print("\nNotebooks found:")
        rendered = json.dumps(notebook_listing, indent=2)
        print(rendered)
    except Exception as err:
        # Best-effort script: report the failure and signal it with None.
        print(f"Error listing notebooks: {err}")
        return None
    return notebook_listing

if __name__ == "__main__":
    # Script entry point: run the async listing to completion on a new event loop.
    asyncio.run(show_all_notebooks())
```

--------------------------------------------------------------------------------
/scripts/run_direct_test.sh:
--------------------------------------------------------------------------------

```bash
#!/bin/bash
# Runs the direct test against Databricks.
#
# Run from the project root: .venv is looked up relative to the current
# directory. Requires DATABRICKS_HOST and DATABRICKS_TOKEN to be set.
# Exits with the test's exit status so callers can detect a failed run.

# Check if the virtual environment exists
if [ ! -d ".venv" ]; then
    echo "Virtual environment not found. Please create it first:"
    echo "uv venv"
    exit 1
fi

# Activate the virtual environment
source .venv/bin/activate

# Check if environment variables are set
if [ -z "$DATABRICKS_HOST" ] || [ -z "$DATABRICKS_TOKEN" ]; then
    echo "Warning: DATABRICKS_HOST and/or DATABRICKS_TOKEN environment variables are not set."
    echo "Please set them before running the test."
    exit 1
fi

# Run the direct test
echo "Running direct test at $(date)"
echo "Databricks Host: $DATABRICKS_HOST"

# The tests package contains test_direct.py (there is no direct_test module).
uv run -m tests.test_direct
TEST_STATUS=$?

echo "Test completed at $(date)"

# Without this the script would always report the status of the echo above.
exit "$TEST_STATUS"
```

--------------------------------------------------------------------------------
/scripts/run_tests.ps1:
--------------------------------------------------------------------------------

```
#!/usr/bin/env pwsh
# Run tests for the Databricks MCP server
#
# Parameters:
#   -TestPath  Path handed to pytest (defaults to "tests/").
#   -Coverage  Adds a coverage report for src (--cov=src --cov-report=term-missing).
#   -Verbose   Adds pytest's -v flag.
#
# Run from the project root: .venv is looked up relative to the current directory.
# Exits with pytest's exit code so callers can detect failing tests.

param(
    [string]$TestPath = "tests/",
    [switch]$Coverage,
    [switch]$Verbose
)

# Check if the virtual environment exists
if (-not (Test-Path -Path ".venv")) {
    Write-Host "Virtual environment not found. Please create it first:"
    Write-Host "uv venv"
    exit 1
}

# Activate virtual environment
. .\.venv\Scripts\Activate.ps1

# Build the arguments as an array instead of a command string: each element is
# passed to uv verbatim, so a test path containing spaces or PowerShell
# metacharacters is not re-parsed (which Invoke-Expression would do).
$uvArgs = @("run", "pytest")

# Add verbose flag if specified
if ($Verbose) {
    $uvArgs += "-v"
}

# Add coverage if specified
if ($Coverage) {
    $uvArgs += "--cov=src", "--cov-report=term-missing"
}

# Add test path
$uvArgs += $TestPath

Write-Host "Running: uv $($uvArgs -join ' ')"
& uv @uvArgs

# Capture immediately; later commands may overwrite $LASTEXITCODE.
$testExitCode = $LASTEXITCODE

# Print summary
Write-Host "`nTest run completed at $(Get-Date)"

exit $testExitCode
```

--------------------------------------------------------------------------------
/scripts/run_direct_test.ps1:
--------------------------------------------------------------------------------

```
#!/usr/bin/env pwsh
# PowerShell script to run the direct test
#
# Run from the project root: .venv is looked up relative to the current
# directory. Requires DATABRICKS_HOST and DATABRICKS_TOKEN to be set.
# Exits with the test's exit code so callers can detect a failed run.

# Check if the virtual environment exists
if (-not (Test-Path -Path ".venv")) {
    Write-Host "Virtual environment not found. Please create it first:"
    Write-Host "uv venv"
    exit 1
}

# Activate virtual environment
. .\.venv\Scripts\Activate.ps1

# Check if environment variables are set
if (-not (Get-Item -Path Env:DATABRICKS_HOST -ErrorAction SilentlyContinue) -or 
    -not (Get-Item -Path Env:DATABRICKS_TOKEN -ErrorAction SilentlyContinue)) {
    Write-Host "Warning: DATABRICKS_HOST and/or DATABRICKS_TOKEN environment variables are not set."
    Write-Host "Please set them before running the test."
    exit 1
}

# Run the direct test
Write-Host "Running direct test at $(Get-Date)"
Write-Host "Databricks Host: $env:DATABRICKS_HOST"

# The tests package contains test_direct.py (there is no direct_test module).
uv run -m tests.test_direct

# Capture immediately; later commands may overwrite $LASTEXITCODE.
$testExitCode = $LASTEXITCODE

Write-Host "Test completed at $(Get-Date)"

exit $testExitCode
```

--------------------------------------------------------------------------------
/scripts/start_mcp_server.sh:
--------------------------------------------------------------------------------

```bash
#!/bin/bash
# Starts the Databricks MCP server in the foreground.
#
# Run from the project root: .venv is looked up relative to the current
# directory. If DATABRICKS_HOST or DATABRICKS_TOKEN is missing, the user is
# asked whether to continue. Exits with the server's exit status.

# Check if the virtual environment exists
if [ ! -d ".venv" ]; then
    echo "Virtual environment not found. Please create it first:"
    echo "uv venv"
    exit 1
fi

# Activate the virtual environment
source .venv/bin/activate

# Check if environment variables are set
if [ -z "$DATABRICKS_HOST" ] || [ -z "$DATABRICKS_TOKEN" ]; then
    echo "Warning: DATABRICKS_HOST and/or DATABRICKS_TOKEN environment variables are not set."
    echo "You can set them now or the server will look for them in other sources."
    # -n 1 reads a single keypress; the bare echo ends the prompt line.
    read -p "Do you want to continue? (y/n) " -n 1 -r
    echo
    if [[ ! $REPLY =~ ^[Yy]$ ]]; then
        exit 1
    fi
fi

# Start the server by running the module directly
echo "Starting Databricks MCP server at $(date)"
if [ -n "$DATABRICKS_HOST" ]; then
    echo "Databricks Host: $DATABRICKS_HOST"
fi

# -m runs the dotted name as a Python module; without it uv treats the name as
# an executable to spawn.
uv run -m src.server.databricks_mcp_server
SERVER_STATUS=$?

echo "Server stopped at $(date)"

# Report the server's status rather than the status of the echo above.
exit "$SERVER_STATUS"
```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
name = "databricks-mcp-server"
version = "0.1.0"
description = "A Model Context Protocol (MCP) server for Databricks"
authors = [
    {name = "MCP Server Team", email = "[email protected]"}
]
requires-python = ">=3.10"
readme = "README.md"
license = {text = "MIT"}
classifiers = [
    "Programming Language :: Python :: 3",
    "Programming Language :: Python :: 3.10",
    "Programming Language :: Python :: 3.11",
    "License :: OSI Approved :: MIT License",
    "Operating System :: OS Independent",
]
dependencies = [
    "mcp[cli]>=1.2.0",
    "httpx",
    "databricks-sdk",
]

[project.optional-dependencies]
cli = [
    "click",
]
dev = [
    "black",
    "pylint",
    "pytest",
    "pytest-asyncio",
]

[project.scripts]
databricks-mcp = "src.cli.commands:main"

[tool.hatch.build.targets.wheel]
packages = ["src"] 

```

--------------------------------------------------------------------------------
/scripts/run_list_tools.sh:
--------------------------------------------------------------------------------

```bash
#!/bin/bash
# Runs the list-tools test, starting a background MCP server first when no
# matching server process is found.
#
# Run from the project root: .venv is looked up relative to the current
# directory. Server output goes to server.log. Exits with the test's exit status.

# Check if the virtual environment exists
if [ ! -d ".venv" ]; then
    echo "Virtual environment not found. Please create it first:"
    echo "uv venv"
    exit 1
fi

# Activate the virtual environment
source .venv/bin/activate

# Run the list tools test
echo "Running list tools test at $(date)"

# Check if the server is already running
if ! pgrep -f "uv run.*src.server.databricks_mcp_server" > /dev/null; then
    echo "Starting MCP server in the background..."
    # Start the server in the background. -m runs the dotted name as a Python
    # module; without it uv treats the name as an executable to spawn.
    uv run -m src.server.databricks_mcp_server > server.log 2>&1 &
    SERVER_PID=$!
    echo "Server started with PID $SERVER_PID"
    # Give the server a moment to start
    sleep 2
    SERVER_STARTED=true
else
    echo "MCP server is already running"
    SERVER_STARTED=false
fi

# Run the list tools test
echo "Running list tools test..."
# NOTE(review): confirm that a tests.list_tools_test module exists; adjust the
# module name here if the test file is named differently.
uv run -m tests.list_tools_test
TEST_STATUS=$?

# If we started the server, stop it
if [ "$SERVER_STARTED" = true ]; then
    echo "Stopping MCP server (PID $SERVER_PID)..."
    kill "$SERVER_PID"
    echo "Server stopped"
fi

echo "Test completed at $(date)"

# Report the test's status rather than the status of the echo above.
exit "$TEST_STATUS"
```

--------------------------------------------------------------------------------
/scripts/run_mcp_client_test.sh:
--------------------------------------------------------------------------------

```bash
#!/bin/bash
#
# Run the MCP client test against the Databricks MCP server.
#
# Starts the server in the background when no matching process is found,
# runs the test module, stops the server again if this script started it,
# and exits with the exit status of the test command.

# Check if the virtual environment exists
if [ ! -d ".venv" ]; then
    echo "Virtual environment not found. Please create it first:"
    echo "uv venv"
    exit 1
fi

# Activate the virtual environment
source .venv/bin/activate

SERVER_STARTED=false
SERVER_PID=""

# Stop the background server, but only if this script launched it.
# Safe to call more than once: the flag is cleared after the first stop.
stop_server() {
    if [ "$SERVER_STARTED" = true ] && [ -n "$SERVER_PID" ]; then
        echo "Stopping MCP server (PID $SERVER_PID)..."
        kill "$SERVER_PID" 2>/dev/null
        echo "Server stopped"
        SERVER_STARTED=false
    fi
}

# Do not leave the server running when the script is interrupted or exits early
trap stop_server EXIT
trap 'exit 130' INT
trap 'exit 143' TERM

# Run the MCP client test
echo "Running MCP client test at $(date)"

# Check if the server is already running
if ! pgrep -f "uv run.*src.server.databricks_mcp_server" > /dev/null; then
    echo "Starting MCP server in the background..."
    # Start the server in the background
    uv run src.server.databricks_mcp_server > server.log 2>&1 &
    SERVER_PID=$!
    echo "Server started with PID $SERVER_PID"
    # Give the server a moment to start
    sleep 2
    SERVER_STARTED=true
else
    echo "MCP server is already running"
fi

# Run the MCP client test and remember its result so it is not masked
# by the cleanup and the final echo
echo "Running MCP client test..."
uv run -m tests.mcp_client_test
TEST_STATUS=$?

# If we started the server, stop it
stop_server

echo "Test completed at $(date)"
exit "$TEST_STATUS"
```

--------------------------------------------------------------------------------
/scripts/test_mcp_server.ps1:
--------------------------------------------------------------------------------

```
#!/usr/bin/env pwsh
# Runs test_mcp_server.py through `uv run` inside the project's virtual
# environment. pwsh processes whose command line mentions
# start_mcp_server.ps1 are force-stopped before and after the run.

# Force-stop every pwsh process launched with start_mcp_server.ps1.
# $Banner is printed once, and only when at least one such process exists.
function Stop-McpServerProcess {
    param([string]$Banner)

    $stale = @(Get-Process -Name pwsh | Where-Object { $_.CommandLine -like "*start_mcp_server.ps1*" })
    if ($stale.Count -eq 0) {
        return
    }

    Write-Host $Banner
    foreach ($proc in $stale) {
        Stop-Process -Id $proc.Id -Force
        Write-Host "Stopped process $($proc.Id)"
    }
}

# The virtual environment must already exist in the current directory
if (-not (Test-Path -Path ".venv")) {
    Write-Host "Virtual environment not found. Please create it first:"
    Write-Host "uv venv"
    exit 1
}

# Dot-source the activation script so it affects this session
. .\.venv\Scripts\Activate.ps1

# Start from a clean slate
Stop-McpServerProcess -Banner "Found existing MCP server processes, stopping them first..."

# Run the test
Write-Host "Running MCP server tests..."
uv run test_mcp_server.py

# Remove anything the test run left behind
Stop-McpServerProcess -Banner "Cleaning up any remaining MCP server processes..."
```

--------------------------------------------------------------------------------
/scripts/start_mcp_server.ps1:
--------------------------------------------------------------------------------

```
#!/usr/bin/env pwsh
# Launches the Databricks MCP server from the project's virtual environment.
# -SkipPrompt suppresses the interactive confirmation shown when the
# Databricks environment variables are missing.

param(
    [switch]$SkipPrompt
)

# The virtual environment must already exist in the current directory
if (-not (Test-Path -Path ".venv")) {
    Write-Host "Virtual environment not found. Please create it first:"
    Write-Host "uv venv"
    exit 1
}

# Dot-source the activation script so it affects this session
. .\.venv\Scripts\Activate.ps1

# Record once whether each Databricks variable exists in the environment
$hostIsSet = Test-Path -Path Env:DATABRICKS_HOST
$tokenIsSet = Test-Path -Path Env:DATABRICKS_TOKEN

if (-not ($hostIsSet -and $tokenIsSet)) {
    Write-Host "Warning: DATABRICKS_HOST and/or DATABRICKS_TOKEN environment variables are not set."
    Write-Host "You can set them now or the server will look for them in other sources."

    if (-not $SkipPrompt) {
        # Interactive run: let the user decide whether to go on
        $answer = Read-Host "Do you want to continue? (y/n)"
        if ($answer -ne "y") {
            exit 1
        }
    } else {
        # Non-interactive run (e.g. started by a test script)
        Write-Host "Auto-continuing due to SkipPrompt flag..."
    }
}

# Announce the start and, when known, the target workspace
Write-Host "Starting Databricks MCP server at $(Get-Date)"
if ($hostIsSet) {
    Write-Host "Databricks Host: $env:DATABRICKS_HOST"
}

# First attempt: run the entry point as a module
Write-Host "Attempting to start server using module path..."
python -m src.main

# Second attempt, only after a non-zero exit code: run the file directly
if ($LASTEXITCODE -ne 0) {
    Write-Host "Module execution failed, trying direct script execution..."
    python "$PSScriptRoot\..\src\main.py"
}

Write-Host "Server stopped at $(Get-Date)"
```

--------------------------------------------------------------------------------
/scripts/run_mcp_test.ps1:
--------------------------------------------------------------------------------

```
#!/usr/bin/env pwsh
# Start MCP server and run tests
#
# Launches scripts\start_mcp_server.ps1 in a separate minimized pwsh window,
# waits a fixed five seconds, runs test_running_server.py through `uv run`,
# and always force-stops the server processes afterwards.

# Check if the virtual environment exists
if (-not (Test-Path -Path ".venv")) {
    Write-Host "Virtual environment not found. Please create it first:"
    Write-Host "uv venv"
    exit 1
}

# Activate virtual environment
. .\.venv\Scripts\Activate.ps1

# Make sure there are no existing MCP server processes
$serverProcesses = Get-Process -Name pwsh | Where-Object { $_.CommandLine -like "*start_mcp_server.ps1*" }
if ($serverProcesses) {
    Write-Host "Found existing MCP server processes, stopping them first..."
    $serverProcesses | ForEach-Object { 
        Stop-Process -Id $_.Id -Force 
        Write-Host "Stopped process $($_.Id)"
    }
}

# Start the MCP server in a new PowerShell window.
# -SkipPrompt is required here: without it the start script waits on
# Read-Host inside the minimized window whenever DATABRICKS_HOST or
# DATABRICKS_TOKEN is missing, and the test would run against a server
# that never started.
$serverProcess = Start-Process pwsh -ArgumentList "-File", "scripts\start_mcp_server.ps1", "-SkipPrompt" -PassThru -WindowStyle Minimized

# Give it time to initialize
Write-Host "Waiting for MCP server to initialize..."
Start-Sleep -Seconds 5

try {
    # Run the test
    Write-Host "Running test against the MCP server..."
    uv run test_running_server.py
}
finally {
    # Clean up: stop the server window we launched, if it is still alive
    if ($serverProcess -and !$serverProcess.HasExited) {
        Write-Host "Stopping MCP server..."
        Stop-Process -Id $serverProcess.Id -Force
    }
    
    # Make sure all MCP server processes are stopped
    $serverProcesses = Get-Process -Name pwsh | Where-Object { $_.CommandLine -like "*start_mcp_server.ps1*" }
    if ($serverProcesses) {
        $serverProcesses | ForEach-Object { 
            Stop-Process -Id $_.Id -Force 
            Write-Host "Stopped process $($_.Id)"
        }
    }
}
```

--------------------------------------------------------------------------------
/src/main.py:
--------------------------------------------------------------------------------

```python
"""
Main entry point for the Databricks MCP server.
"""

import asyncio
import logging
import os
import sys
from typing import Optional

from src.core.config import settings
from src.server.databricks_mcp_server import DatabricksMCPServer

# Function to start the server - extracted from the server file
async def start_mcp_server():
    """Create a DatabricksMCPServer and await its run_stdio_async() call."""
    await DatabricksMCPServer().run_stdio_async()


def setup_logging(log_level: Optional[str] = None):
    """
    Set up logging configuration.

    Log records go to stderr. This module runs the server through
    ``run_stdio_async``, and the stdio transport uses stdout for protocol
    messages, so log lines must not be written there.

    Args:
        log_level: Optional level name overriding ``settings.LOG_LEVEL``.
            Matching is case-insensitive; an unknown name falls back to INFO
            and is reported with a warning.
    """
    requested = str(log_level or settings.LOG_LEVEL)
    level = getattr(logging, requested.upper(), None)

    # getattr can also find non-level attributes, so require a real int level
    unknown_level = not isinstance(level, int)
    if unknown_level:
        level = logging.INFO

    logging.basicConfig(
        level=level,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        handlers=[
            logging.StreamHandler(sys.stderr),
        ],
    )

    if unknown_level:
        logging.getLogger(__name__).warning(
            "Unknown log level %r, falling back to INFO", requested
        )


async def main():
    """Configure logging, log the startup banner, then run the MCP server."""
    setup_logging()

    # Startup banner: server version and the configured Databricks host
    log = logging.getLogger(__name__)
    log.info("Starting Databricks MCP server v%s", settings.VERSION)
    log.info("Databricks host: %s", settings.DATABRICKS_HOST)

    # Hand control to the server until it returns
    await start_mcp_server()


if __name__ == "__main__":
    import argparse

    # Command-line front end: the only option is an explicit log level
    level_names = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
    cli = argparse.ArgumentParser(description="Databricks MCP Server")
    cli.add_argument("--log-level", help="Set the log level", choices=level_names)
    options = cli.parse_args()

    # Apply the level given on the command line (None when the flag is omitted)
    setup_logging(options.log_level)

    # Drive the async entry point to completion
    asyncio.run(main())
```

--------------------------------------------------------------------------------
/scripts/run_client_test.ps1:
--------------------------------------------------------------------------------

```
#!/usr/bin/env pwsh
# PowerShell script to run the MCP client test
#
# Runs mcp_client_test.py through `uv run` in a background job, relays the
# job's output while it runs, and stops the job once the timeout has elapsed.
# pwsh processes started from start_mcp_server.ps1 are force-stopped before
# and after the run.

# Check if the virtual environment exists
if (-not (Test-Path -Path ".venv")) {
    Write-Host "Virtual environment not found. Please create it first:"
    Write-Host "uv venv"
    exit 1
}

# Activate virtual environment
. .\.venv\Scripts\Activate.ps1

# Make sure there are no existing MCP server processes
$serverProcesses = Get-Process -Name pwsh | Where-Object { $_.CommandLine -like "*start_mcp_server.ps1*" }
if ($serverProcesses) {
    Write-Host "Found existing MCP server processes, stopping them first..."
    $serverProcesses | ForEach-Object { 
        Stop-Process -Id $_.Id -Force 
        Write-Host "Stopped process $($_.Id)"
    }
}

# Set timeout in seconds
$timeout = 60

# Run the test with a timeout
Write-Host "Running MCP client test with a $timeout second timeout..."
$job = Start-Job -ScriptBlock { 
    # Jobs start in a different location; return to the caller's directory
    Set-Location $using:PWD
    uv run mcp_client_test.py 
}

# Monitor the job and output in real-time
$start = Get-Date
while ($job.State -eq "Running") {
    # Print each new output record on its own line. Passing the whole array
    # to Write-Host would join the records with spaces on a single line.
    Receive-Job -Job $job | ForEach-Object { Write-Host $_ }
    
    # Check if we've hit the timeout
    $elapsed = (Get-Date) - $start
    if ($elapsed.TotalSeconds -gt $timeout) {
        Write-Host "Test is taking too long, terminating..."
        Stop-Job -Job $job
        break
    }
    
    # Sleep briefly
    Start-Sleep -Seconds 1
}

# Output whatever the job produced after the last poll
Receive-Job -Job $job | ForEach-Object { Write-Host $_ }

Remove-Job -Job $job -Force

# Clean up any leftover processes
$serverProcesses = Get-Process -Name pwsh | Where-Object { $_.CommandLine -like "*start_mcp_server.ps1*" }
if ($serverProcesses) {
    Write-Host "Cleaning up any remaining MCP server processes..."
    $serverProcesses | ForEach-Object { 
        Stop-Process -Id $_.Id -Force 
        Write-Host "Stopped process $($_.Id)"
    }
}
```

--------------------------------------------------------------------------------
/scripts/run_list_tools.ps1:
--------------------------------------------------------------------------------

```
#!/usr/bin/env pwsh
# PowerShell script to run the simple tool lister
#
# Runs `uv run -m src.cli.commands list-tools` in a background job, relays
# the job's output while it runs, and stops the job once the timeout has
# elapsed. pwsh processes started from start_mcp_server.ps1 are force-stopped
# before and after the run.

# Check if virtual environment exists
if (-not (Test-Path -Path ".venv")) {
    Write-Host "Virtual environment not found. Please create it first:"
    Write-Host "uv venv"
    exit 1
}

# Activate virtual environment
. .\.venv\Scripts\Activate.ps1

# Make sure there are no existing MCP server processes
$serverProcesses = Get-Process -Name pwsh | Where-Object { $_.CommandLine -like "*start_mcp_server.ps1*" }
if ($serverProcesses) {
    Write-Host "Found existing MCP server processes, stopping them first..."
    $serverProcesses | ForEach-Object { 
        Stop-Process -Id $_.Id -Force 
        Write-Host "Stopped process $($_.Id)"
    }
}

# Set timeout in seconds
$timeout = 20

# Run the CLI command with a timeout
Write-Host "Running CLI tool listing with a $timeout second timeout..."
$job = Start-Job -ScriptBlock { 
    # Jobs start in a different location; return to the caller's directory
    Set-Location $using:PWD
    # NOTE(review): this moves to the parent of the directory in which .venv
    # was found above - confirm that is the intended working directory.
    Set-Location ..
    uv run -m src.cli.commands list-tools
}

# Monitor the job and output in real-time
$start = Get-Date
while ($job.State -eq "Running") {
    # Print each new output record on its own line. Passing the whole array
    # to Write-Host would join the records with spaces on a single line.
    Receive-Job -Job $job | ForEach-Object { Write-Host $_ }
    
    # Check if we've hit the timeout
    $elapsed = (Get-Date) - $start
    if ($elapsed.TotalSeconds -gt $timeout) {
        Write-Host "Command is taking too long, terminating..."
        Stop-Job -Job $job
        break
    }
    
    # Sleep briefly
    Start-Sleep -Milliseconds 500
}

# Output whatever the job produced after the last poll
Receive-Job -Job $job | ForEach-Object { Write-Host $_ }

Remove-Job -Job $job -Force

# Clean up any leftover processes
$serverProcesses = Get-Process -Name pwsh | Where-Object { $_.CommandLine -like "*start_mcp_server.ps1*" }
if ($serverProcesses) {
    Write-Host "Cleaning up any remaining MCP server processes..."
    $serverProcesses | ForEach-Object { 
        Stop-Process -Id $_.Id -Force 
        Write-Host "Stopped process $($_.Id)"
    }
}
```

--------------------------------------------------------------------------------
/scripts/run_mcp_client_test.ps1:
--------------------------------------------------------------------------------

```
#!/usr/bin/env pwsh
# PowerShell script to run the MCP client test
#
# Runs `uv run -m tests.mcp_client_test` in a background job, relays the
# job's output while it runs, and stops the job once the timeout has elapsed.
# pwsh processes whose command line mentions databricks_mcp_server are
# force-stopped before and after the run.

# Check if the virtual environment exists
if (-not (Test-Path -Path ".venv")) {
    Write-Host "Virtual environment not found. Please create it first:"
    Write-Host "uv venv"
    exit 1
}

# Activate virtual environment
. .\.venv\Scripts\Activate.ps1

# Make sure there are no existing MCP server processes
$serverProcesses = Get-Process -Name pwsh | Where-Object { $_.CommandLine -like "*databricks_mcp_server*" }
if ($serverProcesses) {
    Write-Host "Found existing MCP server processes, stopping them first..."
    $serverProcesses | ForEach-Object { 
        Stop-Process -Id $_.Id -Force 
        Write-Host "Stopped process $($_.Id)"
    }
}

# Set timeout in seconds
$timeout = 60

# Run the test with a timeout
Write-Host "Running MCP client test with a $timeout second timeout..."
$job = Start-Job -ScriptBlock { 
    # Jobs start in a different location; return to the caller's directory
    Set-Location $using:PWD
    
    # Run the MCP client test
    uv run -m tests.mcp_client_test
}

# Monitor the job and output in real-time
$start = Get-Date
while ($job.State -eq "Running") {
    # Print each new output record on its own line. Passing the whole array
    # to Write-Host would join the records with spaces on a single line.
    Receive-Job -Job $job | ForEach-Object { Write-Host $_ }
    
    # Check if we've hit the timeout
    $elapsed = (Get-Date) - $start
    if ($elapsed.TotalSeconds -gt $timeout) {
        Write-Host "Test is taking too long, terminating..."
        Stop-Job -Job $job
        break
    }
    
    # Sleep briefly
    Start-Sleep -Seconds 1
}

# Output whatever the job produced after the last poll
Receive-Job -Job $job | ForEach-Object { Write-Host $_ }

Remove-Job -Job $job -Force

# Clean up any leftover processes
$serverProcesses = Get-Process -Name pwsh | Where-Object { $_.CommandLine -like "*databricks_mcp_server*" }
if ($serverProcesses) {
    Write-Host "Cleaning up any remaining MCP server processes..."
    $serverProcesses | ForEach-Object { 
        Stop-Process -Id $_.Id -Force 
        Write-Host "Stopped process $($_.Id)"
    }
}
```

--------------------------------------------------------------------------------
/SUMMARY.md:
--------------------------------------------------------------------------------

```markdown
# Databricks MCP Server - Project Summary

## Overview

We've successfully created a Databricks MCP (Model Context Protocol) server that provides tools for interacting with Databricks APIs. The server follows the MCP standard, which allows AI models to interact with external tools and services in a standardized way.

## Key Accomplishments

1. **Server Implementation**:
   - Created a `DatabricksMCPServer` class that inherits from `FastMCP`
   - Implemented the MCP protocol for communication with clients
   - Set up proper error handling and logging

2. **Tool Registration**:
   - Registered tools for managing Databricks resources
   - Implemented proper parameter validation and error handling
   - Added detailed descriptions for each tool

3. **API Integration**:
   - Implemented functions for interacting with Databricks APIs
   - Set up proper authentication using Databricks tokens
   - Added error handling for API requests

4. **Testing**:
   - Created a direct test script to verify server functionality
   - Successfully tested the `list_clusters` tool
   - Verified that the server can connect to Databricks and retrieve data

5. **Documentation**:
   - Created a README file with installation and usage instructions
   - Documented available tools and their parameters
   - Added a requirements.txt file with necessary dependencies

## Next Steps

1. **Additional Tools**:
   - Implement more tools for managing Databricks resources
   - Add support for Unity Catalog management
   - Add support for Delta Live Tables pipelines

2. **Enhanced Testing**:
   - Create more comprehensive test scripts
   - Add unit tests for individual components
   - Set up continuous integration

3. **Deployment**:
   - Create Docker container for easy deployment
   - Add support for running as a service
   - Implement authentication for the MCP server

4. **Client Integration**:
   - Create example clients for different AI models
   - Add support for popular AI platforms
   - Create documentation for client integration 
```

--------------------------------------------------------------------------------
/src/server/app.py:
--------------------------------------------------------------------------------

```python
"""
FastAPI application for Databricks API.

This is a stub module that provides compatibility with existing tests.
The actual implementation uses the MCP protocol directly.
"""

from fastapi import FastAPI

from src.api import clusters, dbfs, jobs, notebooks, sql
from src.core.config import settings


def create_app() -> FastAPI:
    """
    Build the FastAPI application that exposes the cluster endpoints.

    Every route forwards to the matching coroutine in ``src.api.clusters``
    and returns its result unchanged.

    Returns:
        FastAPI: The application with all cluster routes registered
    """
    api = FastAPI(
        version=settings.VERSION,
        title="Databricks API",
        description="API for interacting with Databricks services",
    )

    # Route docstrings are kept verbatim: FastAPI publishes them as the
    # operation descriptions.

    # --- Read-only routes ---
    @api.get("/api/2.0/clusters/list")
    async def list_clusters():
        """List all clusters."""
        return await clusters.list_clusters()

    @api.get("/api/2.0/clusters/get/{cluster_id}")
    async def get_cluster(cluster_id: str):
        """Get cluster details."""
        return await clusters.get_cluster(cluster_id)

    # --- Mutating routes: the JSON body arrives as a plain dict ---
    @api.post("/api/2.0/clusters/create")
    async def create_cluster(request_data: dict):
        """Create a new cluster."""
        return await clusters.create_cluster(request_data)

    @api.post("/api/2.0/clusters/delete")
    async def terminate_cluster(request_data: dict):
        """Terminate a cluster."""
        cluster_id = request_data.get("cluster_id")
        return await clusters.terminate_cluster(cluster_id)

    @api.post("/api/2.0/clusters/start")
    async def start_cluster(request_data: dict):
        """Start a cluster."""
        cluster_id = request_data.get("cluster_id")
        return await clusters.start_cluster(cluster_id)

    @api.post("/api/2.0/clusters/resize")
    async def resize_cluster(request_data: dict):
        """Resize a cluster."""
        cluster_id = request_data.get("cluster_id")
        num_workers = request_data.get("num_workers")
        return await clusters.resize_cluster(cluster_id, num_workers)

    @api.post("/api/2.0/clusters/restart")
    async def restart_cluster(request_data: dict):
        """Restart a cluster."""
        cluster_id = request_data.get("cluster_id")
        return await clusters.restart_cluster(cluster_id)

    return api
```

--------------------------------------------------------------------------------
/src/core/auth.py:
--------------------------------------------------------------------------------

```python
"""
Authentication functionality for the Databricks MCP server.
"""

import logging
from typing import Dict, Optional

from fastapi import Depends, HTTPException, Security, status
from fastapi.security import APIKeyHeader

from src.core.config import settings

# Configure logging
logger = logging.getLogger(__name__)

# API key header scheme
API_KEY_HEADER = APIKeyHeader(name="X-API-Key", auto_error=False)


async def validate_api_key(api_key: Optional[str] = Security(API_KEY_HEADER)) -> Dict[str, str]:
    """
    Validate API key for protected endpoints.

    When ``settings.DEBUG`` is true the check is skipped and every request
    is treated as authenticated.

    Args:
        api_key: The API key from the request header (None when absent)

    Returns:
        Dictionary with authentication info

    Raises:
        HTTPException: 401 if the key is missing or not one of the valid keys
    """
    # Local import: hmac is only needed for the constant-time comparison below
    import hmac

    # API keys are only enforced outside debug mode
    if settings.DEBUG:
        return {"authenticated": True}

    if not api_key:
        logger.warning("Authentication failed: Missing API key")
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Missing API key",
            headers={"WWW-Authenticate": "ApiKey"},
        )

    # Placeholder key list for demo purposes only.
    # NEVER ship this - replace it with a lookup against secure storage.
    valid_keys = ["test-api-key"]  # Replace with actual implementation

    # Compare in constant time so response timing does not reveal how much of
    # a guessed key matched. Bytes are used because compare_digest rejects
    # non-ASCII str arguments.
    presented = api_key.encode("utf-8")
    key_is_valid = False
    for candidate in valid_keys:
        # No early exit: every candidate is compared
        if hmac.compare_digest(presented, candidate.encode("utf-8")):
            key_is_valid = True

    if not key_is_valid:
        logger.warning("Authentication failed: Invalid API key")
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid API key",
            headers={"WWW-Authenticate": "ApiKey"},
        )

    # Return authentication info
    return {"authenticated": True}


def get_current_user():
    """
    Return the current user for use as a dependency.

    Placeholder only: no request data is inspected and the same fixed
    "admin" record is produced on every call.
    """
    # A new dict is built per call, as before
    placeholder_user = {"username": "admin"}
    return placeholder_user
```

--------------------------------------------------------------------------------
/src/core/config.py:
--------------------------------------------------------------------------------

```python
"""
Configuration settings for the Databricks MCP server.
"""

import os
from typing import Any, Dict, Optional

# Import dotenv if available, but don't require it.
# Status messages go to stderr: when the server speaks MCP over stdio,
# stdout carries protocol messages and must not receive stray text.
import sys

try:
    from dotenv import load_dotenv
except ImportError:
    # We'll just rely on OS environment variables being set manually
    print(
        "WARNING: python-dotenv not found, environment variables must be set manually",
        file=sys.stderr,
    )
else:
    # Load .env file if it exists
    load_dotenv()
    print("Successfully loaded dotenv", file=sys.stderr)

from pydantic import field_validator
from pydantic_settings import BaseSettings

# Version
VERSION = "0.1.0"


class Settings(BaseSettings):
    """
    Base settings for the application.

    Each field defaults to the matching environment variable as read when
    this class is defined, or to the literal fallback shown next to it.
    """

    # pydantic v2 configuration (replaces the deprecated nested ``Config``
    # class): also read values from ".env" and match names case-sensitively.
    model_config = {"env_file": ".env", "case_sensitive": True}

    # Databricks API configuration
    DATABRICKS_HOST: str = os.environ.get("DATABRICKS_HOST", "https://example.databricks.net")
    DATABRICKS_TOKEN: str = os.environ.get("DATABRICKS_TOKEN", "dapi_token_placeholder")

    # Server configuration
    SERVER_HOST: str = os.environ.get("SERVER_HOST", "0.0.0.0")
    SERVER_PORT: int = int(os.environ.get("SERVER_PORT", "8000"))
    # Only the string "true" (any casing) enables debug mode in this default
    DEBUG: bool = os.environ.get("DEBUG", "False").lower() == "true"

    # Logging
    LOG_LEVEL: str = os.environ.get("LOG_LEVEL", "INFO")

    # Version
    VERSION: str = VERSION

    @field_validator("DATABRICKS_HOST")
    @classmethod
    def validate_databricks_host(cls, v: str) -> str:
        """
        Validate Databricks host URL.

        Raises:
            ValueError: If the value does not start with http:// or https://
        """
        if not v.startswith(("https://", "http://")):
            raise ValueError("DATABRICKS_HOST must start with http:// or https://")
        return v


# Create global settings instance
settings = Settings()


def get_api_headers() -> Dict[str, str]:
    """
    Build the HTTP headers for Databricks API requests.

    Returns:
        A new dict with a Bearer ``Authorization`` header built from
        ``settings.DATABRICKS_TOKEN`` and a JSON ``Content-Type`` header
    """
    headers: Dict[str, str] = {}
    headers["Authorization"] = f"Bearer {settings.DATABRICKS_TOKEN}"
    headers["Content-Type"] = "application/json"
    return headers


def get_databricks_api_url(endpoint: str) -> str:
    """
    Join the configured Databricks host and an endpoint path.

    Args:
        endpoint: API path such as "/api/2.0/clusters/list"; a missing
            leading slash is added

    Returns:
        ``settings.DATABRICKS_HOST`` without trailing slashes, followed by
        the endpoint path
    """
    # Guarantee exactly the leading slash the caller may have left out
    path = endpoint if endpoint.startswith("/") else f"/{endpoint}"

    # Drop trailing slashes from the host so the join has no "//"
    base = settings.DATABRICKS_HOST.rstrip("/")

    return base + path
```

--------------------------------------------------------------------------------
/src/cli/commands.py:
--------------------------------------------------------------------------------

```python
"""
Command-line interface for the Databricks MCP server.

This module provides command-line functionality for interacting with the Databricks MCP server.
"""

import argparse
import asyncio
import logging
import sys
from typing import List, Optional

from src.server.databricks_mcp_server import DatabricksMCPServer, main as server_main

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)


def parse_args(args: Optional[List[str]] = None) -> argparse.Namespace:
    """
    Parse the CLI arguments into a namespace.

    Args:
        args: Argument list to parse; None lets argparse read sys.argv

    Returns:
        Namespace whose ``command`` attribute is "start", "list-tools",
        "version", or None when no sub-command was given. ``debug`` is only
        present for the "start" sub-command.
    """
    cli = argparse.ArgumentParser(description="Databricks MCP Server CLI")
    commands = cli.add_subparsers(dest="command", help="Command to run")

    # "start" is the only sub-command that takes an option
    commands.add_parser("start", help="Start the MCP server").add_argument(
        "--debug", action="store_true", help="Enable debug logging"
    )

    # Sub-commands without options, registered in their original order
    for name, summary in (
        ("list-tools", "List available tools"),
        ("version", "Show server version"),
    ):
        commands.add_parser(name, help=summary)

    return cli.parse_args(args)


async def list_tools() -> None:
    """Print the name and description of every tool the server reports."""
    registered = await DatabricksMCPServer().list_tools()

    print("\nAvailable tools:")
    for entry in registered:
        print(f"  - {entry.name}: {entry.description}")


def show_version() -> None:
    """Print the ``version`` attribute of a freshly constructed server."""
    version = DatabricksMCPServer().version
    print(f"\nDatabricks MCP Server v{version}")


def main(args: Optional[List[str]] = None) -> int:
    """
    Main entry point for the CLI.

    Args:
        args: Argument list to parse; None lets argparse read sys.argv

    Returns:
        0 after a recognised command has run, 1 when no command was given
        (the help text is printed in that case)
    """
    parsed_args = parse_args(args)

    # Only the "start" sub-command defines --debug, hence the default
    if getattr(parsed_args, "debug", False):
        logging.getLogger().setLevel(logging.DEBUG)

    # Execute the appropriate command
    if parsed_args.command == "start":
        logger.info("Starting Databricks MCP server")
        asyncio.run(server_main())
    elif parsed_args.command == "list-tools":
        asyncio.run(list_tools())
    elif parsed_args.command == "version":
        show_version()
    else:
        # No command given: show help. argparse prints the help text and then
        # raises SystemExit(0); intercept it so the failure status below is
        # actually returned instead of the process exiting with success.
        try:
            parse_args(["--help"])
        except SystemExit:
            pass
        return 1

    return 0


if __name__ == "__main__":
    sys.exit(main()) 
```

--------------------------------------------------------------------------------
/src/api/clusters.py:
--------------------------------------------------------------------------------

```python
"""
API for managing Databricks clusters.
"""

import logging
from typing import Any, Dict, List, Optional

from src.core.utils import DatabricksAPIError, make_api_request

# Configure logging
logger = logging.getLogger(__name__)


async def create_cluster(cluster_config: Dict[str, Any]) -> Dict[str, Any]:
    """
    Request creation of a Databricks cluster.

    Sends ``cluster_config`` as the body of POST /api/2.0/clusters/create.

    Args:
        cluster_config: Cluster configuration, forwarded unchanged

    Returns:
        Whatever ``make_api_request`` returns (the response containing the
        cluster ID, per the original contract)

    Raises:
        DatabricksAPIError: If the API request fails
    """
    # NOTE(review): make_api_request is called without await - confirm it is
    # a synchronous function.
    endpoint = "/api/2.0/clusters/create"
    logger.info("Creating new cluster")
    return make_api_request("POST", endpoint, data=cluster_config)


async def terminate_cluster(cluster_id: str) -> Dict[str, Any]:
    """
    Request termination of a Databricks cluster.

    Sends POST /api/2.0/clusters/delete with the cluster ID as the body.

    Args:
        cluster_id: ID of the cluster to terminate

    Returns:
        Whatever ``make_api_request`` returns (an empty response on success,
        per the original contract)

    Raises:
        DatabricksAPIError: If the API request fails
    """
    payload = {"cluster_id": cluster_id}
    logger.info("Terminating cluster: %s", cluster_id)
    return make_api_request("POST", "/api/2.0/clusters/delete", data=payload)


async def list_clusters() -> Dict[str, Any]:
    """
    Request the list of Databricks clusters.

    Sends GET /api/2.0/clusters/list with no parameters.

    Returns:
        Whatever ``make_api_request`` returns (the response containing the
        list of clusters, per the original contract)

    Raises:
        DatabricksAPIError: If the API request fails
    """
    endpoint = "/api/2.0/clusters/list"
    logger.info("Listing all clusters")
    return make_api_request("GET", endpoint)


async def get_cluster(cluster_id: str) -> Dict[str, Any]:
    """
    Request the details of one cluster.

    Sends GET /api/2.0/clusters/get with the cluster ID as a query parameter.

    Args:
        cluster_id: ID of the cluster

    Returns:
        Whatever ``make_api_request`` returns (the response containing the
        cluster information, per the original contract)

    Raises:
        DatabricksAPIError: If the API request fails
    """
    query = {"cluster_id": cluster_id}
    logger.info("Getting information for cluster: %s", cluster_id)
    return make_api_request("GET", "/api/2.0/clusters/get", params=query)


async def start_cluster(cluster_id: str) -> Dict[str, Any]:
    """
    Request the start of a terminated Databricks cluster.

    Sends POST /api/2.0/clusters/start with the cluster ID as the body.

    Args:
        cluster_id: ID of the cluster to start

    Returns:
        Whatever ``make_api_request`` returns (an empty response on success,
        per the original contract)

    Raises:
        DatabricksAPIError: If the API request fails
    """
    payload = {"cluster_id": cluster_id}
    logger.info("Starting cluster: %s", cluster_id)
    return make_api_request("POST", "/api/2.0/clusters/start", data=payload)


async def resize_cluster(cluster_id: str, num_workers: int) -> Dict[str, Any]:
    """
    Request a change to a cluster's number of workers.

    Sends POST /api/2.0/clusters/resize with the cluster ID and the new
    worker count as the body.

    Args:
        cluster_id: ID of the cluster to resize
        num_workers: New number of workers

    Returns:
        Whatever ``make_api_request`` returns (an empty response on success,
        per the original contract)

    Raises:
        DatabricksAPIError: If the API request fails
    """
    payload = {"cluster_id": cluster_id, "num_workers": num_workers}
    logger.info("Resizing cluster %s to %s workers", cluster_id, num_workers)
    return make_api_request("POST", "/api/2.0/clusters/resize", data=payload)


async def restart_cluster(cluster_id: str) -> Dict[str, Any]:
    """
    Request a restart of a Databricks cluster.

    Args:
        cluster_id: ID of the cluster to restart; sent in the request body

    Returns:
        The response dictionary produced by make_api_request
        (expected to be empty on success)

    Raises:
        DatabricksAPIError: If the API request fails
    """
    logger.info(f"Restarting cluster: {cluster_id}")
    body = {"cluster_id": cluster_id}
    outcome = make_api_request("POST", "/api/2.0/clusters/restart", data=body)
    return outcome
```

--------------------------------------------------------------------------------
/docs/phase1.md:
--------------------------------------------------------------------------------

```markdown
Develop a Model Context Protocol (MCP) server for Azure Databricks, utilising the following REST API endpoints:

**1. Cluster Management:**

- **Create Cluster:** `POST /api/2.0/clusters/create`
- **Terminate Cluster:** `POST /api/2.0/clusters/delete`
- **List Clusters:** `GET /api/2.0/clusters/list`

**2. Job Management:**

- **Create Job:** `POST /api/2.0/jobs/create`
- **Run Job:** `POST /api/2.0/jobs/run-now`
- **List Jobs:** `GET /api/2.0/jobs/list`

**3. Notebook Operations:**

- **Import Notebook:** `POST /api/2.0/workspace/import`
- **Export Notebook:** `GET /api/2.0/workspace/export`
- **List Notebooks:** `GET /api/2.0/workspace/list`

**4. Databricks File System (DBFS):**

- **Upload File:** `POST /api/2.0/dbfs/put`
- **List Files:** `GET /api/2.0/dbfs/list`
- **Delete File:** `POST /api/2.0/dbfs/delete`

**5. SQL Statement Execution:**

- **Execute SQL Statement:** `POST /api/2.0/sql/statements/execute`

**6. Unity Catalog Management:**

- **Catalog Operations:**
  - **Create Catalog:** `POST /api/2.0/unity-catalog/catalogs`
  - **List Catalogs:** `GET /api/2.0/unity-catalog/catalogs`
  - **Delete Catalog:** `DELETE /api/2.0/unity-catalog/catalogs/{name}`

- **Schema Operations:**
  - **Create Schema:** `POST /api/2.0/unity-catalog/schemas`
  - **List Schemas:** `GET /api/2.0/unity-catalog/schemas`
  - **Delete Schema:** `DELETE /api/2.0/unity-catalog/schemas/{full_name}`

- **Table Operations:**
  - **Create Table:** `POST /api/2.0/unity-catalog/tables`
  - **List Tables:** `GET /api/2.0/unity-catalog/tables`
  - **Delete Table:** `DELETE /api/2.0/unity-catalog/tables/{full_name}`

- **Data Lineage:**
  - **Get Table Lineage:** `GET /api/2.0/unity-catalog/lineage-tracking/table-lineage/{table_name}`
  - **Get Column Lineage:** `GET /api/2.0/unity-catalog/lineage-tracking/column-lineage/{column_name}`

**7. Delta Live Tables Pipelines:**

- **Pipeline Management:**
  - **Create Pipeline:** `POST /api/2.0/pipelines`
  - **List Pipelines:** `GET /api/2.0/pipelines`
  - **Get Pipeline:** `GET /api/2.0/pipelines/{pipeline_id}`
  - **Update Pipeline:** `PUT /api/2.0/pipelines/{pipeline_id}`
  - **Delete Pipeline:** `DELETE /api/2.0/pipelines/{pipeline_id}`

- **Pipeline Execution:**
  - **Start Update:** `POST /api/2.0/pipelines/{pipeline_id}/updates`
  - **List Updates:** `GET /api/2.0/pipelines/{pipeline_id}/updates`
  - **Get Update:** `GET /api/2.0/pipelines/{pipeline_id}/updates/{update_id}`

**8. Databricks SQL Queries:**

- **Query Management:**
  - **Create Query:** `POST /api/2.0/preview/sql/queries`
  - **List Queries:** `GET /api/2.0/preview/sql/queries`
  - **Get Query:** `GET /api/2.0/preview/sql/queries/{query_id}`
  - **Update Query:** `POST /api/2.0/preview/sql/queries/{query_id}`
  - **Delete Query:** `DELETE /api/2.0/preview/sql/queries/{query_id}`

**9. Model Serving Endpoints:**

- **Serving Endpoint Management:**
  - **Create Serving Endpoint:** `POST /api/2.0/serving-endpoints`
  - **Get Serving Endpoint:** `GET /api/2.0/serving-endpoints/{name}`
  - **Update Serving Endpoint Config:** `PUT /api/2.0/serving-endpoints/{name}/config`
  - **Delete Serving Endpoint:** `DELETE /api/2.0/serving-endpoints/{name}`

- **Querying Serving Endpoints:**
  - **Query Serving Endpoint:** `POST /serving-endpoints/{name}/invocations`

Integrating these API endpoints into our MCP server will enable comprehensive management of our Azure Databricks environment, covering clusters, jobs, notebooks, file systems, SQL execution, Unity Catalog, Delta Live Tables, SQL queries, and model serving. It will also give us a platform to which we can add new features when needed.
```

--------------------------------------------------------------------------------
/tests/test_direct.py:
--------------------------------------------------------------------------------

```python
"""
Direct tests for the Databricks MCP server.

This module contains tests that directly instantiate and test the server without using MCP protocol.
"""

import asyncio
import json
import logging
import sys
from typing import Dict, Any, List

from src.server.databricks_mcp_server import DatabricksMCPServer

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

async def test_list_clusters():
    """
    Exercise the list_clusters tool on a directly constructed server.

    The tool result is expected to be a non-empty list whose first item has a
    ``text`` attribute holding JSON. That JSON is expected to carry its own
    ``text`` field containing a second JSON document with a ``clusters`` list.

    Returns:
        True when the cluster list was found and logged, False in every other
        case (unexpected result shape, JSON errors, or any raised exception).
    """
    try:
        logger.info("Creating Databricks MCP server instance")
        mcp_server = DatabricksMCPServer()

        tool_name = "list_clusters"
        logger.info(f"Testing tool: {tool_name}")

        # The tool expects its arguments wrapped in a "params" key.
        call_args: Dict[str, Any] = {"params": {}}
        result = await mcp_server.call_tool(tool_name, call_args)

        has_text_item = (
            isinstance(result, List)
            and len(result) > 0
            and hasattr(result[0], 'text')
        )

        if has_text_item:
            text = result[0].text
            logger.info(f"Text content: {text[:100]}...")  # Show first 100 chars

            try:
                # Outer layer: the text itself is a JSON document.
                outer_doc = json.loads(text)

                if 'text' not in outer_doc:
                    logger.info(f"Parsed JSON: {json.dumps(outer_doc, indent=2)}")
                else:
                    # Inner layer: the 'text' field is JSON-encoded a second time.
                    inner_doc = json.loads(outer_doc['text'])
                    logger.info(f"Parsed clusters data: {json.dumps(inner_doc, indent=2)}")

                    if 'clusters' in inner_doc:
                        clusters = inner_doc['clusters']
                        logger.info(f"Found {len(clusters)} clusters")

                        for position, cluster in enumerate(clusters, start=1):
                            logger.info(f"Cluster {position}:")
                            logger.info(f"  ID: {cluster.get('cluster_id')}")
                            logger.info(f"  Name: {cluster.get('cluster_name')}")
                            logger.info(f"  State: {cluster.get('state')}")

                        return True

            except json.JSONDecodeError as e:
                logger.error(f"Error parsing JSON: {e}")

        # Every path that did not return True above counts as a failure.
        logger.error("Test failed: Could not parse cluster data")
        return False

    except Exception as e:
        logger.error(f"Error: {e}", exc_info=True)
        return False


async def main():
    """
    Run the direct tests and translate the outcome into an exit code.

    Returns:
        0 when test_list_clusters reports success, 1 otherwise.
    """
    logger.info("Running direct tests for Databricks MCP server")

    passed = await test_list_clusters()

    if not passed:
        logger.error("Tests failed")
        return 1

    logger.info("All tests passed!")
    return 0


if __name__ == "__main__":
    sys.exit(asyncio.run(main())) 
```

--------------------------------------------------------------------------------
/src/core/utils.py:
--------------------------------------------------------------------------------

```python
"""
Utility functions for the Databricks MCP server.
"""

import json
import logging
from typing import Any, Dict, List, Optional, Union

import requests
from requests.exceptions import RequestException

from src.core.config import get_api_headers, get_databricks_api_url

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)


class DatabricksAPIError(Exception):
    """Error raised when a call to the Databricks API fails."""

    def __init__(self, message: str, status_code: Optional[int] = None, response: Optional[Any] = None):
        # The message doubles as the exception's sole positional argument,
        # so str(error) yields it directly.
        super().__init__(message)
        # Human-readable description of the failure.
        self.message = message
        # HTTP status code, when one is known.
        self.status_code = status_code
        # Whatever response payload the caller attached (may be None).
        self.response = response


def make_api_request(
    method: str,
    endpoint: str,
    data: Optional[Dict[str, Any]] = None,
    params: Optional[Dict[str, Any]] = None,
    files: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """
    Make a request to the Databricks API.

    The URL and headers are obtained from get_databricks_api_url() and
    get_api_headers(). When no ``files`` are given, a non-empty ``data`` dict
    is serialized to a JSON string; otherwise ``data`` is passed to requests
    unchanged.

    Args:
        method: HTTP method ("GET", "POST", "PUT", "DELETE")
        endpoint: API endpoint path
        data: Request body data
        params: Query parameters
        files: Files to upload

    Returns:
        Parsed JSON response, or an empty dict when the response has no body

    Raises:
        DatabricksAPIError: If the request fails or returns an HTTP error
            status. The exception carries the status code (when available)
            and the parsed JSON error body, or the raw text when the body is
            not JSON.
    """
    url = get_databricks_api_url(endpoint)
    headers = get_api_headers()

    try:
        # Never log the request body: it may contain sensitive information.
        safe_data = "**REDACTED**" if data else None
        logger.debug(f"API Request: {method} {url} Params: {params} Data: {safe_data}")

        # JSON-encode only non-empty bodies of non-multipart requests.
        body = json.dumps(data) if data and not files else data

        # NOTE(review): no timeout is passed, so a stalled connection blocks
        # the caller indefinitely. Consider adding one.
        response = requests.request(
            method=method,
            url=url,
            headers=headers,
            params=params,
            data=body,
            files=files,
        )

        # Turn 4xx/5xx statuses into exceptions.
        response.raise_for_status()

        if response.content:
            return response.json()
        return {}

    except RequestException as e:
        failed_response = getattr(e, "response", None)
        status_code = getattr(failed_response, "status_code", None)
        error_msg = f"API request failed: {str(e)}"

        # Try to extract error details from the response body.
        error_response = None
        if failed_response is not None:
            try:
                error_response = failed_response.json()
            except ValueError:
                error_response = failed_response.text
            else:
                # The body may be valid JSON without being an object, so only
                # look up keys on dicts. Databricks error bodies usually carry
                # "message"/"error_code"; "error" is kept as the first choice
                # for backward compatibility.
                if isinstance(error_response, dict):
                    detail = (
                        error_response.get("error")
                        or error_response.get("message")
                        or error_response.get("error_code")
                    )
                    if detail:
                        error_msg = f"{error_msg} - {detail}"

        logger.error(f"API Error: {error_msg}", exc_info=True)

        raise DatabricksAPIError(error_msg, status_code, error_response) from e


def format_response(
    success: bool, 
    data: Optional[Union[Dict[str, Any], List[Any]]] = None, 
    error: Optional[str] = None,
    status_code: int = 200
) -> Dict[str, Any]:
    """
    Build the standard response envelope.

    Args:
        success: Whether the operation was successful
        data: Response data; included whenever it is not None
        error: Error message; included only when it is truthy
        status_code: HTTP status code

    Returns:
        Dictionary with "success" and "status_code", plus "data" and "error"
        when the rules above apply
    """
    envelope: Dict[str, Any] = {"success": success, "status_code": status_code}

    # (key, value, include?) in the order the keys must appear.
    optional_entries = (
        ("data", data, data is not None),
        ("error", error, bool(error)),
    )
    envelope.update(
        {key: value for key, value, include in optional_entries if include}
    )

    return envelope
```

--------------------------------------------------------------------------------
/examples/mcp_client_usage.py:
--------------------------------------------------------------------------------

```python
"""
Example of using the MCP client with the Databricks MCP server.

This example shows how to use the MCP client to connect to the Databricks MCP server
and call its tools through the MCP protocol.
"""

import asyncio
import json
import logging
import os
import sys
from typing import Any, Dict, List, Optional

from mcp.client.stdio import StdioServerParameters, stdio_client
from mcp.client.session import ClientSession

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)


async def connect_and_list_tools():
    """
    Connect to the Databricks MCP server, list its tools and run them interactively.

    The server is launched as a PowerShell subprocess over stdio. After the
    session is initialized the available tools are printed, and the user can
    repeatedly pick a tool (by number or name), supply its arguments as JSON
    and see the result. Entering 'quit' ends the loop.

    Errors raised while calling a tool, including invalid JSON arguments,
    are printed and the loop continues. Errors while launching or
    initializing the server propagate to the caller.
    """
    logger.info("Connecting to Databricks MCP server...")

    # The server reads its Databricks credentials from the environment.
    env = os.environ.copy()

    server_params = StdioServerParameters(
        command="pwsh",  # Use PowerShell
        # The startup script shipped in scripts/ is start_mcp_server.ps1.
        args=["-File", "./scripts/start_mcp_server.ps1"],
        env=env,
    )

    logger.info("Launching server process...")

    async with stdio_client(server_params) as (recv, send):
        logger.info("Server launched, creating session...")

        # The session must be entered as a context manager: that is what
        # starts its message loop, so initialize() can receive a reply.
        async with ClientSession(recv, send) as session:
            logger.info("Initializing session...")
            await session.initialize()

            tools_response = await session.list_tools()
            tools = tools_response.tools

            print("\nAvailable Tools:")
            print("================")
            for tool in tools:
                print(f"- {tool.name}: {tool.description}")

            if not tools:
                return

            while True:
                print("\nSelect a tool to run (or 'quit' to exit):")
                for i, tool in enumerate(tools):
                    print(f"{i+1}. {tool.name}")

                choice = input("Enter choice (number or name): ").strip()

                if choice.lower() == 'quit':
                    break

                # Resolve the choice either as a 1-based index or as a tool name.
                selected_tool = None
                if choice.isdigit():
                    idx = int(choice) - 1
                    if 0 <= idx < len(tools):
                        selected_tool = tools[idx]
                else:
                    selected_tool = next(
                        (tool for tool in tools if tool.name == choice), None
                    )

                if not selected_tool:
                    print("Invalid choice. Please try again.")
                    continue

                print(f"\nRunning tool: {selected_tool.name}")
                print("Enter parameters as JSON (empty for no parameters):")
                args_text = input("> ").strip()

                try:
                    tool_args = json.loads(args_text) if args_text else {}
                    result = await session.call_tool(selected_tool.name, tool_args)
                    print("\nResult:")
                    # The result object is not JSON-serializable itself, so
                    # print each content item, pretty-printing JSON text.
                    for item in result.content:
                        text = getattr(item, "text", None)
                        if text is None:
                            print(item)
                            continue
                        try:
                            print(json.dumps(json.loads(text), indent=2))
                        except ValueError:
                            print(text)
                except Exception as e:
                    print(f"Error calling tool: {e}")


async def main():
    """
    Print the example banner and run the interactive client.

    Returns:
        0 when connect_and_list_tools finishes normally, 1 when it raises.
    """
    print("Databricks MCP Server - MCP Client Usage Example")
    print("=============================================")

    exit_code = 0
    try:
        await connect_and_list_tools()
    except Exception as e:
        logger.error(f"Error: {e}", exc_info=True)
        exit_code = 1
    return exit_code


if __name__ == "__main__":
    sys.exit(asyncio.run(main())) 
```

--------------------------------------------------------------------------------
/src/api/jobs.py:
--------------------------------------------------------------------------------

```python
"""
API for managing Databricks jobs.
"""

import logging
from typing import Any, Dict, List, Optional

from src.core.utils import DatabricksAPIError, make_api_request

# Configure logging
logger = logging.getLogger(__name__)


async def create_job(job_config: Dict[str, Any]) -> Dict[str, Any]:
    """
    Register a new Databricks job.

    Args:
        job_config: Job configuration, sent as the request body as-is

    Returns:
        The response dictionary produced by make_api_request
        (expected to contain the job ID)

    Raises:
        DatabricksAPIError: If the API request fails
    """
    endpoint = "/api/2.0/jobs/create"
    logger.info("Creating new job")
    created = make_api_request("POST", endpoint, data=job_config)
    return created


async def run_job(job_id: int, notebook_params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """
    Trigger an immediate run of a job.

    Args:
        job_id: ID of the job to run
        notebook_params: Optional notebook parameters; added to the request
            only when truthy

    Returns:
        The response dictionary produced by make_api_request
        (expected to contain the run ID)

    Raises:
        DatabricksAPIError: If the API request fails
    """
    logger.info(f"Running job: {job_id}")

    # Empty or missing notebook parameters are left out of the body entirely.
    extras = {"notebook_params": notebook_params} if notebook_params else {}
    body = {"job_id": job_id, **extras}

    return make_api_request("POST", "/api/2.0/jobs/run-now", data=body)


async def list_jobs() -> Dict[str, Any]:
    """
    Fetch the job listing from the Databricks workspace.

    Returns:
        The response dictionary produced by make_api_request

    Raises:
        DatabricksAPIError: If the API request fails
    """
    endpoint = "/api/2.0/jobs/list"
    logger.info("Listing all jobs")
    listing = make_api_request("GET", endpoint)
    return listing


async def get_job(job_id: int) -> Dict[str, Any]:
    """
    Look up a single job by its ID.

    Args:
        job_id: ID of the job; sent as a query parameter

    Returns:
        The response dictionary produced by make_api_request

    Raises:
        DatabricksAPIError: If the API request fails
    """
    logger.info(f"Getting information for job: {job_id}")
    query = {"job_id": job_id}
    details = make_api_request("GET", "/api/2.0/jobs/get", params=query)
    return details


async def update_job(job_id: int, new_settings: Dict[str, Any]) -> Dict[str, Any]:
    """
    Apply new settings to an existing job.

    Args:
        job_id: ID of the job to update
        new_settings: Settings to send under the "new_settings" key

    Returns:
        The response dictionary produced by make_api_request
        (expected to be empty on success)

    Raises:
        DatabricksAPIError: If the API request fails
    """
    logger.info(f"Updating job: {job_id}")
    body = dict(job_id=job_id, new_settings=new_settings)
    outcome = make_api_request("POST", "/api/2.0/jobs/update", data=body)
    return outcome


async def delete_job(job_id: int) -> Dict[str, Any]:
    """
    Remove a job from the workspace.

    Args:
        job_id: ID of the job to delete; sent in the request body

    Returns:
        The response dictionary produced by make_api_request
        (expected to be empty on success)

    Raises:
        DatabricksAPIError: If the API request fails
    """
    logger.info(f"Deleting job: {job_id}")
    body = {"job_id": job_id}
    outcome = make_api_request("POST", "/api/2.0/jobs/delete", data=body)
    return outcome


async def get_run(run_id: int) -> Dict[str, Any]:
    """
    Look up a single job run by its ID.

    Args:
        run_id: ID of the run; sent as a query parameter

    Returns:
        The response dictionary produced by make_api_request

    Raises:
        DatabricksAPIError: If the API request fails
    """
    logger.info(f"Getting information for run: {run_id}")
    query = {"run_id": run_id}
    details = make_api_request("GET", "/api/2.0/jobs/runs/get", params=query)
    return details


async def cancel_run(run_id: int) -> Dict[str, Any]:
    """
    Request cancellation of a job run.

    Args:
        run_id: ID of the run to cancel; sent in the request body

    Returns:
        The response dictionary produced by make_api_request
        (expected to be empty on success)

    Raises:
        DatabricksAPIError: If the API request fails
    """
    logger.info(f"Cancelling run: {run_id}")
    body = {"run_id": run_id}
    outcome = make_api_request("POST", "/api/2.0/jobs/runs/cancel", data=body)
    return outcome
```

--------------------------------------------------------------------------------
/examples/direct_usage.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python
"""
Databricks MCP Server - Direct Usage Example

This example demonstrates how to directly use the Databricks MCP server
without going through the MCP protocol. It shows how to instantiate the
server class and call its methods directly.
"""

import json
import logging
import os
import sys
from typing import Any, Dict, List, Optional

# Add the parent directory to the Python path
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))

from src.server.databricks_mcp_server import DatabricksMCPServer

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)

def print_section_header(title: str) -> None:
    """Print a blank line, the title, and an '=' underline of the same length."""
    underline = "=" * len(title)
    print(f"\n{title}")
    print(underline)

def print_clusters(clusters: List[Dict[str, Any]]) -> None:
    """Print a numbered summary of each cluster under a section header.

    Missing fields are printed as None because every value is read with
    dict.get().
    """
    print_section_header("Databricks Clusters")

    number = 0
    for entry in clusters:
        number += 1
        print(f"\nCluster {number}:")
        print(f"  ID: {entry.get('cluster_id')}")
        print(f"  Name: {entry.get('cluster_name')}")
        print(f"  State: {entry.get('state')}")
        print(f"  Spark Version: {entry.get('spark_version')}")
        print(f"  Node Type: {entry.get('node_type_id')}")

def print_notebooks(notebooks: List[Dict[str, Any]], path: str) -> None:
    """Print the notebooks and directories found in a workspace listing.

    Entries whose 'object_type' is neither 'NOTEBOOK' nor 'DIRECTORY' are
    skipped. Notebook lines are preceded by a blank line; directory lines
    are not.
    """
    print_section_header(f"Databricks Notebooks in {path}")

    for entry in notebooks:
        kind = entry.get('object_type')
        if kind == 'NOTEBOOK':
            print(f"\nNotebook: {entry.get('path')}")
        elif kind == 'DIRECTORY':
            print(f"Directory: {entry.get('path')}")

def print_jobs(jobs: List[Dict[str, Any]]) -> None:
    """Print a numbered summary of each job under a section header.

    The job name is read from the nested 'settings' dict; a job without
    settings prints None for its name.
    """
    print_section_header("Databricks Jobs")

    number = 0
    for entry in jobs:
        number += 1
        settings = entry.get('settings', {})
        print(f"\nJob {number}:")
        print(f"  ID: {entry.get('job_id')}")
        print(f"  Name: {settings.get('name')}")
        print(f"  Created: {entry.get('created_time')}")

def main() -> None:
    """
    Run the direct usage example: list clusters, notebooks in "/" and jobs.

    Exits with status 1 when the Databricks credentials are missing from the
    environment or when any step raises an exception.
    """
    print("\nDatabricks MCP Server - Direct Usage Example")
    print("===========================================")

    # Both credentials are required before anything else is attempted.
    credentials_present = os.environ.get("DATABRICKS_HOST") and os.environ.get("DATABRICKS_TOKEN")
    if not credentials_present:
        logger.error("Please set DATABRICKS_HOST and DATABRICKS_TOKEN environment variables")
        sys.exit(1)

    server = DatabricksMCPServer()

    try:
        # NOTE(review): assumes the server exposes synchronous list_* methods
        # that return JSON strings. Confirm against DatabricksMCPServer.
        # Each row: (label used in log messages, fetch callable,
        #            key holding the items in the decoded result, printer).
        listings = (
            (
                "clusters",
                lambda: server.list_clusters(),
                "clusters",
                print_clusters,
            ),
            (
                "notebooks",
                lambda: server.list_notebooks({"path": "/"}),
                "objects",
                lambda entries: print_notebooks(entries, "/"),
            ),
            (
                "jobs",
                lambda: server.list_jobs(),
                "jobs",
                print_jobs,
            ),
        )

        for label, fetch, items_key, show in listings:
            logger.info(f"Listing Databricks {label}...")
            payload = json.loads(fetch())
            if 'error' in payload:
                logger.error(f"Error listing {label}: {payload['error']}")
            else:
                show(payload.get(items_key, []))

    except Exception as e:
        logger.error(f"An error occurred: {str(e)}")
        sys.exit(1)

if __name__ == "__main__":
    main() 
```

--------------------------------------------------------------------------------
/src/api/notebooks.py:
--------------------------------------------------------------------------------

```python
"""
API for managing Databricks notebooks.
"""

import base64
import logging
from typing import Any, Dict, List, Optional

from src.core.utils import DatabricksAPIError, make_api_request

# Configure logging
logger = logging.getLogger(__name__)


async def import_notebook(
    path: str,
    content: str,
    format: str = "SOURCE",
    language: Optional[str] = None,
    overwrite: bool = False,
) -> Dict[str, Any]:
    """
    Upload a notebook into the workspace.

    Content that is_base64() does not recognise as base64 is UTF-8 encoded
    and base64 encoded before sending; content it does recognise is sent
    unchanged.

    Args:
        path: The path where the notebook should be stored
        content: The notebook content, either raw text or base64 encoded
        format: The format of the notebook (SOURCE, HTML, JUPYTER, DBC)
        language: The language of the notebook (SCALA, PYTHON, SQL, R);
            sent only when truthy
        overwrite: Whether to overwrite an existing notebook

    Returns:
        The response dictionary produced by make_api_request
        (expected to be empty on success)

    Raises:
        DatabricksAPIError: If the API request fails
    """
    logger.info(f"Importing notebook to path: {path}")

    # NOTE(review): raw text that happens to be valid base64 is sent as-is.
    if is_base64(content):
        encoded_content = content
    else:
        encoded_content = base64.b64encode(content.encode("utf-8")).decode("utf-8")

    body = {
        "path": path,
        "format": format,
        "content": encoded_content,
        "overwrite": overwrite,
    }
    if language:
        body["language"] = language

    return make_api_request("POST", "/api/2.0/workspace/import", data=body)


async def export_notebook(
    path: str,
    format: str = "SOURCE",
) -> Dict[str, Any]:
    """
    Download a notebook from the workspace.

    For the SOURCE and JUPYTER formats, a "decoded_content" entry holding the
    base64-decoded UTF-8 text is added to the response when decoding works.
    A decoding failure is logged as a warning and otherwise ignored.

    Args:
        path: The path of the notebook to export
        format: The format to export (SOURCE, HTML, JUPYTER, DBC)

    Returns:
        The response dictionary produced by make_api_request, possibly
        extended with "decoded_content"

    Raises:
        DatabricksAPIError: If the API request fails
    """
    logger.info(f"Exporting notebook from path: {path}")

    query = {"path": path, "format": format}
    response = make_api_request("GET", "/api/2.0/workspace/export", params=query)

    # Only text-based formats with actual content are decoded.
    if "content" not in response or format not in ["SOURCE", "JUPYTER"]:
        return response

    try:
        raw_bytes = base64.b64decode(response["content"])
        response["decoded_content"] = raw_bytes.decode("utf-8")
    except Exception as e:
        logger.warning(f"Failed to decode notebook content: {str(e)}")

    return response


async def list_notebooks(path: str) -> Dict[str, Any]:
    """
    Fetch the listing of a workspace directory.

    Args:
        path: The workspace path to list; sent as a query parameter

    Returns:
        The response dictionary produced by make_api_request

    Raises:
        DatabricksAPIError: If the API request fails
    """
    logger.info(f"Listing notebooks in path: {path}")
    query = {"path": path}
    listing = make_api_request("GET", "/api/2.0/workspace/list", params=query)
    return listing


async def delete_notebook(path: str, recursive: bool = False) -> Dict[str, Any]:
    """
    Remove a notebook or directory from the workspace.

    Args:
        path: The workspace path to delete
        recursive: Whether to recursively delete directories

    Returns:
        The response dictionary produced by make_api_request
        (expected to be empty on success)

    Raises:
        DatabricksAPIError: If the API request fails
    """
    logger.info(f"Deleting path: {path}")
    body = {"path": path, "recursive": recursive}
    outcome = make_api_request("POST", "/api/2.0/workspace/delete", data=body)
    return outcome


async def create_directory(path: str) -> Dict[str, Any]:
    """
    Make a directory in the workspace.

    Args:
        path: The workspace path to create; sent in the request body

    Returns:
        The response dictionary produced by make_api_request
        (expected to be empty on success)

    Raises:
        DatabricksAPIError: If the API request fails
    """
    logger.info(f"Creating directory: {path}")
    body = {"path": path}
    outcome = make_api_request("POST", "/api/2.0/workspace/mkdirs", data=body)
    return outcome


def is_base64(content: str) -> bool:
    """
    Report whether a string round-trips through base64 unchanged.

    The string is decoded and re-encoded; it counts as base64 only when the
    result equals the UTF-8 bytes of the input. Any exception along the way
    (bad padding, non-ASCII text, wrong input type) yields False.

    Args:
        content: The string to check

    Returns:
        True if the string is base64 encoded, False otherwise
    """
    try:
        raw_bytes = base64.b64decode(content)
        round_tripped = base64.b64encode(raw_bytes)
        return round_tripped == content.encode('utf-8')
    except Exception:
        return False
```

--------------------------------------------------------------------------------
/src/api/sql.py:
--------------------------------------------------------------------------------

```python
"""
API for executing SQL statements on Databricks.
"""

import logging
from typing import Any, Dict, List, Optional

from src.core.utils import DatabricksAPIError, make_api_request

# Configure logging
logger = logging.getLogger(__name__)


async def execute_statement(
    statement: str,
    warehouse_id: str,
    catalog: Optional[str] = None,
    schema: Optional[str] = None,
    parameters: Optional[Dict[str, Any]] = None,
    row_limit: int = 10000,
    byte_limit: int = 100000000,  # 100MB
) -> Dict[str, Any]:
    """
    Submit a SQL statement to a SQL warehouse.

    Args:
        statement: The SQL statement to execute
        warehouse_id: ID of the SQL warehouse to use
        catalog: Optional catalog to use; sent only when truthy
        schema: Optional schema to use; sent only when truthy
        parameters: Optional statement parameters; sent only when truthy
        row_limit: Maximum number of rows to return
        byte_limit: Maximum number of bytes to return

    Returns:
        The response dictionary produced by make_api_request

    Raises:
        DatabricksAPIError: If the API request fails
    """
    logger.info(f"Executing SQL statement: {statement[:100]}...")

    # NOTE(review): "0s" is understood to request asynchronous execution
    # (the call returns without waiting for the result), not an indefinite
    # wait. Confirm against the Statement Execution API documentation.
    request_data = {
        "statement": statement,
        "warehouse_id": warehouse_id,
        "wait_timeout": "0s",
        "row_limit": row_limit,
        "byte_limit": byte_limit,
    }

    # Optional fields are added in a fixed order and only when truthy.
    optional_fields = (
        ("catalog", catalog),
        ("schema", schema),
        ("parameters", parameters),
    )
    for field_name, field_value in optional_fields:
        if field_value:
            request_data[field_name] = field_value

    return make_api_request("POST", "/api/2.0/sql/statements/execute", data=request_data)


async def execute_and_wait(
    statement: str,
    warehouse_id: str,
    catalog: Optional[str] = None,
    schema: Optional[str] = None,
    parameters: Optional[Dict[str, Any]] = None,
    timeout_seconds: int = 300,  # 5 minutes
    poll_interval_seconds: int = 1,
) -> Dict[str, Any]:
    """
    Execute a SQL statement and wait for completion.

    The statement is submitted with execute_statement() and its status is
    polled with get_statement_status() for as long as it is PENDING or
    RUNNING. A FAILED, CANCELED or CLOSED state raises, whether it is
    reported by the initial submission or by a later poll.

    Args:
        statement: The SQL statement to execute
        warehouse_id: ID of the SQL warehouse to use
        catalog: Optional catalog to use
        schema: Optional schema to use
        parameters: Optional statement parameters
        timeout_seconds: Maximum time to wait for completion
        poll_interval_seconds: How often to poll for status

    Returns:
        The most recent statement response once it is no longer PENDING or
        RUNNING and has not failed

    Raises:
        DatabricksAPIError: If the API request fails or the statement ends
            in the FAILED, CANCELED or CLOSED state
        TimeoutError: If query execution times out
        ValueError: If the submission response carries no statement_id
    """
    import asyncio
    import time

    logger.info(f"Executing SQL statement with waiting: {statement[:100]}...")

    # Start execution
    response = await execute_statement(
        statement=statement,
        warehouse_id=warehouse_id,
        catalog=catalog,
        schema=schema,
        parameters=parameters,
    )

    statement_id = response.get("statement_id")
    if not statement_id:
        raise ValueError("No statement_id returned from execution")

    # A monotonic clock keeps the timeout immune to wall-clock adjustments.
    deadline = time.monotonic() + timeout_seconds
    latest = response
    state = latest.get("status", {}).get("state", "")

    while state in ("PENDING", "RUNNING"):
        if time.monotonic() > deadline:
            raise TimeoutError(f"Query execution timed out after {timeout_seconds} seconds")

        # Wait before polling again
        await asyncio.sleep(poll_interval_seconds)

        latest = await get_statement_status(statement_id)
        state = latest.get("status", {}).get("state", "")

    # Checked after the loop so that a statement which has already failed
    # when it is submitted is reported just like one that fails while polling.
    if state in ("FAILED", "CANCELED", "CLOSED"):
        error_message = latest.get("status", {}).get("error", {}).get("message", "Unknown error")
        raise DatabricksAPIError(f"Query execution failed: {error_message}", response=latest)

    return latest


async def get_statement_status(statement_id: str) -> Dict[str, Any]:
    """
    Fetch the current state of a previously submitted SQL statement.
    
    Args:
        statement_id: ID of the statement to look up
        
    Returns:
        Whatever the statements endpoint returns for that ID
        
    Raises:
        DatabricksAPIError: If the API request fails
    """
    logger.info(f"Getting status of SQL statement: {statement_id}")
    endpoint = f"/api/2.0/sql/statements/{statement_id}"
    status_payload = make_api_request("GET", endpoint, params={})
    return status_payload


async def cancel_statement(statement_id: str) -> Dict[str, Any]:
    """
    Ask Databricks to cancel a SQL statement.
    
    Args:
        statement_id: ID of the statement to cancel
        
    Returns:
        Whatever the cancel endpoint returns (empty response on success)
        
    Raises:
        DatabricksAPIError: If the API request fails
    """
    logger.info(f"Cancelling SQL statement: {statement_id}")
    endpoint = f"/api/2.0/sql/statements/{statement_id}/cancel"
    cancel_payload = make_api_request("POST", endpoint, data={})
    return cancel_payload
```

--------------------------------------------------------------------------------
/tests/test_tools.py:
--------------------------------------------------------------------------------

```python
"""
Tests for individual tools in the Databricks MCP server.

This module contains tests for each individual tool in the Databricks MCP server.
"""

import asyncio
import json
import logging
import sys
from typing import Dict, Any, List

from src.server.databricks_mcp_server import DatabricksMCPServer

# Configure logging: INFO-level records formatted as
# "timestamp - logger name - level - message", plus a module-level logger
# that every test in this file writes to.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


async def test_list_clusters():
    """Call the ``list_clusters`` tool and validate the shape of its reply.

    The reply must be a non-empty list whose first item exposes ``text``. That
    text is JSON carrying a ``text`` field, which is itself JSON that must
    contain a ``clusters`` entry. Returns True once every check has passed.
    """
    logger.info("Testing list_clusters tool")
    mcp_server = DatabricksMCPServer()
    reply = await mcp_server.call_tool("list_clusters", {"params": {}})

    # Validate the outer envelope before looking inside it
    assert isinstance(reply, list), "Result should be a List"
    assert len(reply) > 0, "Result should not be empty"
    first_item = reply[0]
    assert hasattr(first_item, 'text'), "Result item should have 'text' attribute"

    # Two JSON layers: the item text, then the document stored under 'text'
    envelope = json.loads(first_item.text)
    assert 'text' in envelope, "Result should contain 'text' field"
    payload = json.loads(envelope['text'])

    assert 'clusters' in payload, "Result should contain 'clusters' field"
    cluster_count = len(payload['clusters'])
    logger.info(f"Found {cluster_count} clusters")

    return True


async def test_list_notebooks():
    """Call the ``list_notebooks`` tool on ``/`` and validate its reply.

    The reply must be a non-empty list whose first item exposes ``text``. That
    text is JSON carrying a ``text`` field, which is itself JSON that must
    contain an ``objects`` entry. Returns True once every check has passed.
    """
    logger.info("Testing list_notebooks tool")
    mcp_server = DatabricksMCPServer()
    reply = await mcp_server.call_tool("list_notebooks", {"params": {"path": "/"}})

    # Validate the outer envelope before looking inside it
    assert isinstance(reply, list), "Result should be a List"
    assert len(reply) > 0, "Result should not be empty"
    first_item = reply[0]
    assert hasattr(first_item, 'text'), "Result item should have 'text' attribute"

    # Two JSON layers: the item text, then the document stored under 'text'
    envelope = json.loads(first_item.text)
    assert 'text' in envelope, "Result should contain 'text' field"
    payload = json.loads(envelope['text'])

    assert 'objects' in payload, "Result should contain 'objects' field"
    object_count = len(payload['objects'])
    logger.info(f"Found {object_count} objects")

    return True


async def test_list_jobs():
    """Call the ``list_jobs`` tool and validate the shape of its reply.

    The reply must be a non-empty list whose first item exposes ``text``. That
    text is JSON carrying a ``text`` field, which is itself JSON that must
    contain a ``jobs`` entry. Returns True once every check has passed.
    """
    logger.info("Testing list_jobs tool")
    mcp_server = DatabricksMCPServer()
    reply = await mcp_server.call_tool("list_jobs", {"params": {}})

    # Validate the outer envelope before looking inside it
    assert isinstance(reply, list), "Result should be a List"
    assert len(reply) > 0, "Result should not be empty"
    first_item = reply[0]
    assert hasattr(first_item, 'text'), "Result item should have 'text' attribute"

    # Two JSON layers: the item text, then the document stored under 'text'
    envelope = json.loads(first_item.text)
    assert 'text' in envelope, "Result should contain 'text' field"
    payload = json.loads(envelope['text'])

    assert 'jobs' in payload, "Result should contain 'jobs' field"
    job_count = len(payload['jobs'])
    logger.info(f"Found {job_count} jobs")

    return True


async def test_list_files():
    """Call the ``list_files`` tool on DBFS ``/`` and validate its reply.

    The reply must be a non-empty list whose first item exposes ``text``. That
    text is JSON carrying a ``text`` field, which is itself JSON that must
    contain a ``files`` entry. Returns True once every check has passed.
    """
    logger.info("Testing list_files tool")
    mcp_server = DatabricksMCPServer()
    reply = await mcp_server.call_tool("list_files", {"params": {"dbfs_path": "/"}})

    # Validate the outer envelope before looking inside it
    assert isinstance(reply, list), "Result should be a List"
    assert len(reply) > 0, "Result should not be empty"
    first_item = reply[0]
    assert hasattr(first_item, 'text'), "Result item should have 'text' attribute"

    # Two JSON layers: the item text, then the document stored under 'text'
    envelope = json.loads(first_item.text)
    assert 'text' in envelope, "Result should contain 'text' field"
    payload = json.loads(envelope['text'])

    assert 'files' in payload, "Result should contain 'files' field"
    file_count = len(payload['files'])
    logger.info(f"Found {file_count} files")

    return True


async def main():
    """Run every tool test in order and report an exit status.

    Each test is awaited in turn; a falsy return value or a raised exception
    counts as a failure but does not stop the remaining tests.

    Returns:
        0 when all tests passed, 1 when any test failed or the driver itself
        raised.
    """
    logger.info("Running tool tests for Databricks MCP server")

    try:
        # Insertion order of the mapping is the execution order
        suite = {
            "list_clusters": test_list_clusters,
            "list_notebooks": test_list_notebooks,
            "list_jobs": test_list_jobs,
            "list_files": test_list_files,
        }

        failure_count = 0
        for name, run_case in suite.items():
            try:
                logger.info(f"Running test for {name}")
                if await run_case():
                    logger.info(f"Test for {name} passed")
                    continue
                logger.error(f"Test for {name} failed")
            except Exception as exc:
                logger.error(f"Error in test for {name}: {exc}", exc_info=True)
            # Reached only for a falsy result or a raised exception
            failure_count += 1

        if failure_count:
            logger.error("Some tool tests failed")
            return 1

        logger.info("All tool tests passed!")
        return 0
    except Exception as exc:
        logger.error(f"Error in tests: {exc}", exc_info=True)
        return 1


# Script entry point: run the async test driver and use its return value
# (0 when all tests passed, 1 otherwise) as the process exit code.
if __name__ == "__main__":
    sys.exit(asyncio.run(main())) 
```

--------------------------------------------------------------------------------
/tests/test_clusters.py:
--------------------------------------------------------------------------------

```python
"""
Tests for the clusters API.
"""

import json
import os
from unittest.mock import AsyncMock, MagicMock, patch

import pytest
from fastapi import status
from fastapi.testclient import TestClient

from src.api import clusters
from src.server.app import create_app


@pytest.fixture
def client():
    """Provide a ``TestClient`` wrapping a freshly created application."""
    return TestClient(create_app())


@pytest.fixture
def mock_cluster_response():
    """Provide a canned cluster description for cluster-operation tests."""
    payload = dict(
        cluster_id="1234-567890-abcdef",
        cluster_name="Test Cluster",
        spark_version="10.4.x-scala2.12",
        node_type_id="Standard_D3_v2",
        num_workers=2,
        state="RUNNING",
        creator_user_name="[email protected]",
    )
    return payload


@pytest.mark.asyncio
async def test_create_cluster():
    """Test creating a cluster.

    ``clusters.create_cluster`` is swapped for an ``AsyncMock`` only while the
    ``patch.object`` block is active. The real function is restored on exit,
    so the stub cannot leak into other tests that use ``src.api.clusters``.
    """
    # Create cluster config
    cluster_config = {
        "cluster_name": "Test Cluster",
        "spark_version": "10.4.x-scala2.12",
        "node_type_id": "Standard_D3_v2",
        "num_workers": 2,
    }

    # Mock the API call for the duration of the call only
    with patch.object(
        clusters,
        "create_cluster",
        new_callable=AsyncMock,
        return_value={"cluster_id": "1234-567890-abcdef"},
    ) as mock_create:
        response = await clusters.create_cluster(cluster_config)

    # Check the response
    assert response["cluster_id"] == "1234-567890-abcdef"

    # Verify the mock was called with the correct arguments
    mock_create.assert_called_once_with(cluster_config)


@pytest.mark.asyncio
async def test_list_clusters():
    """Test listing clusters.

    ``clusters.list_clusters`` is swapped for an ``AsyncMock`` only while the
    ``patch.object`` block is active. The real function is restored on exit,
    so the stub cannot leak into other tests that use ``src.api.clusters``.
    """
    mock_response = {
        "clusters": [
            {
                "cluster_id": "1234-567890-abcdef",
                "cluster_name": "Test Cluster 1",
                "state": "RUNNING",
            },
            {
                "cluster_id": "9876-543210-fedcba",
                "cluster_name": "Test Cluster 2",
                "state": "TERMINATED",
            },
        ]
    }

    # Mock the API call for the duration of the call only
    with patch.object(
        clusters,
        "list_clusters",
        new_callable=AsyncMock,
        return_value=mock_response,
    ) as mock_list:
        response = await clusters.list_clusters()

    # Check the response
    assert len(response["clusters"]) == 2
    assert response["clusters"][0]["cluster_id"] == "1234-567890-abcdef"
    assert response["clusters"][1]["cluster_id"] == "9876-543210-fedcba"

    # Verify the mock was called
    mock_list.assert_called_once()


@pytest.mark.asyncio
async def test_get_cluster():
    """Test getting cluster information.

    ``clusters.get_cluster`` is swapped for an ``AsyncMock`` only while the
    ``patch.object`` block is active. The real function is restored on exit,
    so the stub cannot leak into other tests that use ``src.api.clusters``.
    """
    mock_response = {
        "cluster_id": "1234-567890-abcdef",
        "cluster_name": "Test Cluster",
        "state": "RUNNING",
    }

    # Mock the API call for the duration of the call only
    with patch.object(
        clusters,
        "get_cluster",
        new_callable=AsyncMock,
        return_value=mock_response,
    ) as mock_get:
        response = await clusters.get_cluster("1234-567890-abcdef")

    # Check the response
    assert response["cluster_id"] == "1234-567890-abcdef"
    assert response["state"] == "RUNNING"

    # Verify the mock was called with the correct arguments
    mock_get.assert_called_once_with("1234-567890-abcdef")


@pytest.mark.asyncio
async def test_terminate_cluster():
    """Test terminating a cluster.

    ``clusters.terminate_cluster`` is swapped for an ``AsyncMock`` only while
    the ``patch.object`` block is active. The real function is restored on
    exit, so the stub cannot leak into other tests.
    """
    # Mock the API call for the duration of the call only
    with patch.object(
        clusters,
        "terminate_cluster",
        new_callable=AsyncMock,
        return_value={},
    ) as mock_terminate:
        response = await clusters.terminate_cluster("1234-567890-abcdef")

    # Check the response
    assert response == {}

    # Verify the mock was called with the correct arguments
    mock_terminate.assert_called_once_with("1234-567890-abcdef")


@pytest.mark.asyncio
async def test_start_cluster():
    """Test starting a cluster.

    ``clusters.start_cluster`` is swapped for an ``AsyncMock`` only while the
    ``patch.object`` block is active. The real function is restored on exit,
    so the stub cannot leak into other tests.
    """
    # Mock the API call for the duration of the call only
    with patch.object(
        clusters,
        "start_cluster",
        new_callable=AsyncMock,
        return_value={},
    ) as mock_start:
        response = await clusters.start_cluster("1234-567890-abcdef")

    # Check the response
    assert response == {}

    # Verify the mock was called with the correct arguments
    mock_start.assert_called_once_with("1234-567890-abcdef")


@pytest.mark.asyncio
async def test_resize_cluster():
    """Test resizing a cluster.

    ``clusters.resize_cluster`` is swapped for an ``AsyncMock`` only while the
    ``patch.object`` block is active. The real function is restored on exit,
    so the stub cannot leak into other tests.
    """
    # Mock the API call for the duration of the call only
    with patch.object(
        clusters,
        "resize_cluster",
        new_callable=AsyncMock,
        return_value={},
    ) as mock_resize:
        response = await clusters.resize_cluster("1234-567890-abcdef", 4)

    # Check the response
    assert response == {}

    # Verify the mock was called with the correct arguments
    mock_resize.assert_called_once_with("1234-567890-abcdef", 4)


@pytest.mark.asyncio
async def test_restart_cluster():
    """Test restarting a cluster.

    ``clusters.restart_cluster`` is swapped for an ``AsyncMock`` only while the
    ``patch.object`` block is active. The real function is restored on exit,
    so the stub cannot leak into other tests.
    """
    # Mock the API call for the duration of the call only
    with patch.object(
        clusters,
        "restart_cluster",
        new_callable=AsyncMock,
        return_value={},
    ) as mock_restart:
        response = await clusters.restart_cluster("1234-567890-abcdef")

    # Check the response
    assert response == {}

    # Verify the mock was called with the correct arguments
    mock_restart.assert_called_once_with("1234-567890-abcdef")
```

--------------------------------------------------------------------------------
/tests/test_mcp_client.py:
--------------------------------------------------------------------------------

```python
"""
MCP client tests for the Databricks MCP server.

This module contains tests that use the MCP client to connect to and test the server.
"""

import asyncio
import json
import logging
import os
import sys
from typing import Any, Dict, List, Optional

import pytest
from mcp.client.stdio import StdioServerParameters, stdio_client
from mcp.client.session import ClientSession

# Configure logging: INFO-level records formatted as
# "timestamp - logger name - level - message", plus a module-level logger
# used by run_tests() and every test function below.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)


async def run_tests():
    """Launch the Databricks MCP server over stdio and run the client tests.

    The server is started as a child process by ``stdio_client``; this function
    never attaches to an already running server. Cluster and notebook tests are
    only run when the server advertises the corresponding ``list_*`` tool.

    Returns:
        True when every executed test completed without raising, False when
        any step (launch, initialization, or a test) raised an exception.
    """
    logger.info("Connecting to Databricks MCP server...")

    # IMPORTANT: In MCP, the client launches the server process.
    # We don't connect to an already running server!
    params = StdioServerParameters(
        command="pwsh",  # Use PowerShell
        # The repository ships scripts/start_mcp_server.ps1; there is no
        # scripts/start_server.ps1, so the previous path could never launch.
        args=["-File", "./scripts/start_mcp_server.ps1"],
        env=os.environ.copy(),  # Pass the caller's environment to the server
    )

    # Use the client to start the server and connect to it
    logger.info("Launching server process...")

    try:
        async with stdio_client(params) as (recv, send):
            logger.info("Server launched, creating session...")
            # The session must be entered as a context manager: that is what
            # starts the task reading server replies. Without it,
            # initialize() waits forever for a response nobody receives.
            async with ClientSession(recv, send) as session:
                logger.info("Initializing session...")
                await session.initialize()

                # List available tools
                tools_response = await session.list_tools()
                tool_names = [t.name for t in tools_response.tools]
                logger.info(f"Available tools: {tool_names}")

                # Run tests for clusters
                if "list_clusters" in tool_names:
                    await test_list_clusters(session)
                    await test_get_cluster(session)
                else:
                    logger.warning("Cluster tools not available")

                # Run tests for notebooks
                if "list_notebooks" in tool_names:
                    await test_list_notebooks(session)
                    await test_export_notebook(session)
                else:
                    logger.warning("Notebook tools not available")

                logger.info("All tests completed successfully!")
                return True
    except Exception as e:
        logger.error(f"Error during tests: {e}", exc_info=True)
        return False


# Skip all these tests until we fix the hanging issues
@pytest.mark.skip(reason="Test causes hanging issues - needs further investigation")
@pytest.mark.asyncio
async def test_list_clusters(session):
    """Invoke ``list_clusters`` through *session* and return its response.

    Called by ``run_tests()`` with the client session it created.
    """
    logger.info("Testing list_clusters...")
    result = await session.call_tool("list_clusters", {})
    # NOTE(review): assumes call_tool returns a JSON-serializable mapping;
    # confirm against the MCP client API.
    rendered = json.dumps(result, indent=2)
    logger.info(f"list_clusters response: {rendered}")
    assert "clusters" in result, "Response should contain 'clusters' key"
    return result


@pytest.mark.skip(reason="Test causes hanging issues - needs further investigation")
@pytest.mark.asyncio
async def test_get_cluster(session):
    """Fetch details for the first listed cluster and check the returned ID.

    Called by ``run_tests()`` with the client session it created. Returns
    early, without asserting anything, when no clusters are listed.
    """
    logger.info("Testing get_cluster...")

    # A cluster ID is needed, so list the clusters first
    listing = await test_list_clusters(session)
    known_clusters = listing.get("clusters")
    if not known_clusters:
        logger.warning("No clusters found to test get_cluster")
        return

    cluster_id = known_clusters[0]["cluster_id"]

    # Look the same cluster up by ID
    details = await session.call_tool("get_cluster", {"cluster_id": cluster_id})
    rendered = json.dumps(details, indent=2)
    logger.info(f"get_cluster response: {rendered}")
    assert "cluster_id" in details, "Response should contain 'cluster_id' key"
    assert details["cluster_id"] == cluster_id, "Returned cluster ID should match requested ID"


@pytest.mark.skip(reason="Test causes hanging issues - needs further investigation")
@pytest.mark.asyncio
async def test_list_notebooks(session):
    """Invoke ``list_notebooks`` on ``/`` through *session* and return its response.

    Called by ``run_tests()`` with the client session it created.
    """
    logger.info("Testing list_notebooks...")
    result = await session.call_tool("list_notebooks", {"path": "/"})
    # NOTE(review): assumes call_tool returns a JSON-serializable mapping;
    # confirm against the MCP client API.
    rendered = json.dumps(result, indent=2)
    logger.info(f"list_notebooks response: {rendered}")
    assert "objects" in result, "Response should contain 'objects' key"
    return result


@pytest.mark.skip(reason="Test causes hanging issues - needs further investigation")
@pytest.mark.asyncio
async def test_export_notebook(session):
    """Export the first notebook found under ``/`` in SOURCE format.

    Called by ``run_tests()`` with the client session it created. Returns
    early, without asserting anything, when the listing is empty or holds no
    entry whose ``object_type`` is ``NOTEBOOK``.
    """
    logger.info("Testing export_notebook...")

    # A notebook path is needed, so list the workspace first
    listing = await test_list_notebooks(session)
    if not listing.get("objects"):
        logger.warning("No notebooks found to test export_notebook")
        return

    # First entry that is an actual notebook (directories are skipped)
    first_notebook = next(
        (entry for entry in listing["objects"] if entry.get("object_type") == "NOTEBOOK"),
        None,
    )
    if not first_notebook:
        logger.warning("No notebooks found to test export_notebook")
        return

    export_args = {"path": first_notebook["path"], "format": "SOURCE"}
    exported = await session.call_tool("export_notebook", export_args)
    logger.info(f"export_notebook response (truncated): {str(exported)[:200]}...")
    assert "content" in exported, "Response should contain 'content' key"


async def main():
    """Run the client tests and map the outcome to an exit code (0 ok, 1 failed)."""
    return 0 if await run_tests() else 1


# Script entry point: run the async driver and exit with its return value
# (0 when run_tests() reported success, 1 otherwise).
if __name__ == "__main__":
    """Run the tests directly."""
    sys.exit(asyncio.run(main())) 
```

--------------------------------------------------------------------------------
/tests/test_mcp_server.py:
--------------------------------------------------------------------------------

```python
"""
Tests for the Databricks MCP server.

This test file connects to the MCP server using the MCP client library
and tests the cluster and notebook operations.
"""

import asyncio
import json
import logging
import os
import subprocess
import sys
import time
from typing import Any, Dict, List, Optional, Tuple

import anyio
import pytest
from mcp.client.session import ClientSession
from mcp.client.stdio import StdioServerParameters, stdio_client

# Configure logging: INFO-level records formatted as
# "timestamp - logger name - level - message", written to stdout through an
# explicit StreamHandler, plus a module-level logger for this file.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[logging.StreamHandler(sys.stdout)],
)
logger = logging.getLogger(__name__)


class DatabricksMCPClient:
    """Client for testing the Databricks MCP server.

    ``connect()`` launches the server through the MCP stdio transport, opens a
    client session, logs the advertised tools, and runs the ``test_*`` methods
    while the session is open.
    """

    def __init__(self):
        # Active MCP session; only set while connect() is running
        self.session: Optional[ClientSession] = None
        # (read stream, write stream) pair yielded by stdio_client
        self.stdio_transport: Optional[Tuple[Any, Any]] = None
        # Kept for backward compatibility. connect() no longer spawns a
        # separate process, so this stays None unless a caller assigns it.
        self.server_process: Optional[subprocess.Popen] = None

    async def connect(self):
        """Launch the MCP server, open a session, and run the tests.

        The server is started by ``stdio_client`` itself. Earlier revisions
        also started a second copy with ``subprocess.Popen`` whose pipes were
        never read and which nothing talked to; that duplicate launch (and the
        blocking ``time.sleep`` that followed it) has been removed.

        Raises:
            Exception: Whatever the transport, the session, or a test raises.
        """
        logger.info("Starting Databricks MCP server...")

        # Start the server with SkipPrompt flag to avoid interactive prompts
        params = StdioServerParameters(
            command="pwsh",
            args=["-File", "start_mcp_server.ps1", "-SkipPrompt"],
            # Hand the caller's environment to the server, matching
            # tests/test_mcp_client.py.
            # NOTE(review): presumably the server reads DATABRICKS_HOST and
            # DATABRICKS_TOKEN from it; confirm against the server config.
            env=os.environ.copy(),
        )

        logger.info("Connecting to MCP server...")
        try:
            async with stdio_client(params) as stdio_transport:
                self.stdio_transport = stdio_transport
                read_stream, write_stream = stdio_transport

                # The session must be entered as a context manager: that is
                # what starts the task reading server replies. Without it,
                # initialize() waits forever.
                async with ClientSession(read_stream, write_stream) as session:
                    self.session = session
                    await session.initialize()

                    # Log available tools
                    tools_response = await session.list_tools()
                    logger.info(f"Available tools: {[t.name for t in tools_response.tools]}")

                    # Run the tests directly. TaskGroup.start() would pass a
                    # task_status keyword that run_tests() does not accept.
                    await self.run_tests()
        finally:
            # Both objects are closed once the context managers exit
            self.session = None
            self.stdio_transport = None

    async def run_tests(self):
        """Run the tests for the Databricks MCP server.

        Any failure is logged and re-raised. If ``server_process`` has been
        set, it is terminated whether or not the tests passed.
        """
        try:
            await self.test_list_clusters()
            await self.test_get_cluster()
            await self.test_list_notebooks()
            await self.test_export_notebook()
            logger.info("All tests completed successfully!")
        except Exception as e:
            logger.error(f"Test failed: {e}")
            raise
        finally:
            if self.server_process:
                self.server_process.terminate()

    async def test_list_clusters(self):
        """Test listing clusters and return the tool response."""
        logger.info("Testing list_clusters...")
        # NOTE(review): the checks below assume call_tool returns a
        # JSON-serializable mapping; confirm against the MCP client API.
        response = await self.session.call_tool("list_clusters", {})
        logger.info(f"list_clusters response: {json.dumps(response, indent=2)}")
        assert "clusters" in response, "Response should contain 'clusters' key"
        return response

    async def test_get_cluster(self):
        """Test getting details for the first listed cluster.

        Returns early, without asserting anything, when no clusters exist.
        """
        logger.info("Testing get_cluster...")

        # First list clusters to get a cluster_id
        clusters_response = await self.test_list_clusters()
        if not clusters_response.get("clusters"):
            logger.warning("No clusters found to test get_cluster")
            return

        # Get the first cluster ID
        cluster_id = clusters_response["clusters"][0]["cluster_id"]

        # Get cluster details
        response = await self.session.call_tool("get_cluster", {"cluster_id": cluster_id})
        logger.info(f"get_cluster response: {json.dumps(response, indent=2)}")
        assert "cluster_id" in response, "Response should contain 'cluster_id' key"
        assert response["cluster_id"] == cluster_id, "Returned cluster ID should match requested ID"

    async def test_list_notebooks(self):
        """Test listing notebooks under ``/`` and return the tool response."""
        logger.info("Testing list_notebooks...")
        response = await self.session.call_tool("list_notebooks", {"path": "/"})
        logger.info(f"list_notebooks response: {json.dumps(response, indent=2)}")
        assert "objects" in response, "Response should contain 'objects' key"
        return response

    async def test_export_notebook(self):
        """Test exporting the first notebook found under ``/``.

        Returns early, without asserting anything, when the listing is empty
        or contains no entry whose ``object_type`` is ``NOTEBOOK``.
        """
        logger.info("Testing export_notebook...")

        # First list notebooks to get a notebook path
        notebooks_response = await self.test_list_notebooks()
        if not notebooks_response.get("objects"):
            logger.warning("No notebooks found to test export_notebook")
            return

        # Find the first notebook (not a directory)
        notebook = next(
            (
                obj
                for obj in notebooks_response["objects"]
                if obj.get("object_type") == "NOTEBOOK"
            ),
            None,
        )
        if not notebook:
            logger.warning("No notebooks found to test export_notebook")
            return

        # Export notebook
        response = await self.session.call_tool(
            "export_notebook",
            {"path": notebook["path"], "format": "SOURCE"},
        )
        logger.info(f"export_notebook response (truncated): {str(response)[:200]}...")
        assert "content" in response, "Response should contain 'content' key"


# Skip this test for now as it causes hanging issues
@pytest.mark.skip(reason="Test causes hanging issues - needs further investigation")
@pytest.mark.asyncio
async def test_databricks_mcp_server():
    """Drive the full client workflow against the Databricks MCP server."""
    await DatabricksMCPClient().connect()


# Script entry point: build a client and run its connect() coroutine to
# completion; connect() launches the server and runs the tests.
if __name__ == "__main__":
    """Run the tests directly."""
    asyncio.run(DatabricksMCPClient().connect()) 
```

--------------------------------------------------------------------------------
/src/api/dbfs.py:
--------------------------------------------------------------------------------

```python
"""
API for managing Databricks File System (DBFS).
"""

import base64
import logging
import os
from typing import Any, Dict, List, Optional, BinaryIO

from src.core.utils import DatabricksAPIError, make_api_request

# Module-level logger named after this module; no handlers or levels are
# configured here.
logger = logging.getLogger(__name__)


async def put_file(
    dbfs_path: str,
    file_content: bytes,
    overwrite: bool = True,
) -> Dict[str, Any]:
    """
    Upload a file to DBFS in a single request.
    
    The content is base64-encoded and posted to the DBFS ``put`` endpoint.
    
    Args:
        dbfs_path: Destination path in DBFS
        file_content: Raw bytes to store
        overwrite: Whether an existing file at that path may be replaced
        
    Returns:
        Response of the put request (empty on success)
        
    Raises:
        DatabricksAPIError: If the API request fails
    """
    logger.info(f"Uploading file to DBFS path: {dbfs_path}")
    
    # The request body is JSON, so the bytes travel as base64 text
    encoded = base64.b64encode(file_content).decode("utf-8")
    payload = {
        "path": dbfs_path,
        "contents": encoded,
        "overwrite": overwrite,
    }
    
    return make_api_request("POST", "/api/2.0/dbfs/put", data=payload)


async def upload_large_file(
    dbfs_path: str,
    local_file_path: str,
    overwrite: bool = True,
    buffer_size: int = 1024 * 1024,  # 1MB chunks
) -> Dict[str, Any]:
    """
    Upload a large file to DBFS in chunks.
    
    Opens an upload handle with the DBFS ``create`` endpoint, sends the local
    file as base64-encoded blocks via ``add-block``, then finalizes the upload
    with ``close``.
    
    Args:
        dbfs_path: The path where the file should be stored in DBFS
        local_file_path: Local path to the file to upload
        overwrite: Whether to overwrite an existing file
        buffer_size: Size in bytes of each chunk read from the local file;
            must be positive
        
    Returns:
        Response of the close request (empty on success)
        
    Raises:
        DatabricksAPIError: If an API request fails or no upload handle is
            returned
        FileNotFoundError: If the local file does not exist
        ValueError: If buffer_size is not positive
    """
    logger.info(f"Uploading large file from {local_file_path} to DBFS path: {dbfs_path}")
    
    # read(0) yields nothing (a silent empty upload) and read(-1) slurps the
    # whole file into one block, so reject non-positive sizes up front.
    if buffer_size <= 0:
        raise ValueError(f"buffer_size must be positive, got {buffer_size}")
    
    if not os.path.exists(local_file_path):
        raise FileNotFoundError(f"Local file not found: {local_file_path}")
    
    # Create a handle for the upload
    create_response = make_api_request(
        "POST",
        "/api/2.0/dbfs/create",
        data={
            "path": dbfs_path,
            "overwrite": overwrite,
        },
    )
    
    handle = create_response.get("handle")
    if handle is None:
        # Without a handle every add-block call would fail with a less
        # helpful error, and there is nothing to close.
        raise DatabricksAPIError(
            "DBFS create did not return an upload handle",
            response=create_response,
        )
    
    try:
        with open(local_file_path, "rb") as f:
            # iter() with a sentinel stops at the first empty read (EOF)
            chunks = iter(lambda: f.read(buffer_size), b"")
            for chunk_index, chunk in enumerate(chunks, start=1):
                # Add the base64-encoded chunk to the handle
                make_api_request(
                    "POST",
                    "/api/2.0/dbfs/add-block",
                    data={
                        "handle": handle,
                        "data": base64.b64encode(chunk).decode("utf-8"),
                    },
                )
                logger.debug(f"Uploaded chunk {chunk_index}")
        
        # Close the handle
        return make_api_request(
            "POST",
            "/api/2.0/dbfs/close",
            data={"handle": handle},
        )
        
    except Exception as e:
        # Best-effort: release the handle so it does not stay open. A failure
        # here is logged but must not mask the original error.
        try:
            make_api_request(
                "POST",
                "/api/2.0/dbfs/close",
                data={"handle": handle},
            )
        except Exception as close_error:
            logger.warning(f"Failed to close DBFS handle after error: {str(close_error)}")
        
        logger.error(f"Error uploading file: {str(e)}")
        raise


async def get_file(
    dbfs_path: str,
    offset: int = 0,
    length: int = 1024 * 1024,  # Default to 1MB
) -> Dict[str, Any]:
    """
    Read a byte range of a DBFS file.
    
    When the response carries a ``data`` field, its base64-decoded form is
    added to the same response under ``decoded_data``. A decode failure is
    logged as a warning and the response is returned without that key.
    
    Args:
        dbfs_path: The path of the file in DBFS
        offset: Starting byte position
        length: Number of bytes to read
        
    Returns:
        Response of the read request, possibly extended with ``decoded_data``
        
    Raises:
        DatabricksAPIError: If the API request fails
    """
    logger.info(f"Reading file from DBFS path: {dbfs_path}")
    
    query = {
        "path": dbfs_path,
        "offset": offset,
        "length": length,
    }
    result = make_api_request("GET", "/api/2.0/dbfs/read", params=query)
    
    # Nothing to decode when the response has no payload field
    if "data" not in result:
        return result
    
    try:
        result["decoded_data"] = base64.b64decode(result["data"])
    except Exception as e:
        logger.warning(f"Failed to decode file content: {str(e)}")
    
    return result


async def list_files(dbfs_path: str) -> Dict[str, Any]:
    """
    List the contents of a DBFS path.
    
    Args:
        dbfs_path: The path to list
        
    Returns:
        Response of the DBFS list request
        
    Raises:
        DatabricksAPIError: If the API request fails
    """
    logger.info(f"Listing files in DBFS path: {dbfs_path}")
    query = {"path": dbfs_path}
    return make_api_request("GET", "/api/2.0/dbfs/list", params=query)


async def delete_file(
    dbfs_path: str,
    recursive: bool = False,
) -> Dict[str, Any]:
    """
    Delete a DBFS path.
    
    Args:
        dbfs_path: The path to delete
        recursive: Value sent as the request's ``recursive`` flag
        
    Returns:
        Response of the delete request (empty on success)
        
    Raises:
        DatabricksAPIError: If the API request fails
    """
    logger.info(f"Deleting DBFS path: {dbfs_path}")
    payload = {
        "path": dbfs_path,
        "recursive": recursive,
    }
    return make_api_request("POST", "/api/2.0/dbfs/delete", data=payload)


async def get_status(dbfs_path: str) -> Dict[str, Any]:
    """
    Look up the status of a DBFS path.
    
    Args:
        dbfs_path: The path to check
        
    Returns:
        Response of the DBFS get-status request
        
    Raises:
        DatabricksAPIError: If the API request fails
    """
    logger.info(f"Getting status of DBFS path: {dbfs_path}")
    query = {"path": dbfs_path}
    return make_api_request("GET", "/api/2.0/dbfs/get-status", params=query)


async def create_directory(dbfs_path: str) -> Dict[str, Any]:
    """
    Create a directory in DBFS via the ``mkdirs`` endpoint.
    
    Args:
        dbfs_path: The path to create
        
    Returns:
        Response of the mkdirs request (empty on success)
        
    Raises:
        DatabricksAPIError: If the API request fails
    """
    logger.info(f"Creating DBFS directory: {dbfs_path}")
    payload = {"path": dbfs_path}
    return make_api_request("POST", "/api/2.0/dbfs/mkdirs", data=payload)
```

--------------------------------------------------------------------------------
/src/server/databricks_mcp_server.py:
--------------------------------------------------------------------------------

```python
"""
Databricks MCP Server

This module implements a standalone MCP server that provides tools for interacting
with Databricks APIs. It follows the Model Context Protocol standard, communicating
via stdio and directly connecting to Databricks when tools are invoked.
"""

import asyncio
import json
import logging
import sys
import os
from typing import Any, Dict, List, Optional, Union, cast

from mcp.server import FastMCP
from mcp.types import TextContent
from mcp.server.stdio import stdio_server

from src.api import clusters, dbfs, jobs, notebooks, sql
from src.core.config import settings

# Configure logging: the level is looked up on the logging module by the name
# held in settings.LOG_LEVEL, and records are written to the file
# "databricks_mcp.log".
# NOTE(review): presumably a file is used so log output does not mix with the
# stdio stream this server communicates over; confirm.
logging.basicConfig(
    level=getattr(logging, settings.LOG_LEVEL),
    filename="databricks_mcp.log",
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)


def _required_param(params: Dict[str, Any], name: str) -> Any:
    """
    Fetch a mandatory tool parameter.

    Args:
        params: The parameter dictionary passed to the tool
        name: Key that must be present with a non-None value

    Returns:
        The value stored under ``name``

    Raises:
        ValueError: If the key is absent or its value is None
    """
    value = params.get(name)
    if value is None:
        raise ValueError(f"Missing required parameter: {name}")
    return value


def _tool_success(result: Any) -> List[Dict[str, str]]:
    """Wrap an API result as a one-element list holding its JSON text."""
    return [{"text": json.dumps(result)}]


def _tool_failure(action: str, error: Exception) -> List[Dict[str, str]]:
    """
    Log a tool failure and build the error payload returned to the client.

    Args:
        action: Short description of what was attempted (e.g. "listing clusters")
        error: The exception that was caught

    Returns:
        A one-element list whose JSON text carries an "error" key
    """
    # Attach the traceback so failures can be diagnosed from the log file.
    logger.error(f"Error {action}: {str(error)}", exc_info=error)
    return [{"text": json.dumps({"error": str(error)})}]


class DatabricksMCPServer(FastMCP):
    """
    An MCP server for Databricks APIs.

    Every tool takes a single ``params`` dictionary and answers with a
    one-element list holding a ``{"text": <json string>}`` dictionary.
    Failures never propagate to the caller: they are logged and reported as
    ``{"error": <message>}`` in the same envelope. Parameters that a tool's
    description marks as required are checked before any Databricks API call
    is made.
    """

    def __init__(self):
        """Initialize the Databricks MCP server and register its tools."""
        super().__init__(name="databricks-mcp", 
                         version="1.0.0", 
                         instructions="Use this server to manage Databricks resources")
        logger.info("Initializing Databricks MCP server")
        logger.info(f"Databricks host: {settings.DATABRICKS_HOST}")
        
        # Register tools
        self._register_tools()
    
    def _register_tools(self):
        """Register all Databricks MCP tools."""
        
        # Cluster management tools
        @self.tool(
            name="list_clusters",
            description="List all Databricks clusters",
        )
        async def list_clusters(params: Dict[str, Any]) -> List[TextContent]:
            logger.info(f"Listing clusters with params: {params}")
            try:
                return _tool_success(await clusters.list_clusters())
            except Exception as e:
                return _tool_failure("listing clusters", e)
        
        @self.tool(
            name="create_cluster",
            description="Create a new Databricks cluster with parameters: cluster_name (required), spark_version (required), node_type_id (required), num_workers, autotermination_minutes",
        )
        async def create_cluster(params: Dict[str, Any]) -> List[TextContent]:
            logger.info(f"Creating cluster with params: {params}")
            try:
                # Reject incomplete requests before contacting Databricks.
                for name in ("cluster_name", "spark_version", "node_type_id"):
                    _required_param(params, name)
                return _tool_success(await clusters.create_cluster(params))
            except Exception as e:
                return _tool_failure("creating cluster", e)
        
        @self.tool(
            name="terminate_cluster",
            description="Terminate a Databricks cluster with parameter: cluster_id (required)",
        )
        async def terminate_cluster(params: Dict[str, Any]) -> List[TextContent]:
            logger.info(f"Terminating cluster with params: {params}")
            try:
                cluster_id = _required_param(params, "cluster_id")
                return _tool_success(await clusters.terminate_cluster(cluster_id))
            except Exception as e:
                return _tool_failure("terminating cluster", e)
        
        @self.tool(
            name="get_cluster",
            description="Get information about a specific Databricks cluster with parameter: cluster_id (required)",
        )
        async def get_cluster(params: Dict[str, Any]) -> List[TextContent]:
            logger.info(f"Getting cluster info with params: {params}")
            try:
                cluster_id = _required_param(params, "cluster_id")
                return _tool_success(await clusters.get_cluster(cluster_id))
            except Exception as e:
                return _tool_failure("getting cluster info", e)
        
        @self.tool(
            name="start_cluster",
            description="Start a terminated Databricks cluster with parameter: cluster_id (required)",
        )
        async def start_cluster(params: Dict[str, Any]) -> List[TextContent]:
            logger.info(f"Starting cluster with params: {params}")
            try:
                cluster_id = _required_param(params, "cluster_id")
                return _tool_success(await clusters.start_cluster(cluster_id))
            except Exception as e:
                return _tool_failure("starting cluster", e)
        
        # Job management tools
        @self.tool(
            name="list_jobs",
            description="List all Databricks jobs",
        )
        async def list_jobs(params: Dict[str, Any]) -> List[TextContent]:
            logger.info(f"Listing jobs with params: {params}")
            try:
                return _tool_success(await jobs.list_jobs())
            except Exception as e:
                return _tool_failure("listing jobs", e)
        
        @self.tool(
            name="run_job",
            description="Run a Databricks job with parameters: job_id (required), notebook_params (optional)",
        )
        async def run_job(params: Dict[str, Any]) -> List[TextContent]:
            logger.info(f"Running job with params: {params}")
            try:
                job_id = _required_param(params, "job_id")
                notebook_params = params.get("notebook_params", {})
                return _tool_success(await jobs.run_job(job_id, notebook_params))
            except Exception as e:
                return _tool_failure("running job", e)
        
        # Notebook management tools
        @self.tool(
            name="list_notebooks",
            description="List notebooks in a workspace directory with parameter: path (required)",
        )
        async def list_notebooks(params: Dict[str, Any]) -> List[TextContent]:
            logger.info(f"Listing notebooks with params: {params}")
            try:
                path = _required_param(params, "path")
                return _tool_success(await notebooks.list_notebooks(path))
            except Exception as e:
                return _tool_failure("listing notebooks", e)
        
        @self.tool(
            name="export_notebook",
            description="Export a notebook from the workspace with parameters: path (required), format (optional, one of: SOURCE, HTML, JUPYTER, DBC)",
        )
        async def export_notebook(params: Dict[str, Any]) -> List[TextContent]:
            logger.info(f"Exporting notebook with params: {params}")
            try:
                path = _required_param(params, "path")
                format_type = params.get("format", "SOURCE")
                result = await notebooks.export_notebook(path, format_type)
                
                # Keep the response readable: only the first 1000 characters
                # of the notebook content are returned, followed by a note
                # stating the full length.
                content = result.get("content", "")
                if len(content) > 1000:
                    summary = f"{content[:1000]}... [content truncated, total length: {len(content)} characters]"
                    result["content"] = summary
                
                return _tool_success(result)
            except Exception as e:
                return _tool_failure("exporting notebook", e)
        
        # DBFS tools
        @self.tool(
            name="list_files",
            description="List files and directories in a DBFS path with parameter: dbfs_path (required)",
        )
        async def list_files(params: Dict[str, Any]) -> List[TextContent]:
            logger.info(f"Listing files with params: {params}")
            try:
                dbfs_path = _required_param(params, "dbfs_path")
                return _tool_success(await dbfs.list_files(dbfs_path))
            except Exception as e:
                return _tool_failure("listing files", e)
        
        # SQL tools
        @self.tool(
            name="execute_sql",
            description="Execute a SQL statement with parameters: statement (required), warehouse_id (required), catalog (optional), schema (optional)",
        )
        async def execute_sql(params: Dict[str, Any]) -> List[TextContent]:
            logger.info(f"Executing SQL with params: {params}")
            try:
                statement = _required_param(params, "statement")
                warehouse_id = _required_param(params, "warehouse_id")
                catalog = params.get("catalog")
                schema = params.get("schema")
                
                result = await sql.execute_sql(statement, warehouse_id, catalog, schema)
                return _tool_success(result)
            except Exception as e:
                return _tool_failure("executing SQL", e)


async def main():
    """
    Build the Databricks MCP server and serve it over stdio.

    Any exception raised while creating or running the server is logged with
    its traceback and then re-raised to the caller.
    """
    try:
        logger.info("Starting Databricks MCP server")
        mcp_server = DatabricksMCPServer()

        # Hand control to the server's own stdio loop.
        await mcp_server.run_stdio_async()
    except Exception as exc:
        logger.error(f"Error in Databricks MCP server: {str(exc)}", exc_info=True)
        raise


if __name__ == "__main__":
    # Switch stdout to line buffering where the stream supports it, so each
    # line written is flushed as soon as it is complete.
    stdout_can_reconfigure = hasattr(sys.stdout, 'reconfigure')
    if stdout_can_reconfigure:
        sys.stdout.reconfigure(line_buffering=True)

    asyncio.run(main())
```