# Directory Structure

```
├── .gitignore
├── .python-version
├── assets
│   ├── agent_usage.png
│   ├── schema_description.png
│   └── table_columns_description.png
├── databricks_formatter.py
├── databricks_sdk_utils.py
├── Dockerfile
├── LICENSE
├── main.py
├── main.py.backup
├── pyproject.toml
├── README.md
├── requirements.txt
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------

```
3.10

```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info

# Virtual environments
.venv
.env

.DS_Store
```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# Databricks MCP Server

- [Motivation](#motivation)
- [Overview](#overview)
- [Practical Benefits of UC Metadata for AI Agents](#practical-benefits-of-uc-metadata-for-ai-agents)
- [Available Tools and Features](#available-tools-and-features)
- [Setup](#setup)
  - [System Requirements](#system-requirements)
  - [Installation](#installation)
- [Permissions Requirements](#permissions-requirements)
- [Running the Server](#running-the-server)
  - [Standalone Mode](#standalone-mode)
  - [Using with Cursor](#using-with-cursor)
- [Example Usage Workflow (for an LLM Agent)](#example-usage-workflow-for-an-llm-agent)
- [Managing Metadata as Code with Terraform](#managing-metadata-as-code-with-terraform)
- [Handling Long-Running Queries](#handling-long-running-queries)
- [Dependencies](#dependencies)

## Motivation

Databricks Unity Catalog (UC) allows for detailed documentation of your data assets, including catalogs, schemas, tables, and columns. Documenting these assets thoroughly requires an investment of time. One common question is: what are the practical benefits of this detailed metadata entry?

This MCP server provides a strong justification for that effort. It enables Large Language Models (LLMs) to directly access and utilize this Unity Catalog metadata. The more comprehensively your data is described in UC, the more effectively an LLM agent can understand your Databricks environment. This deeper understanding is crucial for the agent to autonomously construct more intelligent and accurate SQL queries to fulfill data requests.

## Overview

This Model Context Protocol (MCP) server is designed to interact with Databricks, with a strong focus on leveraging Unity Catalog (UC) metadata and enabling comprehensive data lineage exploration. The primary goal is to equip an AI agent with a comprehensive set of tools, enabling it to become independent in answering questions about your data. By autonomously exploring UC, understanding data structures, analyzing data lineage (including notebook and job dependencies), and executing SQL queries, the agent can fulfill data requests without direct human intervention for each step.

Beyond traditional catalog browsing, this server enables agents to discover and analyze the actual code that processes your data. Through enhanced lineage capabilities, agents can identify notebooks and jobs that read from or write to tables, then examine the actual transformation logic, business rules, and data quality checks implemented in those notebooks. This creates a powerful feedback loop where agents not only understand *what* data exists, but also *how* it's processed and transformed.

When used in agent mode, it can iterate through a series of tool calls to perform complex tasks, including data discovery, impact analysis, and code exploration.

## Practical Benefits of UC Metadata for AI Agents

The tools provided by this MCP server are designed to parse and present the descriptions you've added to Unity Catalog, while also enabling deep exploration of your data processing code. This offers tangible advantages for LLM-based agents, directly impacting their ability to generate useful SQL and understand your data ecosystem:

*   **Clearer Data Context**: Agents can quickly understand the purpose of tables and columns, reducing ambiguity. This foundational understanding is the first step towards correct query formulation.
*   **More Accurate Query Generation**: Access to descriptions, data types, and relationships helps agents construct SQL queries with greater precision and semantic correctness.
*   **Efficient Data Exploration for Query Planning**: Metadata enables agents to navigate through catalogs and schemas more effectively, allowing them to identify the correct tables and columns to include in their SQL queries.
*   **Comprehensive Data Lineage**: Beyond table-to-table relationships, agents can discover notebooks and jobs that process data, enabling impact analysis and debugging of data pipeline issues.
*   **Code-Level Understanding**: Through notebook content exploration, agents can analyze actual transformation logic, business rules, and data quality checks, providing deeper insights into how data is processed and transformed.
*   **End-to-End Data Flow Analysis**: Agents can trace data from raw ingestion through transformation pipelines to final consumption, understanding both the structure and the processing logic at each step.

Well-documented metadata in Unity Catalog, when accessed via this server, allows an LLM agent to operate with better information and make more informed decisions, culminating in the generation of more effective SQL queries. For instance, schema descriptions help the agent identify relevant data sources for a query:

![Schema Description in Unity Catalog](assets/schema_description.png)
*Fig 1: A schema in Unity Catalog with user-provided descriptions. This MCP server makes this information directly accessible to an LLM, informing its query strategy.*

Similarly, detailed comments at the column level clarify the semantics of each field, which is crucial for constructing accurate SQL conditions and selections:

![Table Column Descriptions in Unity Catalog](assets/table_columns_description.png)
*Fig 2: Column-level descriptions in Unity Catalog. These details are passed to the LLM, aiding its understanding of the data structure for precise SQL generation.*

## Available Tools and Features

This MCP server provides a suite of tools designed to empower an LLM agent interacting with Databricks:

**Core Capabilities:**

*   **Execute SQL Queries**: Run arbitrary SQL queries using the Databricks SDK via the `execute_sql_query(sql: str)` tool. This is ideal for targeted data retrieval or complex operations.
*   **LLM-Focused Output**: All descriptive tools return information in Markdown format, optimized for consumption by Large Language Models, making it easier for agents to parse and understand the context.

**Unity Catalog Exploration Tools:**

The server provides the following tools for navigating and understanding your Unity Catalog assets. They are designed to be used by an LLM agent to gather context before constructing queries or making decisions.

1.  `list_uc_catalogs() -> str`
    *   **Description**: Lists all available Unity Catalogs with their names, descriptions, and types.
    *   **When to use**: As a starting point to discover available data sources when you don't know specific catalog names. It provides a high-level overview of all accessible catalogs in the workspace.

2.  `describe_uc_catalog(catalog_name: str) -> str`
    *   **Description**: Provides a summary of a specific Unity Catalog, listing all its schemas with their names and descriptions.
    *   **When to use**: When you know the catalog name and need to discover the schemas within it. This is often a precursor to describing a specific schema or table.
    *   **Args**:
        *   `catalog_name`: The name of the Unity Catalog to describe (e.g., `prod`, `dev`, `system`).

3.  `describe_uc_schema(catalog_name: str, schema_name: str, include_columns: Optional[bool] = False) -> str`
    *   **Description**: Provides detailed information about a specific schema within a Unity Catalog. Returns all tables in the schema, optionally including their column details.
    *   **When to use**: To understand the contents of a schema, primarily its tables. Set `include_columns=True` to get column information, which is crucial for query construction but makes the output longer. If `include_columns=False`, only table names and descriptions are shown, useful for a quicker overview.
    *   **Args**:
        *   `catalog_name`: The name of the catalog containing the schema.
        *   `schema_name`: The name of the schema to describe.
        *   `include_columns`: If True, lists tables with their columns. Defaults to False for a briefer summary.

4.  `describe_uc_table(full_table_name: str, include_lineage: Optional[bool] = False) -> str`
    *   **Description**: Provides a detailed description of a specific Unity Catalog table with comprehensive lineage capabilities.
    *   **When to use**: To understand the structure (columns, data types, partitioning) of a single table. This is essential before constructing SQL queries against the table. Optionally, it can include comprehensive lineage information that goes beyond traditional table-to-table dependencies:
        *   **Table Lineage**: Upstream tables (tables this table reads from) and downstream tables (tables that read from this table)
        *   **Notebook & Job Lineage**: Notebooks that read from or write to this table, including notebook name, workspace path, associated Databricks job information (job name, ID, task details)
        *   **Code Discovery**: The lineage provides notebook paths that enable an agent to directly read notebook files within the current repo/workspace, allowing analysis of the actual data transformation logic
    *   **Args**:
        *   `full_table_name`: The fully qualified three-part name of the table (e.g., `catalog.schema.table`).
        *   `include_lineage`: Set to True to fetch comprehensive lineage (tables, notebooks, jobs). Defaults to False. May take longer to retrieve but provides rich context for understanding data dependencies and enabling code exploration.

5.  `execute_sql_query(sql: str) -> str`
    *   **Note**: This is the same tool listed under "Core Capabilities" but is repeated here in the context of a typical agent workflow involving UC exploration followed by querying.
    *   **Description**: Executes a given SQL query against the Databricks SQL warehouse and returns the formatted results.
    *   **When to use**: When you need to run specific SQL queries, such as SELECT, SHOW, or other DQL statements.
    *   **Args**:
        *   `sql`: The complete SQL query string to execute.

## Setup

### System Requirements

-   Python 3.10+
-   If you plan to install via `uv`, ensure it's [installed](https://docs.astral.sh/uv/getting-started/installation/#__tabbed_1_1)

### Installation

1.  Install the required dependencies:

```bash
pip install -r requirements.txt
```

Or if using `uv`:

```bash
uv pip install -r requirements.txt
```

2.  Set up your environment variables:

    Option 1: Using a `.env` file (recommended)

    Create a `.env` file in the root directory of this project with your Databricks credentials:

    ```env
    DATABRICKS_HOST="your-databricks-instance.cloud.databricks.com"
    DATABRICKS_TOKEN="your-databricks-personal-access-token"
    DATABRICKS_SQL_WAREHOUSE_ID="your-sql-warehouse-id"
    ```

    Option 2: Setting environment variables directly

    ```bash
    export DATABRICKS_HOST="your-databricks-instance.cloud.databricks.com"
    export DATABRICKS_TOKEN="your-databricks-personal-access-token"
    export DATABRICKS_SQL_WAREHOUSE_ID="your-sql-warehouse-id"
    ```

    You can find your SQL Warehouse ID in the Databricks UI under "SQL Warehouses".
    The `DATABRICKS_SQL_WAREHOUSE_ID` is primarily used for fetching table lineage and executing SQL queries via the `execute_sql_query` tool.
    Metadata browsing tools (listing/describing catalogs, schemas, tables) use the Databricks SDK's general UC APIs and do not strictly require a SQL Warehouse ID unless lineage is requested.
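
Before wiring the server into an agent, you can sanity-check that the credentials resolve correctly. The snippet below is a minimal sketch (not part of this repository) that assumes the environment variables above and simply prints the identity behind the token:

```python
# check_connection.py -- hypothetical helper, not part of this repository.
import os

from dotenv import load_dotenv
from databricks.sdk import WorkspaceClient

load_dotenv()  # picks up DATABRICKS_HOST / DATABRICKS_TOKEN from .env

client = WorkspaceClient(
    host=os.environ["DATABRICKS_HOST"],
    token=os.environ["DATABRICKS_TOKEN"],
)

# If authentication is configured correctly, this prints the user or
# service principal associated with the token.
print(client.current_user.me().user_name)
```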

## Permissions Requirements

Before using this MCP server, ensure that the identity associated with the `DATABRICKS_TOKEN` (e.g., a user or service principal) has the necessary permissions:

1.  **Unity Catalog Permissions**: 
    -   `USE CATALOG` on catalogs to be accessed.
    -   `USE SCHEMA` on schemas to be accessed.
    -   `SELECT` on tables to be queried or described in detail (including column information).
    -   To list all catalogs, metastore-level permissions may be required; otherwise, only catalogs on which the identity has at least `USE CATALOG` are listed.
2.  **SQL Warehouse Permissions** (for `execute_sql_query` and lineage fetching):
    -   `CAN_USE` permission on the SQL Warehouse specified by `DATABRICKS_SQL_WAREHOUSE_ID`.
3.  **Token Permissions**: 
    -   The personal access token or service principal token should have the minimum necessary scopes. For Unity Catalog operations, this typically involves workspace access. For SQL execution, it involves SQL permissions.
    -   It is strongly recommended to use a service principal with narrowly defined permissions for production or automated scenarios.

For security best practices, consider regularly rotating your access tokens and auditing query history and UC audit logs to monitor usage.
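
If you prefer to manage these grants with SQL, they can also be issued programmatically. The sketch below is illustrative only: the principal and object names are placeholders, and it reuses this repo's `execute_databricks_sql` helper (which requires `DATABRICKS_SQL_WAREHOUSE_ID` and a token identity that is allowed to grant privileges):

```python
# Illustrative sketch: granting the minimum Unity Catalog privileges used by this server.
# The principal and object names are placeholders -- substitute your own.
from databricks_sdk_utils import execute_databricks_sql

principal = "`mcp-service-principal`"  # hypothetical service principal

grants = [
    f"GRANT USE CATALOG ON CATALOG prod TO {principal}",
    f"GRANT USE SCHEMA ON SCHEMA prod.sales_schema TO {principal}",
    f"GRANT SELECT ON TABLE prod.sales_schema.orders TO {principal}",
]

for statement in grants:
    result = execute_databricks_sql(statement)
    print(statement, "->", result.get("status"))
```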

## Running the Server

### Standalone Mode

To run the server in standalone mode (e.g., for testing with Agent Composer):

```bash
python main.py
```

This will start the MCP server using stdio transport, which can be used with Agent Composer or other MCP clients.
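
For a quick smoke test outside an IDE, the MCP Python SDK's stdio client can spawn `main.py` as a subprocess and list the exposed tools. This is a sketch assuming the `mcp` package's documented client API; adjust the command and working directory to your setup:

```python
# smoke_test.py -- hypothetical script; run it from the repository root.
import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

server_params = StdioServerParameters(command="python", args=["main.py"])

async def main() -> None:
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            tools = await session.list_tools()
            # Expect the five tools described above, e.g. list_uc_catalogs, execute_sql_query.
            print([tool.name for tool in tools.tools])

asyncio.run(main())
```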

### Using with Cursor

To use this MCP server with [Cursor](https://cursor.sh/), configure it in your Cursor settings (`~/.cursor/mcp.json`):

1. Create a `.cursor` directory in your home directory if it doesn't already exist
2. Create or edit the `mcp.json` file in that directory:

```bash
mkdir -p ~/.cursor
touch ~/.cursor/mcp.json
```

3. Add the following configuration to the `mcp.json` file, replacing the directory path with the actual path to where you've installed this server:

```json
{
    "mcpServers": {
        "databricks": {
            "command": "uv",
            "args": [
                "--directory",
                "/path/to/your/mcp-databricks-server",
                "run",
                "main.py"
            ]
        }
    }
}
```

Example using `python`:
```json
{
    "mcpServers": {
        "databricks": {
            "command": "python",
            "args": [
                "/path/to/your/mcp-databricks-server/main.py"
            ]
        }
    }
}
```
Restart Cursor to apply the changes. The `databricks` server's tools will then be available to Cursor's agent.

## Example Usage Workflow (for an LLM Agent)

This MCP server empowers an LLM agent to autonomously navigate your Databricks environment. The following screenshot illustrates a typical interaction where the agent iteratively explores schemas and tables, adapting its approach even when initial queries don't yield results, until it successfully retrieves the requested data.

![Agent actively using MCP tools to find data](assets/agent_usage.png)
*Fig 3: An LLM agent using the Databricks MCP tools, demonstrating iterative exploration and query refinement to locate specific page view data.*

An agent might follow this kind of workflow:

1.  **Discover available catalogs**: `list_uc_catalogs()`
    *   *Agent decides `prod_catalog` is relevant from the list.* 
2.  **Explore a specific catalog**: `describe_uc_catalog(catalog_name="prod_catalog")`
    *   *Agent sees `sales_schema` and `inventory_schema`.*
3.  **Explore a specific schema (quick view)**: `describe_uc_schema(catalog_name="prod_catalog", schema_name="sales_schema")`
    *   *Agent sees table names like `orders`, `customers`.* 
4.  **Get detailed table structure (including columns for query building)**: `describe_uc_schema(catalog_name="prod_catalog", schema_name="sales_schema", include_columns=True)`
    *   *Alternatively, if a specific table is of interest:* `describe_uc_table(full_table_name="prod_catalog.sales_schema.orders")`
5.  **Analyze data lineage and discover processing code**: `describe_uc_table(full_table_name="prod_catalog.sales_schema.orders", include_lineage=True)`
    *   *Agent discovers upstream tables, downstream dependencies, and notebooks that process this data*
    *   *For example, sees that `/Repos/production/etl/sales_processing.py` writes to this table*
6.  **Examine data transformation logic**: *Agent directly reads the notebook file `/Repos/production/etl/sales_processing.py` within the IDE/repo*
    *   *Agent analyzes the actual Python/SQL code to understand business rules, data quality checks, and transformation logic*
7.  **Construct and execute a query**: `execute_sql_query(sql="SELECT customer_id, order_date, SUM(order_total) FROM prod_catalog.sales_schema.orders WHERE order_date > '2023-01-01' GROUP BY customer_id, order_date ORDER BY order_date DESC LIMIT 100")`

## Managing Metadata as Code with Terraform

While manually entering metadata through the Databricks UI is an option, a more robust and scalable approach is to define your Unity Catalog metadata as code. Tools like Terraform allow you to declaratively manage your data governance objects, including catalogs and schemas. This brings several advantages:

*   **Version Control**: Your metadata definitions can be stored in Git, tracked, and versioned alongside your other infrastructure code.
*   **Repeatability and Consistency**: Ensure consistent metadata across environments (dev, staging, prod).
*   **Automation**: Integrate metadata management into your CI/CD pipelines.
*   **Easier Maintenance for Core Assets**: While defining every new table as code might be complex due to their dynamic nature, core assets like catalogs and schemas are often more stable and benefit significantly from this approach. Maintaining their definitions and comments as code ensures a durable and well-documented foundation for your data landscape.

Here's an example of how you might define a catalog and its schemas using the Databricks provider for Terraform:

```terraform
resource "databricks_catalog" "prod_catalog" {
  name          = "prod"
  comment       = "Main production catalog for all enterprise data."
  storage_root  = var.default_catalog_storage_root
  force_destroy = false
}

# Schemas within the 'prod' catalog
resource "databricks_schema" "prod_raw" {
  catalog_name = databricks_catalog.prod_catalog.name
  name         = "raw"
  comment      = "Raw data for all different projects, telemetry, game data etc., before any transformations. No schema enforcement."
}

resource "databricks_schema" "prod_bi_conformed" {
  catalog_name = databricks_catalog.prod_catalog.name
  name         = "bi_conformed"
  comment      = "Conformed (silver) schema for Business Intelligence, cleaned and well-formatted. Schema enforced."
}

resource "databricks_schema" "prod_bi_modeled" {
  catalog_name = databricks_catalog.prod_catalog.name
  name         = "bi_modeled"
  comment      = "Modeled (gold) schema for Business Intelligence, aggregated and ready for consumption. Schema enforced."
}
```

Fear not if you already have existing catalogs and schemas in Unity Catalog. You don't need to recreate them to manage their metadata as code. Terraform provides the `terraform import` command, which allows you to bring existing infrastructure (including Unity Catalog assets) under its management. Once imported, you can define the resource in your Terraform configuration and selectively update attributes like the `comment` field without affecting the asset itself. For example, after importing an existing schema, you could add or update its `comment` in your `.tf` file, and `terraform apply` would only apply that change.

Adopting a metadata-as-code strategy, especially for foundational elements like catalogs and schemas, greatly enhances the quality and reliability of the metadata that this MCP server leverages. This, in turn, further improves the effectiveness of AI agents interacting with your Databricks data.

For more details on using Terraform with Databricks Unity Catalog, refer to the official documentation:
*   Databricks Provider: Catalog Resource ([https://registry.terraform.io/providers/databricks/databricks/latest/docs/resources/catalog](https://registry.terraform.io/providers/databricks/databricks/latest/docs/resources/catalog))
*   Databricks Provider: Schemas Data Source ([https://registry.terraform.io/providers/databricks/databricks/latest/docs/data-sources/schemas](https://registry.terraform.io/providers/databricks/databricks/latest/docs/data-sources/schemas))

## Handling Long-Running Queries

The `execute_sql_query` tool uses the Databricks SDK's `execute_statement` method. The `wait_timeout` parameter in the underlying `databricks_sdk_utils.execute_databricks_sql` function is set to `'50s'`. If a query runs longer than this, the API returns a statement ID that could be polled, but the current implementation does not poll: it waits up to this duration for a synchronous-like response, so very long-running queries may hit the timeout.
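
If you need results from statements that outlive that window, one option is to submit the statement asynchronously and poll it by ID with the SDK's `statement_execution.get_statement` call. The sketch below illustrates that pattern; it is not wired into the current tool, and the helper name is hypothetical:

```python
# Hypothetical polling helper, shown for illustration only.
import time

from databricks.sdk.service.sql import StatementState
from databricks_sdk_utils import sdk_client, DATABRICKS_SQL_WAREHOUSE_ID


def execute_sql_with_polling(sql: str, poll_interval_seconds: float = 5.0):
    """Submit a statement without waiting, then poll until it reaches a terminal state."""
    response = sdk_client.statement_execution.execute_statement(
        statement=sql,
        warehouse_id=DATABRICKS_SQL_WAREHOUSE_ID,
        wait_timeout="0s",  # return immediately with a statement_id
    )
    while response.status and response.status.state in (
        StatementState.PENDING,
        StatementState.RUNNING,
    ):
        time.sleep(poll_interval_seconds)
        response = sdk_client.statement_execution.get_statement(response.statement_id)
    return response  # inspect response.status.state and response.result
```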

## Dependencies

-   `databricks-sdk`: For interacting with the Databricks REST APIs and Unity Catalog.
-   `python-dotenv`: For loading environment variables from a `.env` file.
-   `mcp[cli]`: The Model Context Protocol library.
-   `asyncio`: For asynchronous operations within the MCP server.
-   `httpx` (typically a sub-dependency of `databricks-sdk` or `mcp`): For making HTTP requests.


```

--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------

```
httpx>=0.28.1
python-dotenv>=1.0.0 
mcp[cli]>=1.2.0
asyncio>=3.4.3
databricks-sdk==0.55.0
```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
[project]
name = "databricks"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.10"
dependencies = [
    "httpx>=0.28.1",
    "mcp[cli]>=1.3.0",
]

```

--------------------------------------------------------------------------------
/Dockerfile:
--------------------------------------------------------------------------------

```dockerfile
FROM python:3.10-slim

WORKDIR /app

# Set environment variables (users should provide these at runtime)
ENV DATABRICKS_HOST="your-databricks-instance.cloud.databricks.com"
ENV DATABRICKS_TOKEN="your-databricks-access-token"
ENV DATABRICKS_SQL_WAREHOUSE_ID="your-sql-warehouse-id"

COPY requirements.txt requirements.txt
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD ["python", "main.py"] 
```

--------------------------------------------------------------------------------
/databricks_formatter.py:
--------------------------------------------------------------------------------

```python
from typing import Any, Dict, List


def format_query_results(result: Dict[str, Any]) -> str:
    """Format query results from either SDK or direct API style into a readable string."""

    if not result:
        return "No results or invalid result format."
    
    column_names: List[str] = []
    data_rows_formatted: List[str] = []

    # Try to parse as output from execute_databricks_sql (SDK based)
    if result.get("status") == "success" and "data" in result:
        print("Formatting results from SDK-based execute_databricks_sql output.")
        sdk_data = result.get("data", [])
        if not sdk_data:  # Query succeeded but produced no rows
            # With an empty list of row dicts there is no way to recover column names
            # here (they would have to come from the statement manifest), so just
            # report that the query returned no data.
            if result.get("message") == "Query succeeded but returned no data.":
                return "Query succeeded but returned no data."
            return "Query succeeded but returned no data rows."

        # Assuming sdk_data is a list of dictionaries, get column names from the first row's keys
        if isinstance(sdk_data, list) and len(sdk_data) > 0 and isinstance(sdk_data[0], dict):
            column_names = list(sdk_data[0].keys())
        
        for row_dict in sdk_data:
            row_values = []
            for col_name in column_names: # Iterate in order of discovered column names
                value = row_dict.get(col_name)
                if value is None:
                    row_values.append("NULL")
                else:
                    row_values.append(str(value))
            data_rows_formatted.append(" | ".join(row_values))
    
    # Try to parse as old direct API style output (from dbapi.execute_statement)
    elif 'manifest' in result and 'result' in result:
        print("Formatting results from original dbapi.execute_statement output.")
        if result['manifest'].get('schema') and result['manifest']['schema'].get('columns'):
            columns_schema = result['manifest']['schema']['columns']
            column_names = [col['name'] for col in columns_schema if 'name' in col] if columns_schema else []
        
        if result['result'].get('data_array'):
            raw_rows = result['result']['data_array']
            for row_list in raw_rows:
                row_values = []
                for value in row_list:
                    if value is None:
                        row_values.append("NULL")
                    else:
                        row_values.append(str(value))
                data_rows_formatted.append(" | ".join(row_values))
    else:
        # Fallback if structure is completely unrecognized or an error dict itself
        if result.get("status") == "error" and result.get("error"):
            return f"Error from query execution: {result.get('error')} Details: {result.get('details', 'N/A')}"
        return "Invalid or unrecognized result format."

    # Common formatting part for table output
    if not column_names:
        return "No column names found in the result."
    
    output_lines = []
    output_lines.append(" | ".join(column_names))
    output_lines.append("-" * (sum(len(name) + 3 for name in column_names) - 1 if column_names else 0))

    if not data_rows_formatted:
        output_lines.append("No data rows found.")
    else:
        output_lines.extend(data_rows_formatted)
    
    return "\n".join(output_lines) 
```
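
For reference, a minimal example of the SDK-style result shape this formatter expects (the sample data is hypothetical):

```python
from databricks_formatter import format_query_results

# Hypothetical result dict, shaped like the output of execute_databricks_sql.
sample = {
    "status": "success",
    "row_count": 2,
    "data": [
        {"customer_id": 1, "order_total": 120.5},
        {"customer_id": 2, "order_total": None},  # None values are rendered as "NULL"
    ],
}

print(format_query_results(sample))
# Prints a "customer_id | order_total" header row, a dashed separator line,
# and one pipe-delimited line per data row.
```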

--------------------------------------------------------------------------------
/main.py:
--------------------------------------------------------------------------------

```python
from typing import Optional
import asyncio
from mcp.server.fastmcp import FastMCP
from databricks_formatter import format_query_results
from databricks_sdk_utils import (
    get_uc_table_details,
    get_uc_catalog_details,
    get_uc_schema_details,
    execute_databricks_sql,
    get_uc_all_catalogs_summary
)


mcp = FastMCP("databricks")

@mcp.tool()
async def execute_sql_query(sql: str) -> str:
    """
    Executes a given SQL query against the Databricks SQL warehouse and returns the formatted results.
    
    Use this tool when you need to run specific SQL queries, such as SELECT, SHOW, or other DQL statements.
    This is ideal for targeted data retrieval or for queries that are too complex for the structured description tools.
    The results are returned in a human-readable, Markdown-like table format.

    Args:
        sql: The complete SQL query string to execute.
    """
    try:
        sdk_result = await asyncio.to_thread(execute_databricks_sql, sql_query=sql)
        
        status = sdk_result.get("status")
        if status == "failed":
            error_message = sdk_result.get("error", "Unknown query execution error.")
            details = sdk_result.get("details", "No additional details provided.")
            return f"SQL Query Failed: {error_message}\nDetails: {details}"
        elif status == "error":
            error_message = sdk_result.get("error", "Unknown error during SQL execution.")
            details = sdk_result.get("details", "No additional details provided.")
            return f"Error during SQL Execution: {error_message}\nDetails: {details}"
        elif status == "success":
            return format_query_results(sdk_result)
        else:
            # Should not happen if execute_databricks_sql always returns a known status
            return f"Received an unexpected status from query execution: {status}. Result: {sdk_result}"
            
    except Exception as e:
        return f"An unexpected error occurred while executing SQL query: {str(e)}"


@mcp.tool()
async def describe_uc_table(full_table_name: str, include_lineage: Optional[bool] = False) -> str:
    """
    Provides a detailed description of a specific Unity Catalog table.
    
    Use this tool to understand the structure (columns, data types, partitioning) of a single table.
    This is essential before constructing SQL queries against the table.
    
    Optionally, it can include comprehensive lineage information that goes beyond traditional 
    table-to-table dependencies:

    **Table Lineage:**
    - Upstream tables (tables this table reads from)
    - Downstream tables (tables that read from this table)
    
    **Notebook & Job Lineage:**
    - Notebooks that read from this table, including:
      * Notebook name and workspace path
      * Associated Databricks job information (job name, ID, task details)
    - Notebooks that write to this table with the same detailed context
    
    **Use Cases:**
    - Data impact analysis: understand what breaks if you modify this table
    - Code discovery: find notebooks that process this data for further analysis
    - Debugging: trace data flow issues by examining both table dependencies and processing code
    - Documentation: understand the complete data ecosystem around a table

    The lineage information allows LLMs and tools to subsequently fetch the actual notebook 
    code content for deeper analysis of data transformations and business logic.

    The output is formatted in Markdown.

    Args:
        full_table_name: The fully qualified three-part name of the table (e.g., `catalog.schema.table`).
        include_lineage: Set to True to fetch and include comprehensive lineage (tables, notebooks, jobs). 
                         Defaults to False. May take longer to retrieve but provides rich context for 
                         understanding data dependencies and enabling code exploration.
    """
    try:
        details_markdown = await asyncio.to_thread(
            get_uc_table_details,
            full_table_name=full_table_name,
            include_lineage=include_lineage
        )
        return details_markdown
    except ImportError as e:
        return f"Error initializing Databricks SDK utilities: {str(e)}. Please ensure DATABRICKS_HOST and DATABRICKS_TOKEN are set."
    except Exception as e:
        return f"Error getting detailed table description for '{full_table_name}': {str(e)}"

@mcp.tool()
async def describe_uc_catalog(catalog_name: str) -> str:
    """
    Provides a summary of a specific Unity Catalog, listing all its schemas with their names and descriptions.
    
    Use this tool when you know the catalog name and need to discover the schemas within it.
    This is often a precursor to describing a specific schema or table.
    The output is formatted in Markdown.

    Args:
        catalog_name: The name of the Unity Catalog to describe (e.g., `prod`, `dev`, `system`).
    """
    try:
        summary_markdown = await asyncio.to_thread(
            get_uc_catalog_details,
            catalog_name=catalog_name
        )
        return summary_markdown
    except ImportError as e:
        return f"Error initializing Databricks SDK utilities: {str(e)}. Please ensure DATABRICKS_HOST and DATABRICKS_TOKEN are set."
    except Exception as e:
        return f"Error getting catalog summary for '{catalog_name}': {str(e)}"

@mcp.tool()
async def describe_uc_schema(catalog_name: str, schema_name: str, include_columns: Optional[bool] = False) -> str:
    """
    Provides detailed information about a specific schema within a Unity Catalog.
    
    Use this tool to understand the contents of a schema, primarily its tables.
    Optionally, it can list all tables within the schema and their column details.
    Set `include_columns=True` to get column information, which is crucial for query construction but makes the output longer.
    If `include_columns=False`, only table names and descriptions are shown, useful for a quicker overview.
    The output is formatted in Markdown.

    Args:
        catalog_name: The name of the catalog containing the schema.
        schema_name: The name of the schema to describe.
        include_columns: If True, lists tables with their columns. Defaults to False for a briefer summary.
    """
    try:
        details_markdown = await asyncio.to_thread(
            get_uc_schema_details,
            catalog_name=catalog_name,
            schema_name=schema_name,
            include_columns=include_columns
        )
        return details_markdown
    except ImportError as e:
        return f"Error initializing Databricks SDK utilities: {str(e)}. Please ensure DATABRICKS_HOST and DATABRICKS_TOKEN are set."
    except Exception as e:
        return f"Error getting detailed schema description for '{catalog_name}.{schema_name}': {str(e)}"

@mcp.tool()
async def list_uc_catalogs() -> str:
    """
    Lists all available Unity Catalogs with their names, descriptions, and types.
    
    Use this tool as a starting point to discover available data sources when you don't know specific catalog names.
    It provides a high-level overview of all accessible catalogs in the workspace.
    The output is formatted in Markdown.
    """
    try:
        summary_markdown = await asyncio.to_thread(get_uc_all_catalogs_summary)
        return summary_markdown
    except ImportError as e:
        return f"Error initializing Databricks SDK utilities: {str(e)}. Please ensure DATABRICKS_HOST and DATABRICKS_TOKEN are set."
    except Exception as e:
        return f"Error listing catalogs: {str(e)}"

if __name__ == "__main__":
    mcp.run(transport='stdio')
```

--------------------------------------------------------------------------------
/databricks_sdk_utils.py:
--------------------------------------------------------------------------------

```python
from databricks.sdk import WorkspaceClient
from databricks.sdk.core import Config
from databricks.sdk.service.catalog import TableInfo, SchemaInfo, ColumnInfo, CatalogInfo
from databricks.sdk.service.sql import StatementResponse, StatementState
from typing import Dict, Any, List
import os
import json
import time
from dotenv import load_dotenv

# Load environment variables from .env file when the module is imported
load_dotenv()

DATABRICKS_HOST = os.environ.get("DATABRICKS_HOST")
DATABRICKS_TOKEN = os.environ.get("DATABRICKS_TOKEN")
DATABRICKS_SQL_WAREHOUSE_ID = os.environ.get("DATABRICKS_SQL_WAREHOUSE_ID")

if not DATABRICKS_HOST or not DATABRICKS_TOKEN:
    raise ImportError(
        "DATABRICKS_HOST and DATABRICKS_TOKEN must be set in environment variables or .env file "
        "for databricks_sdk_utils to initialize."
    )

# Configure and initialize the global SDK client.
# Relatively short timeouts keep connectivity problems from blocking tool calls for long.
sdk_config = Config(
    host=DATABRICKS_HOST,
    token=DATABRICKS_TOKEN,
    http_timeout_seconds=30,
    retry_timeout_seconds=60
)
sdk_client = WorkspaceClient(config=sdk_config)

# Cache for job information to avoid redundant API calls
_job_cache = {}
_notebook_cache = {}

def _format_column_details_md(columns: List[ColumnInfo]) -> List[str]:
    """
    Formats a list of ColumnInfo objects into a list of Markdown strings.
    """
    markdown_lines = []
    if not columns:
        markdown_lines.append("  - *No column information available.*")
        return markdown_lines

    for col in columns:
        if not isinstance(col, ColumnInfo):
            print(f"Warning: Encountered an unexpected item in columns list: {type(col)}. Skipping.")
            continue
        col_type = col.type_text or (col.type_name.value if col.type_name and hasattr(col.type_name, 'value') else "N/A")
        nullable_status = "nullable" if col.nullable else "not nullable"
        col_description = f": {col.comment}" if col.comment else ""
        markdown_lines.append(f"  - **{col.name}** (`{col_type}`, {nullable_status}){col_description}")
    return markdown_lines

def _get_job_info_cached(job_id: str) -> Dict[str, Any]:
    """Get job information with caching to avoid redundant API calls"""
    if job_id not in _job_cache:
        try:
            job_info = sdk_client.jobs.get(job_id=job_id)
            _job_cache[job_id] = {
                'name': job_info.settings.name if job_info.settings.name else f"Job {job_id}",
                'tasks': []
            }
            
            # Pre-process all tasks to build notebook mapping
            if job_info.settings.tasks:
                for task in job_info.settings.tasks:
                    if hasattr(task, 'notebook_task') and task.notebook_task:
                        task_info = {
                            'task_key': task.task_key,
                            'notebook_path': task.notebook_task.notebook_path
                        }
                        _job_cache[job_id]['tasks'].append(task_info)
                        
        except Exception as e:
            print(f"Error fetching job {job_id}: {e}")
            _job_cache[job_id] = {
                'name': f"Job {job_id}",
                'tasks': [],
                'error': str(e)
            }
    
    return _job_cache[job_id]

def _get_notebook_id_cached(notebook_path: str) -> str:
    """Get notebook ID with caching to avoid redundant API calls"""
    if notebook_path not in _notebook_cache:
        try:
            notebook_details = sdk_client.workspace.get_status(notebook_path)
            _notebook_cache[notebook_path] = str(notebook_details.object_id)
        except Exception as e:
            print(f"Error fetching notebook {notebook_path}: {e}")
            _notebook_cache[notebook_path] = None
    
    return _notebook_cache[notebook_path]

def _resolve_notebook_info_optimized(notebook_id: str, job_id: str) -> Dict[str, Any]:
    """
    Optimized version that resolves notebook info using cached job data.
    Returns dict with notebook_path, notebook_name, job_name, and task_key.
    """
    result = {
        'notebook_id': notebook_id,
        'notebook_path': f"notebook_id:{notebook_id}",
        'notebook_name': f"notebook_id:{notebook_id}",
        'job_id': job_id,
        'job_name': f"Job {job_id}",
        'task_key': None
    }
    
    # Get cached job info
    job_info = _get_job_info_cached(job_id)
    result['job_name'] = job_info['name']
    
    # Look for notebook in job tasks
    for task_info in job_info['tasks']:
        notebook_path = task_info['notebook_path']
        cached_notebook_id = _get_notebook_id_cached(notebook_path)
        
        if cached_notebook_id == notebook_id:
            result['notebook_path'] = notebook_path
            result['notebook_name'] = notebook_path.split('/')[-1]
            result['task_key'] = task_info['task_key']
            break
    
    return result

def _format_notebook_info_optimized(notebook_info: Dict[str, Any]) -> str:
    """
    Formats notebook information using pre-resolved data.
    """
    lines = []
    
    if notebook_info['notebook_path'].startswith('/'):
        lines.append(f"**`{notebook_info['notebook_name']}`**")
        lines.append(f"  - **Path**: `{notebook_info['notebook_path']}`")
    else:
        lines.append(f"**{notebook_info['notebook_name']}**")
    
    lines.append(f"  - **Job**: {notebook_info['job_name']} (ID: {notebook_info['job_id']})")
    if notebook_info['task_key']:
        lines.append(f"  - **Task**: {notebook_info['task_key']}")
    
    return "\n".join(lines)

def _process_lineage_results(lineage_query_output: Dict[str, Any], main_table_full_name: str) -> Dict[str, Any]:
    """
    Optimized version of lineage processing that batches API calls and uses caching.
    """
    print("Processing lineage results with optimization...")
    start_time = time.time()
    
    processed_data: Dict[str, Any] = {
        "upstream_tables": [],
        "downstream_tables": [],
        "notebooks_reading": [],
        "notebooks_writing": []
    }
    
    if not lineage_query_output or lineage_query_output.get("status") != "success" or not isinstance(lineage_query_output.get("data"), list):
        print("Warning: Lineage query output is invalid or not successful. Returning empty lineage.")
        return processed_data

    upstream_set = set()
    downstream_set = set()
    notebooks_reading_dict = {}
    notebooks_writing_dict = {}
    
    # Collect all unique job IDs first for batch processing
    unique_job_ids = set()
    notebook_job_pairs = []
    
    for row in lineage_query_output["data"]:
        source_table = row.get("source_table_full_name")
        target_table = row.get("target_table_full_name")
        entity_metadata = row.get("entity_metadata")
        
        # Parse entity metadata
        notebook_id = None
        job_id = None
        
        if entity_metadata:
            try:
                if isinstance(entity_metadata, str):
                    metadata_dict = json.loads(entity_metadata)
                else:
                    metadata_dict = entity_metadata
                    
                notebook_id = metadata_dict.get("notebook_id")
                job_info = metadata_dict.get("job_info")
                if job_info:
                    job_id = job_info.get("job_id")
            except (json.JSONDecodeError, AttributeError):
                pass
        
        # Process table-to-table lineage
        if source_table == main_table_full_name and target_table and target_table != main_table_full_name:
            downstream_set.add(target_table)
        elif target_table == main_table_full_name and source_table and source_table != main_table_full_name:
            upstream_set.add(source_table)
        
        # Collect notebook-job pairs for batch processing
        if notebook_id and job_id:
            unique_job_ids.add(job_id)
            notebook_job_pairs.append({
                'notebook_id': notebook_id,
                'job_id': job_id,
                'source_table': source_table,
                'target_table': target_table
            })
    
    # Pre-load job information for every unique job so that notebook resolution
    # below can reuse cached data instead of repeating API calls.
    print(f"Pre-loading {len(unique_job_ids)} unique jobs...")
    batch_start = time.time()
    
    for job_id in unique_job_ids:
        _get_job_info_cached(job_id)  # This will cache the job info
    
    batch_time = time.time() - batch_start
    print(f"Job batch loading took {batch_time:.2f} seconds")
    
    # Now process all notebook-job pairs using cached data
    print(f"Processing {len(notebook_job_pairs)} notebook entries...")
    for pair in notebook_job_pairs:
        notebook_info = _resolve_notebook_info_optimized(pair['notebook_id'], pair['job_id'])
        formatted_info = _format_notebook_info_optimized(notebook_info)
        
        if pair['source_table'] == main_table_full_name:
            notebooks_reading_dict[pair['notebook_id']] = formatted_info
        elif pair['target_table'] == main_table_full_name:
            notebooks_writing_dict[pair['notebook_id']] = formatted_info
    
    processed_data["upstream_tables"] = sorted(list(upstream_set))
    processed_data["downstream_tables"] = sorted(list(downstream_set))
    processed_data["notebooks_reading"] = sorted(list(notebooks_reading_dict.values()))
    processed_data["notebooks_writing"] = sorted(list(notebooks_writing_dict.values()))
    
    total_time = time.time() - start_time
    print(f"Total lineage processing took {total_time:.2f} seconds")
    
    return processed_data

def clear_lineage_cache():
    """Clear the job and notebook caches to free memory"""
    global _job_cache, _notebook_cache
    _job_cache = {}
    _notebook_cache = {}
    print("Cleared lineage caches")

def _get_table_lineage(table_full_name: str) -> Dict[str, Any]:
    """
    Retrieves table lineage information for a given table using the global SDK client
    and global SQL warehouse ID. Now includes notebook and job information with enhanced details.
    """
    if not DATABRICKS_SQL_WAREHOUSE_ID: # Check before attempting query
        return {"status": "error", "error": "DATABRICKS_SQL_WAREHOUSE_ID is not set. Cannot fetch lineage."}

    lineage_sql_query = f"""
    SELECT source_table_full_name, target_table_full_name, entity_type, entity_id, 
           entity_run_id, entity_metadata, created_by, event_time
    FROM system.access.table_lineage
    WHERE source_table_full_name = '{table_full_name}' OR target_table_full_name = '{table_full_name}'
    ORDER BY event_time DESC LIMIT 100;
    """
    print(f"Fetching and processing lineage for table: {table_full_name}")
    # execute_databricks_sql will now use the global warehouse_id
    raw_lineage_output = execute_databricks_sql(lineage_sql_query, wait_timeout='50s') 
    return _process_lineage_results(raw_lineage_output, table_full_name)

def _format_single_table_md(table_info: TableInfo, base_heading_level: int, display_columns: bool) -> List[str]:
    """
    Formats the details for a single TableInfo object into a list of Markdown strings.
    Uses a base_heading_level to control Markdown header depth for hierarchical display.
    """
    table_markdown_parts = []
    table_header_prefix = "#" * base_heading_level
    sub_header_prefix = "#" * (base_heading_level + 1)

    table_markdown_parts.append(f"{table_header_prefix} Table: **{table_info.full_name}**")

    if table_info.comment:
        table_markdown_parts.extend(["", f"**Description**: {table_info.comment}"])
    elif base_heading_level == 1:
        table_markdown_parts.extend(["", "**Description**: No description provided."])
    
    # Process and add partition columns
    partition_column_names: List[str] = []
    if table_info.columns:
        temp_partition_cols: List[tuple[str, int]] = []
        for col in table_info.columns:
            if col.partition_index is not None:
                temp_partition_cols.append((col.name, col.partition_index))
        if temp_partition_cols:
            temp_partition_cols.sort(key=lambda x: x[1])
            partition_column_names = [name for name, index in temp_partition_cols]

    if partition_column_names:
        table_markdown_parts.extend(["", f"{sub_header_prefix} Partition Columns"])
        table_markdown_parts.extend([f"- `{col_name}`" for col_name in partition_column_names])
    elif base_heading_level == 1:
        table_markdown_parts.extend(["", f"{sub_header_prefix} Partition Columns", "- *This table is not partitioned or partition key information is unavailable.*"])

    if display_columns:
        table_markdown_parts.extend(["", f"{sub_header_prefix} Table Columns"])
        if table_info.columns:
            table_markdown_parts.extend(_format_column_details_md(table_info.columns))
        else:
            table_markdown_parts.append("  - *No column information available.*")
            
    return table_markdown_parts

def execute_databricks_sql(sql_query: str, wait_timeout: str = '50s') -> Dict[str, Any]:
    """
    Executes a SQL query on Databricks using the global SDK client and global SQL warehouse ID.
    """
    if not DATABRICKS_SQL_WAREHOUSE_ID:
        return {"status": "error", "error": "DATABRICKS_SQL_WAREHOUSE_ID is not set. Cannot execute SQL query."}
    
    try:
        print(f"Executing SQL on warehouse {DATABRICKS_SQL_WAREHOUSE_ID} (timeout: {wait_timeout}):\n{sql_query[:200]}..." + (" (truncated)" if len(sql_query) > 200 else ""))
        response: StatementResponse = sdk_client.statement_execution.execute_statement(
            statement=sql_query,
            warehouse_id=DATABRICKS_SQL_WAREHOUSE_ID, # Use global warehouse ID
            wait_timeout=wait_timeout
        )

        if response.status and response.status.state == StatementState.SUCCEEDED:
            if response.result and response.result.data_array:
                column_names = [col.name for col in response.manifest.schema.columns] if response.manifest and response.manifest.schema and response.manifest.schema.columns else []
                results = [dict(zip(column_names, row)) for row in response.result.data_array]
                return {"status": "success", "row_count": len(results), "data": results}
            else:
                return {"status": "success", "row_count": 0, "data": [], "message": "Query succeeded but returned no data."}
        elif response.status:
            error_message = response.status.error.message if response.status.error else "No error details provided."
            return {"status": "failed", "error": f"Query execution failed with state: {response.status.state.value}", "details": error_message}
        else:
            return {"status": "failed", "error": "Query execution status unknown."}
    except Exception as e:
        return {"status": "error", "error": f"An error occurred during SQL execution: {str(e)}"}

def get_uc_table_details(full_table_name: str, include_lineage: bool = False) -> str:
    """
    Fetches table metadata and optionally lineage, then formats it into a Markdown string.
    Uses the _format_single_table_md helper for core table structure.
    """
    print(f"Fetching metadata for {full_table_name}...")
    
    try:
        table_info: TableInfo = sdk_client.tables.get(full_name=full_table_name)
    except Exception as e:
        error_details = str(e)
        return f"""# Error: Could Not Retrieve Table Details
**Table:** `{full_table_name}`
**Problem:** Failed to fetch the complete metadata for this table.
**Details:**
```
{error_details}
```"""

    markdown_parts = _format_single_table_md(table_info, base_heading_level=1, display_columns=True)

    if include_lineage:
        markdown_parts.extend(["", "## Lineage Information"])
        if not DATABRICKS_SQL_WAREHOUSE_ID:
            markdown_parts.append("- *Lineage fetching skipped: `DATABRICKS_SQL_WAREHOUSE_ID` environment variable is not set.*")
        else:
            print(f"Fetching lineage for {full_table_name}...")
            lineage_info = _get_table_lineage(full_table_name)
            
            has_upstream = lineage_info and isinstance(lineage_info.get("upstream_tables"), list) and lineage_info["upstream_tables"]
            has_downstream = lineage_info and isinstance(lineage_info.get("downstream_tables"), list) and lineage_info["downstream_tables"]
            has_notebooks_reading = lineage_info and isinstance(lineage_info.get("notebooks_reading"), list) and lineage_info["notebooks_reading"]
            has_notebooks_writing = lineage_info and isinstance(lineage_info.get("notebooks_writing"), list) and lineage_info["notebooks_writing"]

            if has_upstream:
                markdown_parts.extend(["", "### Upstream Tables (tables this table reads from):"])
                markdown_parts.extend([f"- `{table}`" for table in lineage_info["upstream_tables"]])
            
            if has_downstream:
                markdown_parts.extend(["", "### Downstream Tables (tables that read from this table):"])
                markdown_parts.extend([f"- `{table}`" for table in lineage_info["downstream_tables"]])
            
            if has_notebooks_reading:
                markdown_parts.extend(["", "### Notebooks Reading from this Table:"])
                for notebook in lineage_info["notebooks_reading"]:
                    markdown_parts.extend([f"- {notebook}", ""])
            
            if has_notebooks_writing:
                markdown_parts.extend(["", "### Notebooks Writing to this Table:"])
                for notebook in lineage_info["notebooks_writing"]:
                    markdown_parts.extend([f"- {notebook}", ""])
            
            if not any([has_upstream, has_downstream, has_notebooks_reading, has_notebooks_writing]):
                if lineage_info and lineage_info.get("status") != "success" and lineage_info.get("error"):
                    # Covers explicit "error" results and any other non-success status with an error message.
                    markdown_parts.extend(["", "*Note: Could not retrieve complete lineage information.*", f"> *Lineage fetch error: {lineage_info.get('error')}*"])
                else:
                    markdown_parts.append("- *No table, notebook, or job dependencies found or lineage fetch was not fully successful.*")
    else:
        markdown_parts.extend(["", "## Lineage Information", "- *Lineage fetching skipped as per request.*"])

    return "\n".join(markdown_parts)

def get_uc_schema_details(catalog_name: str, schema_name: str, include_columns: bool = False) -> str:
    """
    Fetches detailed information for a specific schema, optionally including its tables and their columns.
    Uses the global SDK client and the _format_single_table_md helper with appropriate heading levels.
    """
    full_schema_name = f"{catalog_name}.{schema_name}"
    markdown_parts = [f"# Schema Details: **{full_schema_name}**"]

    try:
        print(f"Fetching details for schema: {full_schema_name}...")
        schema_info: SchemaInfo = sdk_client.schemas.get(full_name=full_schema_name)

        description = schema_info.comment if schema_info.comment else "No description provided."
        markdown_parts.append(f"**Description**: {description}")
        markdown_parts.append("")

        markdown_parts.append(f"## Tables in Schema `{schema_name}`")
            
        tables_iterable = sdk_client.tables.list(catalog_name=catalog_name, schema_name=schema_name)
        tables_list = list(tables_iterable)

        if not tables_list:
            markdown_parts.append("- *No tables found in this schema.*")
        else:
            for i, table_info in enumerate(tables_list):
                if not isinstance(table_info, TableInfo):
                    print(f"Warning: Encountered an unexpected item in tables list: {type(table_info)}")
                    continue
                
                markdown_parts.extend(_format_single_table_md(
                    table_info, 
                    base_heading_level=3,
                    display_columns=include_columns
                ))
                if i < len(tables_list) - 1:
                    markdown_parts.append("\n=============\n")
                else:
                    markdown_parts.append("")

    except Exception as e:
        error_message = f"Failed to retrieve details for schema '{full_schema_name}': {str(e)}"
        print(f"Error in get_uc_schema_details: {error_message}")
        return f"""# Error: Could Not Retrieve Schema Details
**Schema:** `{full_schema_name}`
**Problem:** An error occurred while attempting to fetch schema information.
**Details:**
```
{error_message}
```"""

    return "\n".join(markdown_parts)

def get_uc_catalog_details(catalog_name: str) -> str:
    """
    Fetches and formats a summary of all schemas within a given catalog
    using the global SDK client.
    """
    markdown_parts = [f"# Catalog Summary: **{catalog_name}**", ""]
    schemas_found_count = 0
    
    try:
        print(f"Fetching schemas for catalog: {catalog_name} using global sdk_client...")
        # The sdk_client is globally defined in this module
        schemas_iterable = sdk_client.schemas.list(catalog_name=catalog_name)
        
        # Convert iterator to list to easily check if empty and get a count
        schemas_list = list(schemas_iterable) 

        if not schemas_list:
            markdown_parts.append(f"No schemas found in catalog `{catalog_name}`.")
            return "\n".join(markdown_parts)

        schemas_found_count = len(schemas_list)
        markdown_parts.append(f"Showing top {schemas_found_count} schemas found in catalog `{catalog_name}`:")
        markdown_parts.append("")

        for i, schema_info in enumerate(schemas_list):
            if not isinstance(schema_info, SchemaInfo):
                print(f"Warning: Encountered an unexpected item in schemas list: {type(schema_info)}")
                continue

            # Heading for each schema in the catalog
            schema_name_display = schema_info.full_name if schema_info.full_name else "Unnamed Schema"
            markdown_parts.append(f"## {schema_name_display}")

            description = schema_info.comment if schema_info.comment else "No description provided."
            markdown_parts.append(f"**Description**: {description}")

            markdown_parts.append("")  # blank line separating schemas

    except Exception as e:
        error_message = f"Failed to retrieve schemas for catalog '{catalog_name}': {str(e)}"
        print(f"Error in get_catalog_summary: {error_message}")
        # Return a structured error message in Markdown
        return f"""# Error: Could Not Retrieve Catalog Summary
**Catalog:** `{catalog_name}`
**Problem:** An error occurred while attempting to fetch schema information.
**Details:**
```
{error_message}
```"""
    
    markdown_parts.append(f"**Total Schemas Found in `{catalog_name}`**: {schemas_found_count}")
    return "\n".join(markdown_parts)



def get_uc_all_catalogs_summary() -> str:
    """
    Fetches a summary of all available Unity Catalogs, including their names, comments, and types.
    Uses the global SDK client.
    """
    markdown_parts = ["# Available Unity Catalogs", ""]
    catalogs_found_count = 0

    try:
        print("Fetching all catalogs using global sdk_client...")
        catalogs_iterable = sdk_client.catalogs.list()
        catalogs_list = list(catalogs_iterable)

        if not catalogs_list:
            markdown_parts.append("- *No catalogs found or accessible.*")
            return "\n".join(markdown_parts)

        catalogs_found_count = len(catalogs_list)
        markdown_parts.append(f"Found {catalogs_found_count} catalog(s):")
        markdown_parts.append("")

        for catalog_info in catalogs_list:
            if not isinstance(catalog_info, CatalogInfo):
                print(f"Warning: Encountered an unexpected item in catalogs list: {type(catalog_info)}")
                continue
            
            markdown_parts.append(f"- **`{catalog_info.name}`**")
            description = catalog_info.comment if catalog_info.comment else "No description provided."
            markdown_parts.append(f"  - **Description**: {description}")
            
            catalog_type_str = "N/A"
            if catalog_info.catalog_type and hasattr(catalog_info.catalog_type, 'value'):
                catalog_type_str = catalog_info.catalog_type.value
            elif catalog_info.catalog_type: # Fallback if it's not an Enum but has a direct string representation
                catalog_type_str = str(catalog_info.catalog_type)
            markdown_parts.append(f"  - **Type**: `{catalog_type_str}`")
            
            markdown_parts.append("") # Add a blank line for separation

    except Exception as e:
        error_message = f"Failed to retrieve catalog list: {str(e)}"
        print(f"Error in get_uc_all_catalogs_summary: {error_message}")
        return f"""# Error: Could Not Retrieve Catalog List
**Problem:** An error occurred while attempting to fetch the list of catalogs.
**Details:**
```
{error_message}
```"""
    
    return "\n".join(markdown_parts)


```