#
tokens: 15757/50000 9/9 files
lines: on (toggle) GitHub
raw markdown copy reset
# Directory Structure

```
├── .gitignore
├── .python-version
├── assets
│   ├── agent_usage.png
│   ├── schema_description.png
│   └── table_columns_description.png
├── databricks_formatter.py
├── databricks_sdk_utils.py
├── Dockerfile
├── LICENSE
├── main.py
├── main.py.backup
├── pyproject.toml
├── README.md
├── requirements.txt
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------

```
1 | 3.10
2 | 
```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
 1 | # Python-generated files
 2 | __pycache__/
 3 | *.py[oc]
 4 | build/
 5 | dist/
 6 | wheels/
 7 | *.egg-info
 8 | 
 9 | # Virtual environments
10 | .venv
11 | .env
12 | 
13 | .DS_Store
```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
  1 | # Databricks MCP Server
  2 | 
  3 | - [Motivation](#motivation)
  4 | - [Overview](#overview)
  5 | - [Practical Benefits of UC Metadata for AI Agents](#practical-benefits-of-uc-metadata-for-ai-agents)
  6 | - [Available Tools and Features](#available-tools-and-features)
  7 | - [Setup](#setup)
  8 |   - [System Requirements](#system-requirements)
  9 |   - [Installation](#installation)
 10 | - [Permissions Requirements](#permissions-requirements)
 11 | - [Running the Server](#running-the-server)
 12 |   - [Standalone Mode](#standalone-mode)
 13 |   - [Using with Cursor](#using-with-cursor)
 14 | - [Example Usage Workflow (for an LLM Agent)](#example-usage-workflow-for-an-llm-agent)
 15 | - [Managing Metadata as Code with Terraform](#managing-metadata-as-code-with-terraform)
 16 | - [Handling Long-Running Queries](#handling-long-running-queries)
 17 | - [Dependencies](#dependencies)
 18 | 
 19 | ## Motivation
 20 | 
 21 | Databricks Unity Catalog (UC) allows for detailed documentation of your data assets, including catalogs, schemas, tables, and columns. Documenting these assets thoroughly requires an investment of time. One common question is: what are the practical benefits of this detailed metadata entry?
 22 | 
 23 | This MCP server provides a strong justification for that effort. It enables Large Language Models (LLMs) to directly access and utilize this Unity Catalog metadata. The more comprehensively your data is described in UC, the more effectively an LLM agent can understand your Databricks environment. This deeper understanding is crucial for the agent to autonomously construct more intelligent and accurate SQL queries to fulfill data requests.
 24 | 
 25 | ## Overview
 26 | 
 27 | This Model Context Protocol (MCP) server is designed to interact with Databricks, with a strong focus on leveraging Unity Catalog (UC) metadata and enabling comprehensive data lineage exploration. The primary goal is to equip an AI agent with a comprehensive set of tools, enabling it to become independent in answering questions about your data. By autonomously exploring UC, understanding data structures, analyzing data lineage (including notebook and job dependencies), and executing SQL queries, the agent can fulfill data requests without direct human intervention for each step.
 28 | 
 29 | Beyond traditional catalog browsing, this server enables agents to discover and analyze the actual code that processes your data. Through enhanced lineage capabilities, agents can identify notebooks and jobs that read from or write to tables, then examine the actual transformation logic, business rules, and data quality checks implemented in those notebooks. This creates a powerful feedback loop where agents not only understand *what* data exists, but also *how* it's processed and transformed.
 30 | 
 31 | When used in an Agent mode, it can successfully iterate over a number of requests to perform complex tasks, including data discovery, impact analysis, and code exploration.
 32 | 
 33 | ## Practical Benefits of UC Metadata for AI Agents
 34 | 
 35 | The tools provided by this MCP server are designed to parse and present the descriptions you've added to Unity Catalog, while also enabling deep exploration of your data processing code. This offers tangible advantages for LLM-based agents, directly impacting their ability to generate useful SQL and understand your data ecosystem:
 36 | 
 37 | *   **Clearer Data Context**: Agents can quickly understand the purpose of tables and columns, reducing ambiguity. This foundational understanding is the first step towards correct query formulation.
 38 | *   **More Accurate Query Generation**: Access to descriptions, data types, and relationships helps agents construct SQL queries with greater precision and semantic correctness.
 39 | *   **Efficient Data Exploration for Query Planning**: Metadata enables agents to navigate through catalogs and schemas more effectively, allowing them to identify the correct tables and columns to include in their SQL queries.
 40 | *   **Comprehensive Data Lineage**: Beyond table-to-table relationships, agents can discover notebooks and jobs that process data, enabling impact analysis and debugging of data pipeline issues.
 41 | *   **Code-Level Understanding**: Through notebook content exploration, agents can analyze actual transformation logic, business rules, and data quality checks, providing deeper insights into how data is processed and transformed.
 42 | *   **End-to-End Data Flow Analysis**: Agents can trace data from raw ingestion through transformation pipelines to final consumption, understanding both the structure and the processing logic at each step.
 43 | 
 44 | Well-documented metadata in Unity Catalog, when accessed via this server, allows an LLM agent to operate with better information and make more informed decisions, culminating in the generation of more effective SQL queries. For instance, schema descriptions help the agent identify relevant data sources for a query:
 45 | 
 46 | ![Schema Description in Unity Catalog](assets/schema_description.png)
 47 | *Fig 1: A schema in Unity Catalog with user-provided descriptions. This MCP server makes this information directly accessible to an LLM, informing its query strategy.*
 48 | 
 49 | Similarly, detailed comments at the column level clarify the semantics of each field, which is crucial for constructing accurate SQL conditions and selections:
 50 | 
 51 | ![Table Column Descriptions in Unity Catalog](assets/table_columns_description.png)
 52 | *Fig 2: Column-level descriptions in Unity Catalog. These details are passed to the LLM, aiding its understanding of the data structure for precise SQL generation.*
 53 | 
 54 | ## Available Tools and Features
 55 | 
 56 | This MCP server provides a suite of tools designed to empower an LLM agent interacting with Databricks:
 57 | 
 58 | **Core Capabilities:**
 59 | 
 60 | *   **Execute SQL Queries**: Run arbitrary SQL queries using the Databricks SDK via the `execute_sql_query(sql: str)` tool. This is ideal for targeted data retrieval or complex operations.
 61 | *   **LLM-Focused Output**: All descriptive tools return information in Markdown format, optimized for consumption by Large Language Models, making it easier for agents to parse and understand the context.
 62 | 
 63 | **Unity Catalog Exploration Tools:**
 64 | 
 65 | The server provides the following tools for navigating and understanding your Unity Catalog assets. These are designed to be used by an LLM agent to gather context before constructing queries or making decisions, in an agentic way.
 66 | 
 67 | 1.  `list_uc_catalogs() -> str`
 68 |     *   **Description**: Lists all available Unity Catalogs with their names, descriptions, and types.
 69 |     *   **When to use**: As a starting point to discover available data sources when you don't know specific catalog names. It provides a high-level overview of all accessible catalogs in the workspace.
 70 | 
 71 | 2.  `describe_uc_catalog(catalog_name: str) -> str`
 72 |     *   **Description**: Provides a summary of a specific Unity Catalog, listing all its schemas with their names and descriptions.
 73 |     *   **When to use**: When you know the catalog name and need to discover the schemas within it. This is often a precursor to describing a specific schema or table.
 74 |     *   **Args**:
 75 |         *   `catalog_name`: The name of the Unity Catalog to describe (e.g., `prod`, `dev`, `system`).
 76 | 
 77 | 3.  `describe_uc_schema(catalog_name: str, schema_name: str, include_columns: Optional[bool] = False) -> str`
 78 |     *   **Description**: Provides detailed information about a specific schema within a Unity Catalog. Returns all tables in the schema, optionally including their column details.
 79 |     *   **When to use**: To understand the contents of a schema, primarily its tables. Set `include_columns=True` to get column information, crucial for query construction but makes the output longer. If `include_columns=False`, only table names and descriptions are shown, useful for a quicker overview.
 80 |     *   **Args**:
 81 |         *   `catalog_name`: The name of the catalog containing the schema.
 82 |         *   `schema_name`: The name of the schema to describe.
 83 |         *   `include_columns`: If True, lists tables with their columns. Defaults to False for a briefer summary.
 84 | 
 85 | 4.  `describe_uc_table(full_table_name: str, include_lineage: Optional[bool] = False) -> str`
 86 |     *   **Description**: Provides a detailed description of a specific Unity Catalog table with comprehensive lineage capabilities.
 87 |     *   **When to use**: To understand the structure (columns, data types, partitioning) of a single table. This is essential before constructing SQL queries against the table. Optionally, it can include comprehensive lineage information that goes beyond traditional table-to-table dependencies:
 88 |         *   **Table Lineage**: Upstream tables (tables this table reads from) and downstream tables (tables that read from this table)
 89 |         *   **Notebook & Job Lineage**: Notebooks that read from or write to this table, including notebook name, workspace path, associated Databricks job information (job name, ID, task details)
 90 |         *   **Code Discovery**: The lineage provides notebook paths that enable the an agent to directly read notebook files within the current repo/workspace, allowing analysis of actual data transformation logic
 91 |     *   **Args**:
 92 |         *   `full_table_name`: The fully qualified three-part name of the table (e.g., `catalog.schema.table`).
 93 |         *   `include_lineage`: Set to True to fetch comprehensive lineage (tables, notebooks, jobs). Defaults to False. May take longer to retrieve but provides rich context for understanding data dependencies and enabling code exploration.
 94 | 
 95 | 5.  `execute_sql_query(sql: str) -> str`
 96 |     *   **Note**: This is the same tool listed under "Core Capabilities" but is repeated here in the context of a typical agent workflow involving UC exploration followed by querying.
 97 |     *   **Description**: Executes a given SQL query against the Databricks SQL warehouse and returns the formatted results.
 98 |     *   **When to use**: When you need to run specific SQL queries, such as SELECT, SHOW, or other DQL statements.
 99 |     *   **Args**:
100 |         *   `sql`: The complete SQL query string to execute.
101 | 
102 | ## Setup
103 | 
104 | ### System Requirements
105 | 
106 | -   Python 3.10+
107 | -   If you plan to install via `uv`, ensure it's [installed](https://docs.astral.sh/uv/getting-started/installation/#__tabbed_1_1)
108 | 
109 | ### Installation
110 | 
111 | 1.  Install the required dependencies:
112 | 
113 | ```bash
114 | pip install -r requirements.txt
115 | ```
116 | 
117 | Or if using `uv`:
118 | 
119 | ```bash
120 | uv pip install -r requirements.txt
121 | ```
122 | 
123 | 2.  Set up your environment variables:
124 | 
125 |     Option 1: Using a `.env` file (recommended)
126 | 
127 |     Create a `.env` file in the root directory of this project with your Databricks credentials:
128 | 
129 |     ```env
130 |     DATABRICKS_HOST="your-databricks-instance.cloud.databricks.com"
131 |     DATABRICKS_TOKEN="your-databricks-personal-access-token"
132 |     DATABRICKS_SQL_WAREHOUSE_ID="your-sql-warehouse-id"
133 |     ```
134 | 
135 |     Option 2: Setting environment variables directly
136 | 
137 |     ```bash
138 |     export DATABRICKS_HOST="your-databricks-instance.cloud.databricks.com"
139 |     export DATABRICKS_TOKEN="your-databricks-personal-access-token"
140 |     export DATABRICKS_SQL_WAREHOUSE_ID="your-sql-warehouse-id"
141 |     ```
142 | 
143 |     You can find your SQL Warehouse ID in the Databricks UI under "SQL Warehouses".
144 |     The `DATABRICKS_SQL_WAREHOUSE_ID` is primarily used for fetching table lineage and executing SQL queries via the `execute_sql_query` tool.
145 |     Metadata browsing tools (listing/describing catalogs, schemas, tables) use the Databricks SDK's general UC APIs and do not strictly require a SQL Warehouse ID unless lineage is requested.
146 | 
147 | ## Permissions Requirements
148 | 
149 | Before using this MCP server, ensure that the identity associated with the `DATABRICKS_TOKEN` (e.g., a user or service principal) has the necessary permissions:
150 | 
151 | 1.  **Unity Catalog Permissions**: 
152 |     -   `USE CATALOG` on catalogs to be accessed.
153 |     -   `USE SCHEMA` on schemas to be accessed.
154 |     -   `SELECT` on tables to be queried or described in detail (including column information).
155 |     -   To list all catalogs, appropriate metastore-level permissions might be needed or it will list catalogs where the user has at least `USE CATALOG`.
156 | 2.  **SQL Warehouse Permissions** (for `execute_sql_query` and lineage fetching):
157 |     -   `CAN_USE` permission on the SQL Warehouse specified by `DATABRICKS_SQL_WAREHOUSE_ID`.
158 | 3.  **Token Permissions**: 
159 |     -   The personal access token or service principal token should have the minimum necessary scopes. For Unity Catalog operations, this typically involves workspace access. For SQL execution, it involves SQL permissions.
160 |     -   It is strongly recommended to use a service principal with narrowly defined permissions for production or automated scenarios.
161 | 
162 | For security best practices, consider regularly rotating your access tokens and auditing query history and UC audit logs to monitor usage.
163 | 
164 | ## Running the Server
165 | 
166 | ### Standalone Mode
167 | 
168 | To run the server in standalone mode (e.g., for testing with Agent Composer):
169 | 
170 | ```bash
171 | python main.py
172 | ```
173 | 
174 | This will start the MCP server using stdio transport, which can be used with Agent Composer or other MCP clients.
175 | 
176 | ### Using with Cursor
177 | 
178 | To use this MCP server with [Cursor](https://cursor.sh/), configure it in your Cursor settings (`~/.cursor/mcp.json`):
179 | 
180 | 1. Create a `.cursor` directory in your home directory if it doesn't already exist
181 | 2. Create or edit the `mcp.json` file in that directory:
182 | 
183 | ```bash
184 | mkdir -p ~/.cursor
185 | touch ~/.cursor/mcp.json
186 | ```
187 | 
188 | 3. Add the following configuration to the `mcp.json` file, replacing the directory path with the actual path to where you've installed this server:
189 | 
190 | ```json
191 | {
192 |     "mcpServers": {
193 |         "databricks": {
194 |             "command": "uv",
195 |             "args": [
196 |                 "--directory",
197 |                 "/path/to/your/mcp-databricks-server",
198 |                 "run",
199 |                 "main.py"
200 |             ]
201 |         }
202 |     }
203 | }
204 | ```
205 | 
206 | Example using `python`:
207 | ```json
208 | {
209 |     "mcpServers": {
210 |         "databricks": {
211 |             "command": "python",
212 |             "args": [
213 |                 "/path/to/your/mcp-databricks-server/main.py"
214 |             ]
215 |         }
216 |     }
217 | }
218 | ```
219 | Restart Cursor to apply the changes. You can then use the `databricks` agent in Cursor.
220 | 
221 | ## Example Usage Workflow (for an LLM Agent)
222 | 
223 | This MCP server empowers an LLM agent to autonomously navigate your Databricks environment. The following screenshot illustrates a typical interaction where the agent iteratively explores schemas and tables, adapting its approach even when initial queries don't yield results, until it successfully retrieves the requested data.
224 | 
225 | ![Agent actively using MCP tools to find data](assets/agent_usage.png)
226 | *Fig 3: An LLM agent using the Databricks MCP tools, demonstrating iterative exploration and query refinement to locate specific page view data.*
227 | 
228 | An agent might follow this kind of workflow:
229 | 
230 | 1.  **Discover available catalogs**: `list_uc_catalogs()`
231 |     *   *Agent decides `prod_catalog` is relevant from the list.* 
232 | 2.  **Explore a specific catalog**: `describe_uc_catalog(catalog_name="prod_catalog")`
233 |     *   *Agent sees `sales_schema` and `inventory_schema`.*
234 | 3.  **Explore a specific schema (quick view)**: `describe_uc_schema(catalog_name="prod_catalog", schema_name="sales_schema")`
235 |     *   *Agent sees table names like `orders`, `customers`.* 
236 | 4.  **Get detailed table structure (including columns for query building)**: `describe_uc_schema(catalog_name="prod_catalog", schema_name="sales_schema", include_columns=True)`
237 |     *   *Alternatively, if a specific table is of interest:* `describe_uc_table(full_table_name="prod_catalog.sales_schema.orders")`
238 | 5.  **Analyze data lineage and discover processing code**: `describe_uc_table(full_table_name="prod_catalog.sales_schema.orders", include_lineage=True)`
239 |     *   *Agent discovers upstream tables, downstream dependencies, and notebooks that process this data*
240 |     *   *For example, sees that `/Repos/production/etl/sales_processing.py` writes to this table*
241 | 6.  **Examine data transformation logic**: *Agent directly reads the notebook file `/Repos/production/etl/sales_processing.py` within the IDE/repo*
242 |     *   *Agent analyzes the actual Python/SQL code to understand business rules, data quality checks, and transformation logic*
243 | 7.  **Construct and execute a query**: `execute_sql_query(sql="SELECT customer_id, order_date, SUM(order_total) FROM prod_catalog.sales_schema.orders WHERE order_date > '2023-01-01' GROUP BY customer_id, order_date ORDER BY order_date DESC LIMIT 100")`
244 | 
245 | ## Managing Metadata as Code with Terraform
246 | 
247 | While manually entering metadata through the Databricks UI is an option, a more robust and scalable approach is to define your Unity Catalog metadata as code. Tools like Terraform allow you to declaratively manage your data governance objects, including catalogs and schemas. This brings several advantages:
248 | 
249 | *   **Version Control**: Your metadata definitions can be stored in Git, tracked, and versioned alongside your other infrastructure code.
250 | *   **Repeatability and Consistency**: Ensure consistent metadata across environments (dev, staging, prod).
251 | *   **Automation**: Integrate metadata management into your CI/CD pipelines.
252 | *   **Easier Maintenance for Core Assets**: While defining every new table as code might be complex due to their dynamic nature, core assets like catalogs and schemas are often more stable and benefit significantly from this approach. Maintaining their definitions and comments as code ensures a durable and well-documented foundation for your data landscape.
253 | 
254 | Here's an example of how you might define a catalog and its schemas using the Databricks provider for Terraform:
255 | 
256 | ```terraform
257 | resource "databricks_catalog" "prod_catalog" {
258 |   name          = "prod"
259 |   comment       = "Main production catalog for all enterprise data."
260 |   storage_root  = var.default_catalog_storage_root
261 |   force_destroy = false
262 | }
263 | 
264 | # Schemas within the 'prod' catalog
265 | resource "databricks_schema" "prod_raw" {
266 |   catalog_name = databricks_catalog.prod_catalog.name
267 |   name         = "raw"
268 |   comment      = "Raw data for all different projects, telemetry, game data etc., before any transformations. No schema enforcement."
269 | }
270 | 
271 | resource "databricks_schema" "prod_bi_conformed" {
272 |   catalog_name = databricks_catalog.prod_catalog.name
273 |   name         = "bi_conformed"
274 |   comment      = "Conformed (silver) schema for Business Intelligence, cleaned and well-formatted. Schema enforced."
275 | }
276 | 
277 | resource "databricks_schema" "prod_bi_modeled" {
278 |   catalog_name = databricks_catalog.prod_catalog.name
279 |   name         = "bi_modeled"
280 |   comment      = "Modeled (gold) schema for Business Intelligence, aggregated and ready for consumption. Schema enforced."
281 | }
282 | ```
283 | 
284 | Fear not if you already have existing catalogs and schemas in Unity Catalog. You don't need to recreate them to manage their metadata as code. Terraform provides the `terraform import` command, which allows you to bring existing infrastructure (including Unity Catalog assets) under its management. Once imported, you can define the resource in your Terraform configuration and selectively update attributes like the `comment` field without affecting the asset itself. For example, after importing an existing schema, you could add or update its `comment` in your `.tf` file, and `terraform apply` would only apply that change.
285 | 
286 | Adopting a metadata-as-code strategy, especially for foundational elements like catalogs and schemas, greatly enhances the quality and reliability of the metadata that this MCP server leverages. This, in turn, further improves the effectiveness of AI agents interacting with your Databricks data.
287 | 
288 | For more details on using Terraform with Databricks Unity Catalog, refer to the official documentation:
289 | *   Databricks Provider: Catalog Resource ([https://registry.terraform.io/providers/databricks/databricks/latest/docs/resources/catalog](https://registry.terraform.io/providers/databricks/databricks/latest/docs/resources/catalog))
290 | *   Databricks Provider: Schemas Data Source ([https://registry.terraform.io/providers/databricks/databricks/latest/docs/data-sources/schemas](https://registry.terraform.io/providers/databricks/databricks/latest/docs/data-sources/schemas))
291 | 
292 | ## Handling Long-Running Queries
293 | 
294 | The `execute_sql_query` tool utilizes the Databricks SDK's `execute_statement` method. The `wait_timeout` parameter in the underlying `databricks_sdk_utils.execute_databricks_sql` function is set to '50s'. If a query runs longer than this, the SDK may return a statement ID for polling, but the current implementation of the tool effectively waits up to this duration for a synchronous-like response. For very long-running queries, this timeout might be reached.
295 | 
296 | ## Dependencies
297 | 
298 | -   `databricks-sdk`: For interacting with the Databricks REST APIs and Unity Catalog.
299 | -   `python-dotenv`: For loading environment variables from a `.env` file.
300 | -   `mcp[cli]`: The Model Context Protocol library.
301 | -   `asyncio`: For asynchronous operations within the MCP server.
302 | -   `httpx` (typically a sub-dependency of `databricks-sdk` or `mcp`): For making HTTP requests.
303 | 
304 | 
```

--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------

```
1 | httpx>=0.28.1
2 | python-dotenv>=1.0.0 
3 | mcp[cli]>=1.2.0
4 | asyncio>=3.4.3
5 | databricks-sdk==0.55.0
```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
 1 | [project]
 2 | name = "databricks"
 3 | version = "0.1.0"
 4 | description = "Add your description here"
 5 | readme = "README.md"
 6 | requires-python = ">=3.10"
 7 | dependencies = [
 8 |     "httpx>=0.28.1",
 9 |     "mcp[cli]>=1.3.0",
10 | ]
11 | 
```

--------------------------------------------------------------------------------
/Dockerfile:
--------------------------------------------------------------------------------

```dockerfile
 1 | FROM python:3.10-slim
 2 | 
 3 | WORKDIR /app
 4 | 
 5 | # Set environment variables (users should provide these at runtime)
 6 | ENV DATABRICKS_HOST="your-databricks-instance.cloud.databricks.com"
 7 | ENV DATABRICKS_TOKEN="your-databricks-access-token"
 8 | ENV DATABRICKS_SQL_WAREHOUSE_ID="your-sql-warehouse-id"
 9 | 
10 | COPY requirements.txt requirements.txt
11 | RUN pip install --no-cache-dir -r requirements.txt
12 | 
13 | COPY . .
14 | 
15 | CMD ["python", "main.py"] 
```

--------------------------------------------------------------------------------
/databricks_formatter.py:
--------------------------------------------------------------------------------

```python
 1 | from typing import Any, Dict, List
 2 | 
 3 | 
 4 | def format_query_results(result: Dict[str, Any]) -> str:
 5 |     """Format query results from either SDK or direct API style into a readable string."""
 6 | 
 7 |     if not result:
 8 |         return "No results or invalid result format."
 9 |     
10 |     column_names: List[str] = []
11 |     rows: List[List[Any]] = [] # For new style, this will be list of dicts initially
12 |     data_rows_formatted: List[str] = []
13 | 
14 |     # Try to parse as output from execute_databricks_sql (SDK based)
15 |     if result.get("status") == "success" and "data" in result:
16 |         print("Formatting results from SDK-based execute_databricks_sql output.")
17 |         sdk_data = result.get("data", [])
18 |         if not sdk_data: # No rows, but query was successful
19 |             # Try to get column names if available even with no data (e.g., from a manifest if we adapt execute_databricks_sql later)
20 |             # For now, if no data, we might not have explicit column names easily in this path.
21 |             # However, execute_databricks_sql returns column names implicit in the (empty) list of dicts.
22 |             # This part needs careful handling if sdk_data is empty but we still want headers.
23 |             # Let's assume if sdk_data is empty, we might not have columns easily unless manifest is also passed.
24 |             # For now, if sdk_data is empty, we report no data rows. Future improvement: get columns from manifest if possible.
25 |             if result.get("message") == "Query succeeded but returned no data.":
26 |                  # If we had column names from execute_databricks_sql (e.g. if it returned them separately)
27 |                  # we could print headers. For now, this message is sufficient.
28 |                 return "Query succeeded but returned no data."
29 |             return "Query succeeded but returned no data rows."
30 | 
31 |         # Assuming sdk_data is a list of dictionaries, get column names from the first row's keys
32 |         if isinstance(sdk_data, list) and len(sdk_data) > 0 and isinstance(sdk_data[0], dict):
33 |             column_names = list(sdk_data[0].keys())
34 |         
35 |         for row_dict in sdk_data:
36 |             row_values = []
37 |             for col_name in column_names: # Iterate in order of discovered column names
38 |                 value = row_dict.get(col_name)
39 |                 if value is None:
40 |                     row_values.append("NULL")
41 |                 else:
42 |                     row_values.append(str(value))
43 |             data_rows_formatted.append(" | ".join(row_values))
44 |     
45 |     # Try to parse as old direct API style output (from dbapi.execute_statement)
46 |     elif 'manifest' in result and 'result' in result:
47 |         print("Formatting results from original dbapi.execute_statement output.")
48 |         if result['manifest'].get('schema') and result['manifest']['schema'].get('columns'):
49 |             columns_schema = result['manifest']['schema']['columns']
50 |             column_names = [col['name'] for col in columns_schema if 'name' in col] if columns_schema else []
51 |         
52 |         if result['result'].get('data_array'):
53 |             raw_rows = result['result']['data_array']
54 |             for row_list in raw_rows:
55 |                 row_values = []
56 |                 for value in row_list:
57 |                     if value is None:
58 |                         row_values.append("NULL")
59 |                     else:
60 |                         row_values.append(str(value))
61 |                 data_rows_formatted.append(" | ".join(row_values))
62 |     else:
63 |         # Fallback if structure is completely unrecognized or an error dict itself
64 |         if result.get("status") == "error" and result.get("error"):
65 |             return f"Error from query execution: {result.get('error')} Details: {result.get('details', 'N/A')}"
66 |         return "Invalid or unrecognized result format."
67 | 
68 |     # Common formatting part for table output
69 |     if not column_names:
70 |         return "No column names found in the result."
71 |     
72 |     output_lines = []
73 |     output_lines.append(" | ".join(column_names))
74 |     output_lines.append("-" * (sum(len(name) + 3 for name in column_names) - 1 if column_names else 0))
75 | 
76 |     if not data_rows_formatted:
77 |         output_lines.append("No data rows found.")
78 |     else:
79 |         output_lines.extend(data_rows_formatted)
80 |     
81 |     return "\n".join(output_lines) 
```

--------------------------------------------------------------------------------
/main.py:
--------------------------------------------------------------------------------

```python
  1 | from typing import Optional
  2 | import asyncio
  3 | from mcp.server.fastmcp import FastMCP
  4 | from databricks_formatter import format_query_results
  5 | from databricks_sdk_utils import (
  6 |     get_uc_table_details,
  7 |     get_uc_catalog_details,
  8 |     get_uc_schema_details,
  9 |     execute_databricks_sql,
 10 |     get_uc_all_catalogs_summary
 11 | )
 12 | 
 13 | 
 14 | mcp = FastMCP("databricks")
 15 | 
 16 | @mcp.tool()
 17 | async def execute_sql_query(sql: str) -> str:
 18 |     """
 19 |     Executes a given SQL query against the Databricks SQL warehouse and returns the formatted results.
 20 |     
 21 |     Use this tool when you need to run specific SQL queries, such as SELECT, SHOW, or other DQL statements.
 22 |     This is ideal for targeted data retrieval or for queries that are too complex for the structured description tools.
 23 |     The results are returned in a human-readable, Markdown-like table format.
 24 | 
 25 |     Args:
 26 |         sql: The complete SQL query string to execute.
 27 |     """
 28 |     try:
 29 |         sdk_result = await asyncio.to_thread(execute_databricks_sql, sql_query=sql)
 30 |         
 31 |         status = sdk_result.get("status")
 32 |         if status == "failed":
 33 |             error_message = sdk_result.get("error", "Unknown query execution error.")
 34 |             details = sdk_result.get("details", "No additional details provided.")
 35 |             return f"SQL Query Failed: {error_message}\nDetails: {details}"
 36 |         elif status == "error":
 37 |             error_message = sdk_result.get("error", "Unknown error during SQL execution.")
 38 |             details = sdk_result.get("details", "No additional details provided.")
 39 |             return f"Error during SQL Execution: {error_message}\nDetails: {details}"
 40 |         elif status == "success":
 41 |             return format_query_results(sdk_result)
 42 |         else:
 43 |             # Should not happen if execute_databricks_sql always returns a known status
 44 |             return f"Received an unexpected status from query execution: {status}. Result: {sdk_result}"
 45 |             
 46 |     except Exception as e:
 47 |         return f"An unexpected error occurred while executing SQL query: {str(e)}"
 48 | 
 49 | 
 50 | @mcp.tool()
 51 | async def describe_uc_table(full_table_name: str, include_lineage: Optional[bool] = False) -> str:
 52 |     """
 53 |     Provides a detailed description of a specific Unity Catalog table.
 54 |     
 55 |     Use this tool to understand the structure (columns, data types, partitioning) of a single table.
 56 |     This is essential before constructing SQL queries against the table.
 57 |     
 58 |     Optionally, it can include comprehensive lineage information that goes beyond traditional 
 59 |     table-to-table dependencies:
 60 | 
 61 |     **Table Lineage:**
 62 |     - Upstream tables (tables this table reads from)
 63 |     - Downstream tables (tables that read from this table)
 64 |     
 65 |     **Notebook & Job Lineage:**
 66 |     - Notebooks that read from this table, including:
 67 |       * Notebook name and workspace path
 68 |       * Associated Databricks job information (job name, ID, task details)
 69 |     - Notebooks that write to this table with the same detailed context
 70 |     
 71 |     **Use Cases:**
 72 |     - Data impact analysis: understand what breaks if you modify this table
 73 |     - Code discovery: find notebooks that process this data for further analysis
 74 |     - Debugging: trace data flow issues by examining both table dependencies and processing code
 75 |     - Documentation: understand the complete data ecosystem around a table
 76 | 
 77 |     The lineage information allows LLMs and tools to subsequently fetch the actual notebook 
 78 |     code content for deeper analysis of data transformations and business logic.
 79 | 
 80 |     The output is formatted in Markdown.
 81 | 
 82 |     Args:
 83 |         full_table_name: The fully qualified three-part name of the table (e.g., `catalog.schema.table`).
 84 |         include_lineage: Set to True to fetch and include comprehensive lineage (tables, notebooks, jobs). 
 85 |                          Defaults to False. May take longer to retrieve but provides rich context for 
 86 |                          understanding data dependencies and enabling code exploration.
 87 |     """
 88 |     try:
 89 |         details_markdown = await asyncio.to_thread(
 90 |             get_uc_table_details,
 91 |             full_table_name=full_table_name,
 92 |             include_lineage=include_lineage
 93 |         )
 94 |         return details_markdown
 95 |     except ImportError as e:
 96 |         return f"Error initializing Databricks SDK utilities: {str(e)}. Please ensure DATABRICKS_HOST and DATABRICKS_TOKEN are set."
 97 |     except Exception as e:
 98 |         return f"Error getting detailed table description for '{full_table_name}': {str(e)}"
 99 | 
100 | @mcp.tool()
101 | async def describe_uc_catalog(catalog_name: str) -> str:
102 |     """
103 |     Provides a summary of a specific Unity Catalog, listing all its schemas with their names and descriptions.
104 |     
105 |     Use this tool when you know the catalog name and need to discover the schemas within it.
106 |     This is often a precursor to describing a specific schema or table.
107 |     The output is formatted in Markdown.
108 | 
109 |     Args:
110 |         catalog_name: The name of the Unity Catalog to describe (e.g., `prod`, `dev`, `system`).
111 |     """
112 |     try:
113 |         summary_markdown = await asyncio.to_thread(
114 |             get_uc_catalog_details,
115 |             catalog_name=catalog_name
116 |         )
117 |         return summary_markdown
118 |     except ImportError as e:
119 |         return f"Error initializing Databricks SDK utilities: {str(e)}. Please ensure DATABRICKS_HOST and DATABRICKS_TOKEN are set."
120 |     except Exception as e:
121 |         return f"Error getting catalog summary for '{catalog_name}': {str(e)}"
122 | 
123 | @mcp.tool()
124 | async def describe_uc_schema(catalog_name: str, schema_name: str, include_columns: Optional[bool] = False) -> str:
125 |     """
126 |     Provides detailed information about a specific schema within a Unity Catalog.
127 |     
128 |     Use this tool to understand the contents of a schema, primarily its tables.
129 |     Optionally, it can list all tables within the schema and their column details.
130 |     Set `include_columns=True` to get column information, which is crucial for query construction but makes the output longer.
131 |     If `include_columns=False`, only table names and descriptions are shown, useful for a quicker overview.
132 |     The output is formatted in Markdown.
133 | 
134 |     Args:
135 |         catalog_name: The name of the catalog containing the schema.
136 |         schema_name: The name of the schema to describe.
137 |         include_columns: If True, lists tables with their columns. Defaults to False for a briefer summary.
138 |     """
139 |     try:
140 |         details_markdown = await asyncio.to_thread(
141 |             get_uc_schema_details,
142 |             catalog_name=catalog_name,
143 |             schema_name=schema_name,
144 |             include_columns=include_columns
145 |         )
146 |         return details_markdown
147 |     except ImportError as e:
148 |         return f"Error initializing Databricks SDK utilities: {str(e)}. Please ensure DATABRICKS_HOST and DATABRICKS_TOKEN are set."
149 |     except Exception as e:
150 |         return f"Error getting detailed schema description for '{catalog_name}.{schema_name}': {str(e)}"
151 | 
152 | @mcp.tool()
153 | async def list_uc_catalogs() -> str:
154 |     """
155 |     Lists all available Unity Catalogs with their names, descriptions, and types.
156 |     
157 |     Use this tool as a starting point to discover available data sources when you don't know specific catalog names.
158 |     It provides a high-level overview of all accessible catalogs in the workspace.
159 |     The output is formatted in Markdown.
160 |     """
161 |     try:
162 |         summary_markdown = await asyncio.to_thread(get_uc_all_catalogs_summary)
163 |         return summary_markdown
164 |     except ImportError as e:
165 |         return f"Error initializing Databricks SDK utilities: {str(e)}. Please ensure DATABRICKS_HOST and DATABRICKS_TOKEN are set."
166 |     except Exception as e:
167 |         return f"Error listing catalogs: {str(e)}"
168 | 
169 | if __name__ == "__main__":
170 |     mcp.run(transport='stdio')
```

--------------------------------------------------------------------------------
/databricks_sdk_utils.py:
--------------------------------------------------------------------------------

```python
  1 | from databricks.sdk import WorkspaceClient
  2 | from databricks.sdk.core import Config
  3 | from databricks.sdk.service.catalog import TableInfo, SchemaInfo, ColumnInfo, CatalogInfo
  4 | from databricks.sdk.service.sql import StatementResponse, StatementState
  5 | from typing import Dict, Any, List
  6 | import os
  7 | import json
  8 | import time
  9 | from dotenv import load_dotenv
 10 | 
 11 | # Load environment variables from .env file when the module is imported
 12 | load_dotenv()
 13 | 
 14 | DATABRICKS_HOST = os.environ.get("DATABRICKS_HOST")
 15 | DATABRICKS_TOKEN = os.environ.get("DATABRICKS_TOKEN")
 16 | DATABRICKS_SQL_WAREHOUSE_ID = os.environ.get("DATABRICKS_SQL_WAREHOUSE_ID")
 17 | 
 18 | if not DATABRICKS_HOST or not DATABRICKS_TOKEN:
 19 |     raise ImportError(
 20 |         "DATABRICKS_HOST and DATABRICKS_TOKEN must be set in environment variables or .env file "
 21 |         "for databricks_sdk_utils to initialize."
 22 |     )
 23 | 
 24 | # Configure and initialize the global SDK client
 25 | # Using short timeouts as previously determined to be effective
 26 | sdk_config = Config(
 27 |     host=DATABRICKS_HOST,
 28 |     token=DATABRICKS_TOKEN,
 29 |     http_timeout_seconds=30,
 30 |     retry_timeout_seconds=60
 31 | )
 32 | sdk_client = WorkspaceClient(config=sdk_config)
 33 | 
 34 | # Cache for job information to avoid redundant API calls
 35 | _job_cache = {}
 36 | _notebook_cache = {}
 37 | 
 38 | def _format_column_details_md(columns: List[ColumnInfo]) -> List[str]:
 39 |     """
 40 |     Formats a list of ColumnInfo objects into a list of Markdown strings.
 41 |     """
 42 |     markdown_lines = []
 43 |     if not columns:
 44 |         markdown_lines.append("  - *No column information available.*")
 45 |         return markdown_lines
 46 | 
 47 |     for col in columns:
 48 |         if not isinstance(col, ColumnInfo):
 49 |             print(f"Warning: Encountered an unexpected item in columns list: {type(col)}. Skipping.")
 50 |             continue
 51 |         col_type = col.type_text or (col.type_name.value if col.type_name and hasattr(col.type_name, 'value') else "N/A")
 52 |         nullable_status = "nullable" if col.nullable else "not nullable"
 53 |         col_description = f": {col.comment}" if col.comment else ""
 54 |         markdown_lines.append(f"  - **{col.name}** (`{col_type}`, {nullable_status}){col_description}")
 55 |     return markdown_lines
 56 | 
 57 | def _get_job_info_cached(job_id: str) -> Dict[str, Any]:
 58 |     """Get job information with caching to avoid redundant API calls"""
 59 |     if job_id not in _job_cache:
 60 |         try:
 61 |             job_info = sdk_client.jobs.get(job_id=job_id)
 62 |             _job_cache[job_id] = {
 63 |                 'name': job_info.settings.name if job_info.settings.name else f"Job {job_id}",
 64 |                 'tasks': []
 65 |             }
 66 |             
 67 |             # Pre-process all tasks to build notebook mapping
 68 |             if job_info.settings.tasks:
 69 |                 for task in job_info.settings.tasks:
 70 |                     if hasattr(task, 'notebook_task') and task.notebook_task:
 71 |                         task_info = {
 72 |                             'task_key': task.task_key,
 73 |                             'notebook_path': task.notebook_task.notebook_path
 74 |                         }
 75 |                         _job_cache[job_id]['tasks'].append(task_info)
 76 |                         
 77 |         except Exception as e:
 78 |             print(f"Error fetching job {job_id}: {e}")
 79 |             _job_cache[job_id] = {
 80 |                 'name': f"Job {job_id}",
 81 |                 'tasks': [],
 82 |                 'error': str(e)
 83 |             }
 84 |     
 85 |     return _job_cache[job_id]
 86 | 
 87 | def _get_notebook_id_cached(notebook_path: str) -> str:
 88 |     """Get notebook ID with caching to avoid redundant API calls"""
 89 |     if notebook_path not in _notebook_cache:
 90 |         try:
 91 |             notebook_details = sdk_client.workspace.get_status(notebook_path)
 92 |             _notebook_cache[notebook_path] = str(notebook_details.object_id)
 93 |         except Exception as e:
 94 |             print(f"Error fetching notebook {notebook_path}: {e}")
 95 |             _notebook_cache[notebook_path] = None
 96 |     
 97 |     return _notebook_cache[notebook_path]
 98 | 
 99 | def _resolve_notebook_info_optimized(notebook_id: str, job_id: str) -> Dict[str, Any]:
100 |     """
101 |     Optimized version that resolves notebook info using cached job data.
102 |     Returns dict with notebook_path, notebook_name, job_name, and task_key.
103 |     """
104 |     result = {
105 |         'notebook_id': notebook_id,
106 |         'notebook_path': f"notebook_id:{notebook_id}",
107 |         'notebook_name': f"notebook_id:{notebook_id}",
108 |         'job_id': job_id,
109 |         'job_name': f"Job {job_id}",
110 |         'task_key': None
111 |     }
112 |     
113 |     # Get cached job info
114 |     job_info = _get_job_info_cached(job_id)
115 |     result['job_name'] = job_info['name']
116 |     
117 |     # Look for notebook in job tasks
118 |     for task_info in job_info['tasks']:
119 |         notebook_path = task_info['notebook_path']
120 |         cached_notebook_id = _get_notebook_id_cached(notebook_path)
121 |         
122 |         if cached_notebook_id == notebook_id:
123 |             result['notebook_path'] = notebook_path
124 |             result['notebook_name'] = notebook_path.split('/')[-1]
125 |             result['task_key'] = task_info['task_key']
126 |             break
127 |     
128 |     return result
129 | 
130 | def _format_notebook_info_optimized(notebook_info: Dict[str, Any]) -> str:
131 |     """
132 |     Formats notebook information using pre-resolved data.
133 |     """
134 |     lines = []
135 |     
136 |     if notebook_info['notebook_path'].startswith('/'):
137 |         lines.append(f"**`{notebook_info['notebook_name']}`**")
138 |         lines.append(f"  - **Path**: `{notebook_info['notebook_path']}`")
139 |     else:
140 |         lines.append(f"**{notebook_info['notebook_name']}**")
141 |     
142 |     lines.append(f"  - **Job**: {notebook_info['job_name']} (ID: {notebook_info['job_id']})")
143 |     if notebook_info['task_key']:
144 |         lines.append(f"  - **Task**: {notebook_info['task_key']}")
145 |     
146 |     return "\n".join(lines)
147 | 
148 | def _process_lineage_results(lineage_query_output: Dict[str, Any], main_table_full_name: str) -> Dict[str, Any]:
149 |     """
150 |     Optimized version of lineage processing that batches API calls and uses caching.
151 |     """
152 |     print("Processing lineage results with optimization...")
153 |     start_time = time.time()
154 |     
155 |     processed_data: Dict[str, Any] = {
156 |         "upstream_tables": [],
157 |         "downstream_tables": [],
158 |         "notebooks_reading": [],
159 |         "notebooks_writing": []
160 |     }
161 |     
162 |     if not lineage_query_output or lineage_query_output.get("status") != "success" or not isinstance(lineage_query_output.get("data"), list):
163 |         print("Warning: Lineage query output is invalid or not successful. Returning empty lineage.")
164 |         return processed_data
165 | 
166 |     upstream_set = set()
167 |     downstream_set = set()
168 |     notebooks_reading_dict = {}
169 |     notebooks_writing_dict = {}
170 |     
171 |     # Collect all unique job IDs first for batch processing
172 |     unique_job_ids = set()
173 |     notebook_job_pairs = []
174 |     
175 |     for row in lineage_query_output["data"]:
176 |         source_table = row.get("source_table_full_name")
177 |         target_table = row.get("target_table_full_name")
178 |         entity_metadata = row.get("entity_metadata")
179 |         
180 |         # Parse entity metadata
181 |         notebook_id = None
182 |         job_id = None
183 |         
184 |         if entity_metadata:
185 |             try:
186 |                 if isinstance(entity_metadata, str):
187 |                     metadata_dict = json.loads(entity_metadata)
188 |                 else:
189 |                     metadata_dict = entity_metadata
190 |                     
191 |                 notebook_id = metadata_dict.get("notebook_id")
192 |                 job_info = metadata_dict.get("job_info")
193 |                 if job_info:
194 |                     job_id = job_info.get("job_id")
195 |             except (json.JSONDecodeError, AttributeError):
196 |                 pass
197 |         
198 |         # Process table-to-table lineage
199 |         if source_table == main_table_full_name and target_table and target_table != main_table_full_name:
200 |             downstream_set.add(target_table)
201 |         elif target_table == main_table_full_name and source_table and source_table != main_table_full_name:
202 |             upstream_set.add(source_table)
203 |         
204 |         # Collect notebook-job pairs for batch processing
205 |         if notebook_id and job_id:
206 |             unique_job_ids.add(job_id)
207 |             notebook_job_pairs.append({
208 |                 'notebook_id': notebook_id,
209 |                 'job_id': job_id,
210 |                 'source_table': source_table,
211 |                 'target_table': target_table
212 |             })
213 |     
214 |     # Pre-load all job information in parallel (this is where the optimization happens)
215 |     print(f"Pre-loading {len(unique_job_ids)} unique jobs...")
216 |     batch_start = time.time()
217 |     
218 |     for job_id in unique_job_ids:
219 |         _get_job_info_cached(job_id)  # This will cache the job info
220 |     
221 |     batch_time = time.time() - batch_start
222 |     print(f"Job batch loading took {batch_time:.2f} seconds")
223 |     
224 |     # Now process all notebook-job pairs using cached data
225 |     print(f"Processing {len(notebook_job_pairs)} notebook entries...")
226 |     for pair in notebook_job_pairs:
227 |         notebook_info = _resolve_notebook_info_optimized(pair['notebook_id'], pair['job_id'])
228 |         formatted_info = _format_notebook_info_optimized(notebook_info)
229 |         
230 |         if pair['source_table'] == main_table_full_name:
231 |             notebooks_reading_dict[pair['notebook_id']] = formatted_info
232 |         elif pair['target_table'] == main_table_full_name:
233 |             notebooks_writing_dict[pair['notebook_id']] = formatted_info
234 |     
235 |     processed_data["upstream_tables"] = sorted(list(upstream_set))
236 |     processed_data["downstream_tables"] = sorted(list(downstream_set))
237 |     processed_data["notebooks_reading"] = sorted(list(notebooks_reading_dict.values()))
238 |     processed_data["notebooks_writing"] = sorted(list(notebooks_writing_dict.values()))
239 |     
240 |     total_time = time.time() - start_time
241 |     print(f"Total lineage processing took {total_time:.2f} seconds")
242 |     
243 |     return processed_data
244 | 
245 | def clear_lineage_cache():
246 |     """Clear the job and notebook caches to free memory"""
247 |     global _job_cache, _notebook_cache
248 |     _job_cache = {}
249 |     _notebook_cache = {}
250 |     print("Cleared lineage caches")
251 | 
252 | def _get_table_lineage(table_full_name: str) -> Dict[str, Any]:
253 |     """
254 |     Retrieves table lineage information for a given table using the global SDK client
255 |     and global SQL warehouse ID. Now includes notebook and job information with enhanced details.
256 |     """
257 |     if not DATABRICKS_SQL_WAREHOUSE_ID: # Check before attempting query
258 |         return {"status": "error", "error": "DATABRICKS_SQL_WAREHOUSE_ID is not set. Cannot fetch lineage."}
259 | 
260 |     lineage_sql_query = f"""
261 |     SELECT source_table_full_name, target_table_full_name, entity_type, entity_id, 
262 |            entity_run_id, entity_metadata, created_by, event_time
263 |     FROM system.access.table_lineage
264 |     WHERE source_table_full_name = '{table_full_name}' OR target_table_full_name = '{table_full_name}'
265 |     ORDER BY event_time DESC LIMIT 100;
266 |     """
267 |     print(f"Fetching and processing lineage for table: {table_full_name}")
268 |     # execute_databricks_sql will now use the global warehouse_id
269 |     raw_lineage_output = execute_databricks_sql(lineage_sql_query, wait_timeout='50s') 
270 |     return _process_lineage_results(raw_lineage_output, table_full_name)
271 | 
272 | def _format_single_table_md(table_info: TableInfo, base_heading_level: int, display_columns: bool) -> List[str]:
273 |     """
274 |     Formats the details for a single TableInfo object into a list of Markdown strings.
275 |     Uses a base_heading_level to control Markdown header depth for hierarchical display.
276 |     """
277 |     table_markdown_parts = []
278 |     table_header_prefix = "#" * base_heading_level
279 |     sub_header_prefix = "#" * (base_heading_level + 1)
280 | 
281 |     table_markdown_parts.append(f"{table_header_prefix} Table: **{table_info.full_name}**")
282 | 
283 |     if table_info.comment:
284 |         table_markdown_parts.extend(["", f"**Description**: {table_info.comment}"])
285 |     elif base_heading_level == 1:
286 |         table_markdown_parts.extend(["", "**Description**: No description provided."])
287 |     
288 |     # Process and add partition columns
289 |     partition_column_names: List[str] = []
290 |     if table_info.columns:
291 |         temp_partition_cols: List[tuple[str, int]] = []
292 |         for col in table_info.columns:
293 |             if col.partition_index is not None:
294 |                 temp_partition_cols.append((col.name, col.partition_index))
295 |         if temp_partition_cols:
296 |             temp_partition_cols.sort(key=lambda x: x[1])
297 |             partition_column_names = [name for name, index in temp_partition_cols]
298 | 
299 |     if partition_column_names:
300 |         table_markdown_parts.extend(["", f"{sub_header_prefix} Partition Columns"])
301 |         table_markdown_parts.extend([f"- `{col_name}`" for col_name in partition_column_names])
302 |     elif base_heading_level == 1:
303 |         table_markdown_parts.extend(["", f"{sub_header_prefix} Partition Columns", "- *This table is not partitioned or partition key information is unavailable.*"])
304 | 
305 |     if display_columns:
306 |         table_markdown_parts.extend(["", f"{sub_header_prefix} Table Columns"])
307 |         if table_info.columns:
308 |             table_markdown_parts.extend(_format_column_details_md(table_info.columns))
309 |         else:
310 |             table_markdown_parts.append("  - *No column information available.*")
311 |             
312 |     return table_markdown_parts
313 | 
314 | def execute_databricks_sql(sql_query: str, wait_timeout: str = '50s') -> Dict[str, Any]:
315 |     """
316 |     Executes a SQL query on Databricks using the global SDK client and global SQL warehouse ID.
317 |     """
318 |     if not DATABRICKS_SQL_WAREHOUSE_ID:
319 |         return {"status": "error", "error": "DATABRICKS_SQL_WAREHOUSE_ID is not set. Cannot execute SQL query."}
320 |     
321 |     try:
322 |         print(f"Executing SQL on warehouse {DATABRICKS_SQL_WAREHOUSE_ID} (timeout: {wait_timeout}):\n{sql_query[:200]}..." + (" (truncated)" if len(sql_query) > 200 else ""))
323 |         response: StatementResponse = sdk_client.statement_execution.execute_statement(
324 |             statement=sql_query,
325 |             warehouse_id=DATABRICKS_SQL_WAREHOUSE_ID, # Use global warehouse ID
326 |             wait_timeout=wait_timeout
327 |         )
328 | 
329 |         if response.status and response.status.state == StatementState.SUCCEEDED:
330 |             if response.result and response.result.data_array:
331 |                 column_names = [col.name for col in response.manifest.schema.columns] if response.manifest and response.manifest.schema and response.manifest.schema.columns else []
332 |                 results = [dict(zip(column_names, row)) for row in response.result.data_array]
333 |                 return {"status": "success", "row_count": len(results), "data": results}
334 |             else:
335 |                 return {"status": "success", "row_count": 0, "data": [], "message": "Query succeeded but returned no data."}
336 |         elif response.status:
337 |             error_message = response.status.error.message if response.status.error else "No error details provided."
338 |             return {"status": "failed", "error": f"Query execution failed with state: {response.status.state.value}", "details": error_message}
339 |         else:
340 |             return {"status": "failed", "error": "Query execution status unknown."}
341 |     except Exception as e:
342 |         return {"status": "error", "error": f"An error occurred during SQL execution: {str(e)}"}
343 | 
344 | def get_uc_table_details(full_table_name: str, include_lineage: bool = False) -> str:
345 |     """
346 |     Fetches table metadata and optionally lineage, then formats it into a Markdown string.
347 |     Uses the _format_single_table_md helper for core table structure.
348 |     """
349 |     print(f"Fetching metadata for {full_table_name}...")
350 |     
351 |     try:
352 |         table_info: TableInfo = sdk_client.tables.get(full_name=full_table_name)
353 |     except Exception as e:
354 |         error_details = str(e)
355 |         return f"""# Error: Could Not Retrieve Table Details
356 | **Table:** `{full_table_name}`
357 | **Problem:** Failed to fetch the complete metadata for this table.
358 | **Details:**
359 | ```
360 | {error_details}
361 | ```"""
362 | 
363 |     markdown_parts = _format_single_table_md(table_info, base_heading_level=1, display_columns=True)
364 | 
365 |     if include_lineage:
366 |         markdown_parts.extend(["", "## Lineage Information"])
367 |         if not DATABRICKS_SQL_WAREHOUSE_ID:
368 |             markdown_parts.append("- *Lineage fetching skipped: `DATABRICKS_SQL_WAREHOUSE_ID` environment variable is not set.*")
369 |         else:
370 |             print(f"Fetching lineage for {full_table_name}...")
371 |             lineage_info = _get_table_lineage(full_table_name)
372 |             
373 |             has_upstream = lineage_info and isinstance(lineage_info.get("upstream_tables"), list) and lineage_info["upstream_tables"]
374 |             has_downstream = lineage_info and isinstance(lineage_info.get("downstream_tables"), list) and lineage_info["downstream_tables"]
375 |             has_notebooks_reading = lineage_info and isinstance(lineage_info.get("notebooks_reading"), list) and lineage_info["notebooks_reading"]
376 |             has_notebooks_writing = lineage_info and isinstance(lineage_info.get("notebooks_writing"), list) and lineage_info["notebooks_writing"]
377 | 
378 |             if has_upstream:
379 |                 markdown_parts.extend(["", "### Upstream Tables (tables this table reads from):"])
380 |                 markdown_parts.extend([f"- `{table}`" for table in lineage_info["upstream_tables"]])
381 |             
382 |             if has_downstream:
383 |                 markdown_parts.extend(["", "### Downstream Tables (tables that read from this table):"])
384 |                 markdown_parts.extend([f"- `{table}`" for table in lineage_info["downstream_tables"]])
385 |             
386 |             if has_notebooks_reading:
387 |                 markdown_parts.extend(["", "### Notebooks Reading from this Table:"])
388 |                 for notebook in lineage_info["notebooks_reading"]:
389 |                     markdown_parts.extend([f"- {notebook}", ""])
390 |             
391 |             if has_notebooks_writing:
392 |                 markdown_parts.extend(["", "### Notebooks Writing to this Table:"])
393 |                 for notebook in lineage_info["notebooks_writing"]:
394 |                     markdown_parts.extend([f"- {notebook}", ""])
395 |             
396 |             if not any([has_upstream, has_downstream, has_notebooks_reading, has_notebooks_writing]):
397 |                 if lineage_info and lineage_info.get("status") == "error" and lineage_info.get("error"):
398 |                      markdown_parts.extend(["", "*Note: Could not retrieve complete lineage information.*", f"> *Lineage fetch error: {lineage_info.get('error')}*"])
399 |                 elif lineage_info and lineage_info.get("status") != "success" and lineage_info.get("error"):
400 |                     markdown_parts.extend(["", "*Note: Could not retrieve complete lineage information.*", f"> *Lineage fetch error: {lineage_info.get('error')}*"])
401 |                 else:
402 |                     markdown_parts.append("- *No table, notebook, or job dependencies found or lineage fetch was not fully successful.*")
403 |     else:
404 |         markdown_parts.extend(["", "## Lineage Information", "- *Lineage fetching skipped as per request.*"])
405 | 
406 |     return "\n".join(markdown_parts)
407 | 
408 | def get_uc_schema_details(catalog_name: str, schema_name: str, include_columns: bool = False) -> str:
409 |     """
410 |     Fetches detailed information for a specific schema, optionally including its tables and their columns.
411 |     Uses the global SDK client and the _format_single_table_md helper with appropriate heading levels.
412 |     """
413 |     full_schema_name = f"{catalog_name}.{schema_name}"
414 |     markdown_parts = [f"# Schema Details: **{full_schema_name}**"]
415 | 
416 |     try:
417 |         print(f"Fetching details for schema: {full_schema_name}...")
418 |         schema_info: SchemaInfo = sdk_client.schemas.get(full_name=full_schema_name)
419 | 
420 |         description = schema_info.comment if schema_info.comment else "No description provided."
421 |         markdown_parts.append(f"**Description**: {description}")
422 |         markdown_parts.append("")
423 | 
424 |         markdown_parts.append(f"## Tables in Schema `{schema_name}`")
425 |             
426 |         tables_iterable = sdk_client.tables.list(catalog_name=catalog_name, schema_name=schema_name)
427 |         tables_list = list(tables_iterable)
428 | 
429 |         if not tables_list:
430 |             markdown_parts.append("- *No tables found in this schema.*")
431 |         else:
432 |             for i, table_info in enumerate(tables_list):
433 |                 if not isinstance(table_info, TableInfo):
434 |                     print(f"Warning: Encountered an unexpected item in tables list: {type(table_info)}")
435 |                     continue
436 |                 
437 |                 markdown_parts.extend(_format_single_table_md(
438 |                     table_info, 
439 |                     base_heading_level=3,
440 |                     display_columns=include_columns
441 |                 ))
442 |                 if i < len(tables_list) - 1:
443 |                     markdown_parts.append("\n=============\n")
444 |                 else:
445 |                     markdown_parts.append("")
446 | 
447 |     except Exception as e:
448 |         error_message = f"Failed to retrieve details for schema '{full_schema_name}': {str(e)}"
449 |         print(f"Error in get_uc_schema_details: {error_message}")
450 |         return f"""# Error: Could Not Retrieve Schema Details
451 | **Schema:** `{full_schema_name}`
452 | **Problem:** An error occurred while attempting to fetch schema information.
453 | **Details:**
454 | ```
455 | {error_message}
456 | ```"""
457 | 
458 |     return "\n".join(markdown_parts)
459 | 
460 | def get_uc_catalog_details(catalog_name: str) -> str:
461 |     """
462 |     Fetches and formats a summary of all schemas within a given catalog
463 |     using the global SDK client.
464 |     """
465 |     markdown_parts = [f"# Catalog Summary: **{catalog_name}**", ""]
466 |     schemas_found_count = 0
467 |     
468 |     try:
469 |         print(f"Fetching schemas for catalog: {catalog_name} using global sdk_client...")
470 |         # The sdk_client is globally defined in this module
471 |         schemas_iterable = sdk_client.schemas.list(catalog_name=catalog_name)
472 |         
473 |         # Convert iterator to list to easily check if empty and get a count
474 |         schemas_list = list(schemas_iterable) 
475 | 
476 |         if not schemas_list:
477 |             markdown_parts.append(f"No schemas found in catalog `{catalog_name}`.")
478 |             return "\n".join(markdown_parts)
479 | 
480 |         schemas_found_count = len(schemas_list)
481 |         markdown_parts.append(f"Showing top {schemas_found_count} schemas found in catalog `{catalog_name}`:")
482 |         markdown_parts.append("")
483 | 
484 |         for i, schema_info in enumerate(schemas_list):
485 |             if not isinstance(schema_info, SchemaInfo):
486 |                 print(f"Warning: Encountered an unexpected item in schemas list: {type(schema_info)}")
487 |                 continue
488 | 
489 |             # Start of a schema item in the list
490 |             schema_name_display = schema_info.full_name if schema_info.full_name else "Unnamed Schema"
491 |             markdown_parts.append(f"## {schema_name_display}") # Main bullet point for schema name
492 |                         
493 |             description = f"**Description**: {schema_info.comment}" if schema_info.comment else ""
494 |             markdown_parts.append(description)
495 |             
496 |             markdown_parts.append("") # Add a blank line for separation between schemas, or remove if too much space
497 | 
498 |     except Exception as e:
499 |         error_message = f"Failed to retrieve schemas for catalog '{catalog_name}': {str(e)}"
500 |         print(f"Error in get_catalog_summary: {error_message}")
501 |         # Return a structured error message in Markdown
502 |         return f"""# Error: Could Not Retrieve Catalog Summary
503 | **Catalog:** `{catalog_name}`
504 | **Problem:** An error occurred while attempting to fetch schema information.
505 | **Details:**
506 | ```
507 | {error_message}
508 | ```"""
509 |     
510 |     markdown_parts.append(f"**Total Schemas Found in `{catalog_name}`**: {schemas_found_count}")
511 |     return "\n".join(markdown_parts)
512 | 
513 | 
514 | 
515 | def get_uc_all_catalogs_summary() -> str:
516 |     """
517 |     Fetches a summary of all available Unity Catalogs, including their names, comments, and types.
518 |     Uses the global SDK client.
519 |     """
520 |     markdown_parts = ["# Available Unity Catalogs", ""]
521 |     catalogs_found_count = 0
522 | 
523 |     try:
524 |         print("Fetching all catalogs using global sdk_client...")
525 |         catalogs_iterable = sdk_client.catalogs.list()
526 |         catalogs_list = list(catalogs_iterable)
527 | 
528 |         if not catalogs_list:
529 |             markdown_parts.append("- *No catalogs found or accessible.*")
530 |             return "\n".join(markdown_parts)
531 | 
532 |         catalogs_found_count = len(catalogs_list)
533 |         markdown_parts.append(f"Found {catalogs_found_count} catalog(s):")
534 |         markdown_parts.append("")
535 | 
536 |         for catalog_info in catalogs_list:
537 |             if not isinstance(catalog_info, CatalogInfo):
538 |                 print(f"Warning: Encountered an unexpected item in catalogs list: {type(catalog_info)}")
539 |                 continue
540 |             
541 |             markdown_parts.append(f"- **`{catalog_info.name}`**")
542 |             description = catalog_info.comment if catalog_info.comment else "No description provided."
543 |             markdown_parts.append(f"  - **Description**: {description}")
544 |             
545 |             catalog_type_str = "N/A"
546 |             if catalog_info.catalog_type and hasattr(catalog_info.catalog_type, 'value'):
547 |                 catalog_type_str = catalog_info.catalog_type.value
548 |             elif catalog_info.catalog_type: # Fallback if it's not an Enum but has a direct string representation
549 |                 catalog_type_str = str(catalog_info.catalog_type)
550 |             markdown_parts.append(f"  - **Type**: `{catalog_type_str}`")
551 |             
552 |             markdown_parts.append("") # Add a blank line for separation
553 | 
554 |     except Exception as e:
555 |         error_message = f"Failed to retrieve catalog list: {str(e)}"
556 |         print(f"Error in get_uc_all_catalogs_summary: {error_message}")
557 |         return f"""# Error: Could Not Retrieve Catalog List
558 | **Problem:** An error occurred while attempting to fetch the list of catalogs.
559 | **Details:**
560 | ```
561 | {error_message}
562 | ```"""
563 |     
564 |     return "\n".join(markdown_parts)
565 | 
566 | 
```