# Directory Structure

```
├── .github
│   ├── CODEOWNERS
│   ├── dependabot.yml
│   ├── ISSUE_TEMPLATE
│   │   ├── bug_report.md
│   │   ├── custom.md
│   │   └── feature_request.md
│   ├── SECURITY.md
│   └── workflows
│       ├── checks.yml
│       └── mcp-server-release.yml
├── .gitignore
├── .pre-commit-config.yaml
├── CHANGELOG.md
├── CODE_OF_CONDUCT.md
├── commitlint.config.js
├── CONTRIBUTING.md
├── LICENSE
├── modelcontextprotocol
│   ├── .cursor
│   │   └── rules
│   │       ├── mcp-guidelines.mdc
│   │       ├── project-structure.mdc
│   │       ├── python.mdc
│   │       └── tool-development-guide.mdc
│   ├── .dockerignore
│   ├── .env.template
│   ├── .python-version
│   ├── client.py
│   ├── Dockerfile
│   ├── docs
│   │   ├── DEPLOYMENT.md
│   │   └── LOCAL_BUILD.md
│   ├── middleware.py
│   ├── pyproject.toml
│   ├── README.md
│   ├── server.py
│   ├── settings.py
│   ├── tools
│   │   ├── __init__.py
│   │   ├── assets.py
│   │   ├── domain.py
│   │   ├── dq_rules.py
│   │   ├── dsl.py
│   │   ├── glossary.py
│   │   ├── lineage.py
│   │   ├── models.py
│   │   ├── query.py
│   │   └── search.py
│   ├── utils
│   │   ├── __init__.py
│   │   ├── assets.py
│   │   ├── constants.py
│   │   ├── parameters.py
│   │   └── search.py
│   ├── uv.lock
│   └── version.py
└── README.md
```

# Files

--------------------------------------------------------------------------------
/modelcontextprotocol/.python-version:
--------------------------------------------------------------------------------

```
3.11

```

--------------------------------------------------------------------------------
/modelcontextprotocol/.env.template:
--------------------------------------------------------------------------------

```
ATLAN_BASE_URL=https://domain.atlan.com
ATLAN_API_KEY=your_api_key
ATLAN_AGENT_ID=your_agent_id
MCP_TRANSPORT="stdio"  # "stdio", "sse", or "streamable-http"
MCP_HOST=0.0.0.0
MCP_PORT=8000
MCP_PATH="/"

```

--------------------------------------------------------------------------------
/modelcontextprotocol/.dockerignore:
--------------------------------------------------------------------------------

```
# Environment files
.env
.env.*

# Python virtual environments
.venv
venv/

# Python cache and compiled files
__pycache__/
*.pyc
*.pyo
*.pyd
.Python
*.so

# IDE and editor files
.cursor/
.vscode/
.vscodeignore
.idea/
.DS_Store

# Git
.git/
.gitignore

# Python development and testing
.pytest_cache/
.ruff_cache/
.mypy_cache/
.coverage
.tox/
.nox/

# Python build artifacts
*.egg-info/
dist/
build/


# Development configuration
.pre-commit-config.yaml

```

--------------------------------------------------------------------------------
/.pre-commit-config.yaml:
--------------------------------------------------------------------------------

```yaml
repos:
-   repo: https://github.com/pre-commit/pre-commit-hooks
    rev: v4.5.0
    hooks:
    -   id: trailing-whitespace
    -   id: end-of-file-fixer
    -   id: check-yaml
    -   id: check-added-large-files
    -   id: check-ast
    -   id: check-json
    -   id: check-merge-conflict
    -   id: detect-private-key

-   repo: https://github.com/alessandrojcm/commitlint-pre-commit-hook
    rev: v9.11.0
    hooks:
    -   id: commitlint
        stages: [commit-msg]
        additional_dependencies: ['@commitlint/config-conventional']

-   repo: https://github.com/astral-sh/ruff-pre-commit
    rev: v0.3.0
    hooks:
    -   id: ruff
        args: [--fix, --exit-non-zero-on-fix]
    -   id: ruff-format

```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
#  Usually these files are written by a python script from a template
#  before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
.pybuilder/
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
#   For a library or package, you might want to ignore these files since the code is
#   intended to run in multiple environments; otherwise, check them in:
# .python-version

# pipenv
#   According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
#   However, in case of collaboration, if having platform-specific dependencies or dependencies
#   having no cross-platform support, pipenv may install dependencies that don't work, or not
#   install all needed dependencies.
#Pipfile.lock

# poetry
#   Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
#   This is especially recommended for binary packages to ensure reproducibility, and is more
#   commonly ignored for libraries.
#   https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock

# pdm
#   Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
#   pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
#   in version control.
#   https://pdm.fming.dev/#use-with-ide
.pdm.toml

# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

# pytype static type analyzer
.pytype/

# Cython debug symbols
cython_debug/

# PyCharm
#  JetBrains specific template is maintained in a separate JetBrains.gitignore that can
#  be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
#  and can be added to the global gitignore or merged into this file.  For a more nuclear
#  option (not recommended) you can uncomment the following to ignore the entire idea folder.
.idea/
.vscode/

.DS_Store

```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# Atlan Agent Toolkit

[![Contributor Covenant](https://img.shields.io/badge/Contributor%20Covenant-2.1-4baaaa.svg)](CODE_OF_CONDUCT.md)
[![PyPI - Version](https://img.shields.io/pypi/v/atlan-mcp-server.svg)](https://pypi.org/project/atlan-mcp-server)
[![License](https://img.shields.io/github/license/atlanhq/agent-toolkit.svg)](https://github.com/atlanhq/agent-toolkit/blob/main/LICENSE)


This repository contains a collection of tools and protocols that enable AI agents to interact with Atlan services. Each component is designed to provide specific functionality and can be used independently or together.

## Components

### Model Context Protocol (MCP)

An MCP server that enables interaction with Atlan services through tool calling. It provides tools for asset search and retrieval using [pyatlan](https://developer.atlan.com/sdks/python/).

You can find the documentation and setup instructions for the MCP server [here](modelcontextprotocol/README.md).


## 🔍 DeepWiki: Ask Questions About This Project

[![Ask DeepWiki](https://deepwiki.com/badge.svg)](https://deepwiki.com/atlanhq/agent-toolkit)


## Contributing

See [CONTRIBUTING.md](CONTRIBUTING.md) for details on how to contribute to the Atlan Agent Toolkit.


## License

The project is licensed under the [MIT License](LICENSE). Please see the [LICENSE](LICENSE) file for details.

```

--------------------------------------------------------------------------------
/modelcontextprotocol/README.md:
--------------------------------------------------------------------------------

```markdown
# Atlan MCP Server

The Atlan [Model Context Protocol](https://modelcontextprotocol.io/introduction) server allows your AI agents to interact with Atlan services.

## Quick Start

1. Generate an Atlan API key by following the [documentation](https://ask.atlan.com/hc/en-us/articles/8312649180049-API-authentication).
2. Select one of the following approaches based on your preference:
   - **[Install via Docker](#install-via-docker)** - Uses Docker containers (recommended)
   - **[Install via uv](#install-via-uv)** - Uses the uv package manager

> [!NOTE]
> Make sure to replace `<YOUR_API_KEY>`, `<YOUR_INSTANCE>`, and `<YOUR_AGENT_ID>` in the configuration file with your actual Atlan API key, instance name, and agent ID (the agent ID is optional).

## Install via Docker

**Prerequisites:**
- Follow the official [Docker installation guide](https://docs.docker.com/get-docker/) for your operating system
- Verify Docker is running:
   ```bash
   docker --version
   ```

### Add to Claude Desktop

Go to `Claude > Settings > Developer > Edit Config > claude_desktop_config.json` and add:

```json
{
  "mcpServers": {
    "atlan": {
      "command": "docker",
      "args": [
        "run",
        "-i",
        "--rm",
        "-e",
        "ATLAN_API_KEY=<YOUR_API_KEY>",
        "-e",
        "ATLAN_BASE_URL=https://<YOUR_INSTANCE>.atlan.com",
        "-e",
        "ATLAN_AGENT_ID=<YOUR_AGENT_ID>",
        "ghcr.io/atlanhq/atlan-mcp-server:latest"
      ]
    }
  }
}
```

### Add to Cursor

Open `Cursor > Settings > Tools & Integrations > New MCP Server` to include the following:

```json
{
  "mcpServers": {
    "atlan": {
      "command": "docker",
      "args": [
        "run",
        "-i",
        "--rm",
        "-e",
        "ATLAN_API_KEY=<YOUR_API_KEY>",
        "-e",
        "ATLAN_BASE_URL=https://<YOUR_INSTANCE>.atlan.com",
        "-e",
        "ATLAN_AGENT_ID=<YOUR_AGENT_ID>",
        "ghcr.io/atlanhq/atlan-mcp-server:latest"
      ]
    }
  }
}
```

## Install via uv

**Prerequisites:**
- Install uv:
   ```bash
   # macOS/Linux
   curl -LsSf https://astral.sh/uv/install.sh | sh

   # Windows (PowerShell)
   powershell -c "irm https://astral.sh/uv/install.ps1 | iex"

   # Alternative: if you already have Python/pip
   pip install uv
   ```
- Verify installation:
  ```bash
  uv --version
  ```

> [!NOTE]
> With uv, `uvx` automatically fetches the latest version each time you run it. For more predictable behavior, consider using the Docker option.
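
If you want more reproducible runs without switching to Docker, you can pin the published package version. The snippet below is an illustrative example assuming uvx's `package@version` specifier; `0.3.0` is the version declared in this repository's `version.py`:

```bash
# Run a pinned release instead of whatever is latest on PyPI.
# The same specifier can be used in the "args" array of the configs below.
uvx atlan-mcp-server@0.3.0
```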

### Add to Claude Desktop

Go to `Claude > Settings > Developer > Edit Config > claude_desktop_config.json` to include the following:

```json
{
  "mcpServers": {
    "atlan": {
      "command": "uvx",
      "args": ["atlan-mcp-server"],
      "env": {
        "ATLAN_API_KEY": "<YOUR_API_KEY>",
        "ATLAN_BASE_URL": "https://<YOUR_INSTANCE>.atlan.com",
        "ATLAN_AGENT_ID": "<YOUR_AGENT_ID>"
      }
    }
  }
}
```

### Add to Cursor

Open `Cursor > Settings > Tools & Integrations > New MCP Server` to include the following:

```json
{
  "mcpServers": {
    "atlan": {
      "command": "uvx",
      "args": ["atlan-mcp-server"],
      "env": {
        "ATLAN_API_KEY": "<YOUR_API_KEY>",
        "ATLAN_BASE_URL": "https://<YOUR_INSTANCE>.atlan.com",
        "ATLAN_AGENT_ID": "<YOUR_AGENT_ID>"
      }
    }
  }
}
```

## Available Tools

| Tool                | Description                                                       |
| ------------------- | ----------------------------------------------------------------- |
| `search_assets`     | Search for assets based on conditions                             |
| `get_assets_by_dsl` | Retrieve assets using a DSL query                                 |
| `traverse_lineage`  | Retrieve lineage for an asset                                     |
| `update_assets`     | Update asset attributes (user description and certificate status) |
| `create_glossaries` | Create glossaries                                                 |
| `create_glossary_categories` | Create glossary categories                               |
| `create_glossary_terms` | Create glossary terms                                         |
| `create_dq_rules`   | Create data quality rules on Table, View, MaterialisedView, or SnowflakeDynamicTable assets (column-level, table-level, custom SQL) |
| `update_dq_rules`   | Update existing data quality rules (threshold, priority, conditions, etc.) |
| `schedule_dq_rules` | Schedule data quality rule execution for assets using cron expressions |
| `delete_dq_rules`   | Delete one or multiple data quality rules by GUID                 |
| `query_asset`       | Execute SQL queries on table/view assets                          |

## Tool Access Control

The Atlan MCP Server includes a configurable tool restriction middleware that allows you to control which tools are available to users. This is useful for implementing role-based access control or restricting certain operations in specific environments.

### Restricting Tools

You can restrict access to specific tools using the `RESTRICTED_TOOLS` environment variable. Provide a comma-separated list of tool names that should be blocked:

#### Docker Configuration

```json
{
  "mcpServers": {
    "atlan": {
      "command": "docker",
      "args": [
        "run",
        "-i",
        "--rm",
        "-e",
        "ATLAN_API_KEY=<YOUR_API_KEY>",
        "-e",
        "ATLAN_BASE_URL=https://<YOUR_INSTANCE>.atlan.com",
        "-e",
        "ATLAN_AGENT_ID=<YOUR_AGENT_ID>",
        "-e",
        "RESTRICTED_TOOLS=get_assets_by_dsl_tool,update_assets_tool",
        "ghcr.io/atlanhq/atlan-mcp-server:latest"
      ]
    }
  }
}
```

#### uv Configuration

```json
{
  "mcpServers": {
    "atlan": {
      "command": "uvx",
      "args": ["atlan-mcp-server"],
      "env": {
        "ATLAN_API_KEY": "<YOUR_API_KEY>",
        "ATLAN_BASE_URL": "https://<YOUR_INSTANCE>.atlan.com",
        "ATLAN_AGENT_ID": "<YOUR_AGENT_ID>",
        "RESTRICTED_TOOLS": "get_assets_by_dsl_tool,update_assets_tool"
      }
    }
  }
}
```

### Available Tool Names for Restriction

You can restrict any of the following tools:

- `search_assets_tool` - Asset search functionality
- `get_assets_by_dsl_tool` - DSL query execution
- `traverse_lineage_tool` - Lineage traversal
- `update_assets_tool` - Asset updates (descriptions, certificates)
- `create_glossaries` - Glossary creation
- `create_glossary_categories` - Category creation
- `create_glossary_terms` - Term creation
- `create_dq_rules_tool` - Data quality rule creation
- `update_dq_rules_tool` - Data quality rule updates
- `schedule_dq_rules_tool` - Data quality rule scheduling
- `delete_dq_rules_tool` - Data quality rule deletion

### Common Use Cases

#### Read-Only Access
Restrict all write operations:
```
RESTRICTED_TOOLS=update_assets_tool,create_glossaries,create_glossary_categories,create_glossary_terms,create_dq_rules_tool,update_dq_rules_tool,schedule_dq_rules_tool,delete_dq_rules_tool
```

#### Disable DSL Queries
For security or performance reasons:
```
RESTRICTED_TOOLS=get_assets_by_dsl_tool
```

#### Minimal Access
Allow only basic search:
```
RESTRICTED_TOOLS=get_assets_by_dsl_tool,update_assets_tool,traverse_lineage_tool,create_glossaries,create_glossary_categories,create_glossary_terms,create_dq_rules_tool,update_dq_rules_tool,schedule_dq_rules_tool,delete_dq_rules_tool
```

### How It Works

When tools are restricted:
1. **Hidden from listings**: Restricted tools won't appear when clients request available tools
2. **Execution blocked**: If someone tries to execute a restricted tool, they'll receive a clear error message
3. **Logged**: All access decisions are logged for monitoring and debugging (see the sketch below)
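
The following is a minimal, illustrative sketch of how such a restriction layer can be written with FastMCP's middleware hooks. The names used here (`ToolFilterMiddleware`, `RESTRICTED`) are hypothetical and simplified for illustration; this is not the server's actual `middleware.py` implementation:

```python
import os

from fastmcp import FastMCP
from fastmcp.exceptions import ToolError
from fastmcp.server.middleware import Middleware, MiddlewareContext

# Hypothetical: read the comma-separated blocklist from the environment.
RESTRICTED = {
    name.strip()
    for name in os.getenv("RESTRICTED_TOOLS", "").split(",")
    if name.strip()
}


class ToolFilterMiddleware(Middleware):
    """Hide restricted tools from listings and block their execution."""

    async def on_list_tools(self, context: MiddlewareContext, call_next):
        tools = await call_next(context)
        # Hidden from listings: drop restricted tools from the response.
        return [tool for tool in tools if tool.name not in RESTRICTED]

    async def on_call_tool(self, context: MiddlewareContext, call_next):
        # Execution blocked: refuse restricted tools with a clear error.
        if context.message.name in RESTRICTED:
            raise ToolError(f"Tool '{context.message.name}' is restricted")
        return await call_next(context)


mcp = FastMCP("atlan-mcp-example")
mcp.add_middleware(ToolFilterMiddleware())
```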

### No Restrictions (Default)

If you don't set the `RESTRICTED_TOOLS` environment variable, all tools will be available by default.

## Transport Modes

The Atlan MCP Server supports three transport modes, each optimized for different deployment scenarios. For more details about MCP transport modes, see the [official MCP documentation](https://modelcontextprotocol.io/specification/2025-06-18/basic/transports).

| Transport Mode | Use Case | Benefits | When to Use |
|---|---|---|---|
| **stdio** (Default) | Local development, IDE integrations | Simple, direct communication | Claude Desktop, Cursor IDE |
| **SSE** (Server-Sent Events) | Remote deployments, web browsers | Real-time streaming, web-compatible | Cloud deployments, web clients |
| **streamable-http** | HTTP-based remote connections | Standard HTTP, load balancer friendly | Kubernetes, containerized deployments |

For comprehensive deployment instructions, configuration examples, and production best practices, see our [Deployment Guide](./docs/DEPLOYMENT.md).

## Production Deployment

- Host the Atlan MCP container image on the cloud/platform of your choice
- Make sure you add all the required environment variables
- Choose the appropriate transport mode for your deployment scenario; SSE transport is recommended for production (`-e MCP_TRANSPORT=sse`), as shown in the example below
- For detailed deployment scenarios and configurations, refer to the [Deployment Guide](./docs/DEPLOYMENT.md)
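
A minimal containerized deployment using the SSE transport looks like this (adapted from the Deployment Guide; replace the placeholders with your own values):

```bash
docker run -d \
  -p 8000:8000 \
  -e ATLAN_API_KEY="<YOUR_API_KEY>" \
  -e ATLAN_BASE_URL="https://<YOUR_INSTANCE>.atlan.com" \
  -e MCP_TRANSPORT="sse" \
  ghcr.io/atlanhq/atlan-mcp-server:latest
```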

### Remote MCP Configuration

A remote Atlan MCP server is not yet generally available.

You can use the [mcp-remote](https://www.npmjs.com/package/mcp-remote) local proxy tool to connect your MCP client to a remotely hosted MCP server.

This lets you test what an interaction with your remote MCP server will be like with a real-world MCP client.

```json
{
  "mcpServers": {
    "math": {
      "command": "npx",
      "args": ["mcp-remote", "https://hosted-domain"]
    }
  }
}
```

## Develop Locally

Want to develop locally? Check out our [Local Build](./docs/LOCAL_BUILD.md) Guide for a step-by-step walkthrough!

## Need Help?

- Reach out to [email protected] for any questions or feedback
- You can also directly create a [GitHub issue](https://github.com/atlanhq/agent-toolkit/issues) and we will answer it for you

## Frequently Asked Questions

### Do I need Python installed?

**Short answer**: It depends on your installation method.

- **Docker (Recommended)**: No Python installation required on your host machine. The container includes everything needed.
- **uv**: A Python runtime is needed, but uv will automatically download and manage Python 3.11+ for you if it's not already available.

**Technical details**: The Atlan MCP server is implemented as a Python application. The Model Context Protocol itself is language-agnostic, but our current implementation requires Python 3.11+ to run.

## Troubleshooting

1. If Claude Desktop shows an error similar to `spawn uv ENOENT {"context":"connection","stack":"Error: spawn uv ENOENT\n    at ChildProcess._handle.onexit`, it is most likely [this](https://github.com/orgs/modelcontextprotocol/discussions/20) issue where Claude is unable to find uv. To fix it:
   - Make sure uv is installed and available in your PATH
   - Run `which uv` to verify the installation path
   - Update Claude's configuration to point to the absolute uv/uvx path reported by `which uv` or `whereis uv`, as shown in the example below
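
For example, if the absolute path is `/Users/you/.local/bin/uvx` (the path below is purely illustrative; yours will differ), the configuration becomes:

```json
{
  "mcpServers": {
    "atlan": {
      "command": "/Users/you/.local/bin/uvx",
      "args": ["atlan-mcp-server"],
      "env": {
        "ATLAN_API_KEY": "<YOUR_API_KEY>",
        "ATLAN_BASE_URL": "https://<YOUR_INSTANCE>.atlan.com",
        "ATLAN_AGENT_ID": "<YOUR_AGENT_ID>"
      }
    }
  }
}
```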

```

--------------------------------------------------------------------------------
/.github/SECURITY.md:
--------------------------------------------------------------------------------

```markdown
# Vulnerability Disclosure

If you think you have found a potential security vulnerability,
please open a [draft Security Advisory](https://github.com/atlanhq/agent-toolkit/security/advisories/new)
via GitHub. We will coordinate verification and next steps through
that secure medium.

If English is not your first language, please try to describe the
problem and its impact to the best of your ability. For greater detail,
please use your native language and we will try our best to translate it
using online services.

Please also include the code you used to find the problem and the
shortest amount of code necessary to reproduce it.

Please do not disclose this to anyone else. We will request a CVE
identifier if necessary and give you full credit under whatever name or
alias you provide. We will only request an identifier when we have a fix
and can publish it in a release.

We will respect your privacy and will only publicize your involvement if
you grant us permission.

```

--------------------------------------------------------------------------------
/CONTRIBUTING.md:
--------------------------------------------------------------------------------

```markdown
# Contributing

We welcome contributions to the Atlan Agent Toolkit! Please follow these guidelines when submitting pull requests:

1. **Create a New Branch:**
   - Create a new branch for your changes.
   - Use a descriptive name for the branch (e.g., `feature/add-new-tool`).

2. **Make Your Changes:**
   - Make your changes in the new branch.
   - Ensure your tools are well-defined and follow the MCP specification.

3. **Submit a Pull Request:**
   - Push your changes to your branch.
   - Create a pull request against the `main` branch.
   - Provide a clear description of the changes and any related issues.
   - Ensure the PR passes all CI checks before requesting a review.

4. **Code Quality:**
   - We use pre-commit hooks to maintain code quality.
   - Install pre-commit in your local environment:
     ```bash
     uv pip install pre-commit
     pre-commit install
     ```
   - Pre-commit will automatically run checks before each commit, including:
     - Code formatting with Ruff
     - Trailing whitespace removal
     - End-of-file fixing
     - YAML and JSON validation
     - Other quality checks

5. **Environment Setup:**
   - This project uses [uv](https://docs.astral.sh/uv/) for dependency management.
   - Refer to the [Model Context Protocol README](modelcontextprotocol/README.md) for setup instructions.
   - Python 3.11 or higher is required.

6. **Documentation:**
   - Update documentation to reflect your changes.
   - Add comments to your code where necessary.

Please open an issue or discussion for questions or suggestions before starting significant work!

```

--------------------------------------------------------------------------------
/CODE_OF_CONDUCT.md:
--------------------------------------------------------------------------------

```markdown
# Contributor Covenant Code of Conduct

## Our Pledge

In the interest of fostering an open and welcoming environment, we as
contributors and maintainers pledge to make participation in our project and
our community a harassment-free experience for everyone, regardless of age, body
size, disability, ethnicity, sex characteristics, gender identity and expression,
level of experience, education, socio-economic status, nationality, personal
appearance, race, religion, or sexual identity and orientation.

## Our Standards

Examples of behavior that contributes to creating a positive environment
include:

- Using welcoming and inclusive language
- Being respectful of differing viewpoints and experiences
- Gracefully accepting constructive criticism
- Focusing on what is best for the community
- Showing empathy towards other community members

Examples of unacceptable behavior by participants include:

- The use of sexualized language or imagery and unwelcome sexual attention or
  advances
- Trolling, insulting/derogatory comments, and personal or political attacks
- Public or private harassment
- Publishing others' private information, such as a physical or electronic
  address, without explicit permission
- Other conduct which could reasonably be considered inappropriate in a
  professional setting

## Our Responsibilities

Project maintainers are responsible for clarifying the standards of acceptable
behavior and are expected to take appropriate and fair corrective action in
response to any instances of unacceptable behavior.

Project maintainers have the right and responsibility to remove, edit, or
reject comments, commits, code, wiki edits, issues, and other contributions
that are not aligned to this Code of Conduct, or to ban temporarily or
permanently any contributor for other behaviors that they deem inappropriate,
threatening, offensive, or harmful.

## Scope

This Code of Conduct applies within all project spaces, and it also applies when
an individual is representing the project or its community in public spaces.
Examples of representing a project or community include using an official
project e-mail address, posting via an official social media account, or acting
as an appointed representative at an online or offline event. Representation of
a project may be further defined and clarified by project maintainers.

## Enforcement

Instances of abusive, harassing, or otherwise unacceptable behavior may be
reported by contacting the project team at [[email protected]](mailto:[email protected]). All
complaints will be reviewed and investigated and will result in a response that
is deemed necessary and appropriate to the circumstances. The project team is
obligated to maintain confidentiality with regard to the reporter of an incident.
Further details of specific enforcement policies may be posted separately.

Project maintainers who do not follow or enforce the Code of Conduct in good
faith may face temporary or permanent repercussions as determined by other
members of the project's leadership.

## Attribution

This Code of Conduct is adapted from the [Contributor Covenant][homepage], version 1.4,
available at https://www.contributor-covenant.org/version/1/4/code-of-conduct.html

[homepage]: https://www.contributor-covenant.org

For answers to common questions about this code of conduct, see
https://www.contributor-covenant.org/faq

```

--------------------------------------------------------------------------------
/modelcontextprotocol/version.py:
--------------------------------------------------------------------------------

```python
"""Version information."""

__version__ = "0.3.0"

```

--------------------------------------------------------------------------------
/.github/dependabot.yml:
--------------------------------------------------------------------------------

```yaml
version: 2
updates:
  - package-ecosystem: pip
    directory: "/modelcontextprotocol"
    schedule:
      interval: daily
    open-pull-requests-limit: 100
    allow:
      - dependency-type: "all"

  - package-ecosystem: "github-actions"
    directory: "/modelcontextprotocol"
    schedule:
      interval: daily

```

--------------------------------------------------------------------------------
/modelcontextprotocol/utils/constants.py:
--------------------------------------------------------------------------------

```python
VALID_RELATIONSHIPS = ["anchor"]

DEFAULT_SEARCH_ATTRIBUTES = [
    "name",
    "display_name",
    "description",
    "qualified_name",
    "user_description",
    "certificate_status",
    "owner_users",
    "connector_name",
    "has_lineage",
    "source_created_at",
    "source_updated_at",
    "readme",
    "owner_groups",
    "asset_tags",
]

```

--------------------------------------------------------------------------------
/commitlint.config.js:
--------------------------------------------------------------------------------

```javascript
module.exports = {
  extends: ['@commitlint/config-conventional'],
  rules: {
    'type-enum': [
      2,
      'always',
      [
        'feat',
        'fix',
        'docs',
        'style',
        'refactor',
        'perf',
        'test',
        'build',
        'ci',
        'chore',
        'revert'
      ]
    ],
    'subject-case': [0], // Disabled to allow any case
  }
};

```

--------------------------------------------------------------------------------
/modelcontextprotocol/utils/__init__.py:
--------------------------------------------------------------------------------

```python
"""
Utilities for the Atlan MCP server.

This package provides common utilities used across the server components.
"""

from .assets import save_assets
from .constants import DEFAULT_SEARCH_ATTRIBUTES
from .search import SearchUtils
from .parameters import (
    parse_json_parameter,
    parse_list_parameter,
)

__all__ = [
    "DEFAULT_SEARCH_ATTRIBUTES",
    "SearchUtils",
    "parse_json_parameter",
    "parse_list_parameter",
    "save_assets",
]

```

--------------------------------------------------------------------------------
/.github/ISSUE_TEMPLATE/custom.md:
--------------------------------------------------------------------------------

```markdown
---
name: Documentation or Question
about: Ask a question or request documentation improvements
title: '[DOCS] '
labels: 'documentation'
assignees: ''

---

**What is your question or documentation request?**
A clear and concise description of what you need help with or what documentation you'd like to see improved.

**Additional context**
Add any other context about your question or documentation request here, such as:
- Related documentation you've already reviewed
- Specific sections that need clarification
- Examples or use cases you'd like to see documented

```

--------------------------------------------------------------------------------
/.github/ISSUE_TEMPLATE/feature_request.md:
--------------------------------------------------------------------------------

```markdown
---
name: Feature request
about: Suggest a new feature or enhancement for the agent toolkit
title: '[FEATURE] '
labels: 'enhancement'
assignees: ''

---

**Is your feature request related to a problem? Please describe.**
A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]

**Describe the solution you'd like**
A clear and concise description of what you want to happen.

**Technical Details**
- Proposed API changes (if any)
- Impact on existing functionality
- Required dependencies or new packages

**Describe alternatives you've considered**
A clear and concise description of any alternative solutions or features you've considered.

**Additional context**
Add any other context about the feature request here, such as:
- Use cases or scenarios
- Related issues or pull requests
- Implementation considerations

```

--------------------------------------------------------------------------------
/.github/ISSUE_TEMPLATE/bug_report.md:
--------------------------------------------------------------------------------

```markdown
---
name: Bug report
about: Report a bug or unexpected behavior in the agent toolkit
title: '[BUG] '
labels: 'bug'
assignees: ''

---

**Describe the bug**
A clear and concise description of what the bug is.

**To Reproduce**
Steps to reproduce the behavior:
1. Python version and environment details
2. Command or code that triggered the bug
3. Expected output vs actual output

**Environment Information**
- Python version: [e.g. 3.9.0]
- OS: [e.g. macOS, Linux, Windows]
- Package versions: [e.g. python-dotenv==1.0.0, pydantic==2.0.0]

**Error Message**
```
Paste any error messages or stack traces here
```

**Expected behavior**
A clear and concise description of what you expected to happen.

**Additional context**
Add any other context about the problem here, such as:
- Related configuration files
- Relevant environment variables
- Any workarounds you've tried

```

--------------------------------------------------------------------------------
/modelcontextprotocol/Dockerfile:
--------------------------------------------------------------------------------

```dockerfile
# Use a Python image with uv pre-installed
FROM ghcr.io/astral-sh/uv:python3.12-bookworm-slim AS builder

# Set environment variables for build
ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1 \
    PIP_NO_CACHE_DIR=1

# Install the project into `/app`
WORKDIR /app

ADD . /app

# Create a virtual environment and install dependencies
RUN python -m venv /app/.venv
ENV PATH="/app/.venv/bin:$PATH"
RUN uv sync --no-cache-dir --no-dev --python /app/.venv/bin/python

FROM python:3.12-slim-bookworm AS runtime

RUN groupadd -r appuser && useradd -r -g appuser -m -d /home/appuser appuser

WORKDIR /appuser

COPY --from=builder --chown=appuser:appuser /app /appuser

# Set the PATH to use the virtual environment
ENV PATH="/appuser/.venv/bin:$PATH"

ENV MCP_TRANSPORT="stdio"
ENV MCP_HOST="0.0.0.0"
ENV MCP_PORT="8000"
ENV MCP_PATH="/"

USER appuser

ENTRYPOINT exec python server.py --transport "$MCP_TRANSPORT" --host "$MCP_HOST" --port "$MCP_PORT" --path "$MCP_PATH"

```

--------------------------------------------------------------------------------
/modelcontextprotocol/client.py:
--------------------------------------------------------------------------------

```python
"""Client factory for Atlan."""

import logging
from typing import Optional

from pyatlan.client.atlan import AtlanClient
from settings import get_settings

logger = logging.getLogger(__name__)

_client_instance: Optional[AtlanClient] = None


def get_atlan_client() -> AtlanClient:
    """
    Get the singleton AtlanClient instance for connection reuse.

    Returns:
        AtlanClient: The singleton AtlanClient instance.

    Raises:
        Exception: If client creation fails.
    """
    global _client_instance

    if _client_instance is None:
        settings = get_settings()
        try:
            _client_instance = AtlanClient(
                base_url=settings.ATLAN_BASE_URL, api_key=settings.ATLAN_API_KEY
            )
            _client_instance.update_headers(settings.headers)
            logger.info("AtlanClient initialized successfully")
        except Exception:
            logger.error("Failed to create Atlan client", exc_info=True)
            raise

    return _client_instance

```

--------------------------------------------------------------------------------
/.github/workflows/checks.yml:
--------------------------------------------------------------------------------

```yaml
# Pre-commit-checks. This can be reused across all the applications.

name: Pre-commit Checks
on:
  workflow_call:
  pull_request:
    types: [ opened, synchronize, labeled, reopened ]
    branches: "main"

jobs:
  pre-commit:
    concurrency:
      group: ${{ github.workflow }}-${{ github.ref }}
      cancel-in-progress: ${{ startsWith(github.ref, 'refs/pull/') }}
    runs-on: ubuntu-latest
    timeout-minutes: 10
    steps:
    - uses: actions/checkout@v4
    #----------------------------------------------
    #  -----  install & configure Python + UV  -----
    #----------------------------------------------
    - uses: actions/setup-python@v5
      with:
        python-version: '3.11'
    - name: Install UV
      uses: astral-sh/setup-uv@v5
    #----------------------------------------------
    #  -----  install dependencies & run pre-commit  -----
    #----------------------------------------------
    - name: Install dependencies and run pre-commit
      run: |
        uv pip install --system pre-commit
        # Run pre-commit directly
        pre-commit run --all-files

```

--------------------------------------------------------------------------------
/modelcontextprotocol/pyproject.toml:
--------------------------------------------------------------------------------

```toml
[project]
name = "atlan-mcp-server"
dynamic = ["version"]
description = "Atlan Model Context Protocol server for interacting with Atlan services"
readme = "README.md"
requires-python = ">=3.11"
license = { text = "MIT" }
authors = [
    {name = "AtlanHQ", email = "[email protected]"}
]
classifiers = [
    "Programming Language :: Python :: 3",
    "Programming Language :: Python :: 3.11",
    "License :: OSI Approved :: MIT License",
    "Operating System :: OS Independent",
]

dependencies = [
    "fastmcp==2.13.2",
    "pyatlan>=6.0.1",
    "uvicorn>=0.35.0"
]

[project.scripts]
atlan-mcp-server = "server:main"

[project.urls]
"Homepage" = "https://github.com/atlanhq/agent-toolkit"
"Documentation" = "https://ask.atlan.com/hc/en-us/articles/12525731740175-How-to-implement-the-Atlan-MCP-server"
"Bug Tracker" = "https://github.com/atlanhq/agent-toolkit/issues"
"Source" = "https://github.com/atlanhq/agent-toolkit.git"
"Changelog" = "https://github.com/atlanhq/agent-toolkit/blob/main/CHANGELOG.md"

[tool.hatch.version]
path = "version.py"

[tool.hatch.build.targets.wheel]
packages = ["."]

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

```

--------------------------------------------------------------------------------
/modelcontextprotocol/settings.py:
--------------------------------------------------------------------------------

```python
"""Configuration settings for the application."""

from typing import Optional
from pydantic_settings import BaseSettings
from version import __version__ as MCP_VERSION


class Settings(BaseSettings):
    """Application settings loaded from environment variables or .env file."""

    ATLAN_BASE_URL: str
    ATLAN_API_KEY: str
    ATLAN_AGENT_ID: str = "NA"
    ATLAN_AGENT: str = "atlan-mcp"
    ATLAN_MCP_USER_AGENT: str = f"Atlan MCP Server {MCP_VERSION}"
    MCP_TRANSPORT: str = "stdio"
    MCP_HOST: str = "0.0.0.0"
    MCP_PORT: int = 8000
    MCP_PATH: str = "/"

    @property
    def headers(self) -> dict:
        """Get the headers for API requests."""
        return {
            "User-Agent": self.ATLAN_MCP_USER_AGENT,
            "X-Atlan-Agent": self.ATLAN_AGENT,
            "X-Atlan-Agent-Id": self.ATLAN_AGENT_ID,
            "X-Atlan-Client-Origin": self.ATLAN_AGENT,
        }

    class Config:
        env_file = ".env"
        env_file_encoding = "utf-8"
        extra = "allow"
        # Allow case-insensitive environment variables
        case_sensitive = False


_settings: Optional[Settings] = None


def get_settings() -> Settings:
    """
    Get the singleton Settings instance.
    Loads settings once from environment/file and reuses the instance.

    Returns:
        Settings: The singleton settings instance
    """
    global _settings
    if _settings is None:
        _settings = Settings()
    return _settings

```

--------------------------------------------------------------------------------
/modelcontextprotocol/tools/__init__.py:
--------------------------------------------------------------------------------

```python
from .search import search_assets
from .dsl import get_assets_by_dsl
from .lineage import traverse_lineage
from .assets import update_assets
from .query import query_asset
from .dq_rules import (
    create_dq_rules,
    schedule_dq_rules,
    delete_dq_rules,
    update_dq_rules,
)
from .glossary import (
    create_glossary_category_assets,
    create_glossary_assets,
    create_glossary_term_assets,
)
from .domain import create_data_domain_assets, create_data_product_assets
from .models import (
    CertificateStatus,
    UpdatableAttribute,
    UpdatableAsset,
    TermOperations,
    Glossary,
    GlossaryCategory,
    GlossaryTerm,
    DQRuleType,
    DQAssetType,
    DQRuleSpecification,
    DQRuleScheduleSpecification,
    DQRuleScheduleResponse,
    ScheduledAssetInfo,
    DQRuleInfo,
    DQRuleDeleteResponse,
)

__all__ = [
    "search_assets",
    "get_assets_by_dsl",
    "traverse_lineage",
    "update_assets",
    "query_asset",
    "create_glossary_category_assets",
    "create_glossary_assets",
    "create_glossary_term_assets",
    "create_data_domain_assets",
    "create_data_product_assets",
    "CertificateStatus",
    "UpdatableAttribute",
    "UpdatableAsset",
    "TermOperations",
    "Glossary",
    "GlossaryCategory",
    "GlossaryTerm",
    "create_dq_rules",
    "schedule_dq_rules",
    "delete_dq_rules",
    "update_dq_rules",
    "DQRuleType",
    "DQAssetType",
    "DQRuleSpecification",
    "DQRuleScheduleSpecification",
    "DQRuleScheduleResponse",
    "ScheduledAssetInfo",
    "DQRuleInfo",
    "DQRuleDeleteResponse",
]

```

--------------------------------------------------------------------------------
/modelcontextprotocol/utils/parameters.py:
--------------------------------------------------------------------------------

```python
"""
Parameter parsing and validation utilities for MCP tools.

This module provides reusable functions for parsing and validating
parameters that are commonly used across different MCP tools.
"""

import json
import logging
from typing import Any, List, Optional, Union

logger = logging.getLogger(__name__)


def parse_json_parameter(param: Any) -> Union[dict, list, None]:
    """
    Parse a parameter that might be a JSON string.

    Args:
        param: The parameter value to parse (could be string, dict, list, etc.)

    Returns:
        The parsed parameter value

    Raises:
        json.JSONDecodeError: If the JSON string is invalid
    """
    if param is None:
        return None

    if isinstance(param, str):
        try:
            return json.loads(param)
        except json.JSONDecodeError as e:
            logger.error(f"Invalid JSON parameter: {param}")
            raise e

    return param


def parse_list_parameter(param: Any) -> Optional[List[Any]]:
    """
    Parse a parameter that might be a JSON string representing a list.

    Args:
        param: The parameter value to parse

    Returns:
        The parsed list, None if param is None, or original value converted to list if needed

    Raises:
        json.JSONDecodeError: If the JSON string is invalid
    """
    if param is None:
        return None

    if isinstance(param, str):
        try:
            parsed = json.loads(param)
        except json.JSONDecodeError as e:
            logger.error(f"Invalid JSON parameter: {param}")
            raise e

        if isinstance(parsed, list):
            return parsed
        return [parsed]

    if isinstance(param, list):
        return param

    return [param]

```

--------------------------------------------------------------------------------
/modelcontextprotocol/docs/LOCAL_BUILD.md:
--------------------------------------------------------------------------------

```markdown
# Local Build

1. Clone the repository:
```bash
git clone https://github.com/atlanhq/agent-toolkit.git
cd agent-toolkit
```

2. Install UV package manager:
For macOS:
```bash
# Using Homebrew
brew install uv
```

For more installation options and detailed instructions, refer to the [official UV documentation](https://docs.astral.sh/uv/getting-started/installation/).

3. Install dependencies:
> Python version should be >= 3.11
```bash
cd modelcontextprotocol
uv sync
```

4. Configure Atlan credentials:

a. Using a .env file:
Create a `.env` file in the `modelcontextprotocol` directory (or copy the `.env.template` file and rename it to `.env`) with the following content:
```
ATLAN_BASE_URL=https://your-instance.atlan.com
ATLAN_API_KEY=your_api_key
ATLAN_AGENT_ID=your_agent_id
```

**Note: `ATLAN_AGENT_ID` is optional but recommended. It is used to identify which agent is making the request in the Atlan UI.**

To generate the API key, refer to the [Atlan documentation](https://ask.atlan.com/hc/en-us/articles/8312649180049-API-authentication).

5. Run the server:
```bash
uv run .venv/bin/atlan-mcp-server
```

6. (For debugging) Run the server with MCP inspector:
```bash
uv run mcp dev server.py
```
7. Integrate local MCP changes with Claude Desktop (for E2E testing):
When Claude Desktop is integrated with the Atlan MCP server, it spawns its own MCP server process.
Update the Claude Desktop config as below so that it uses your local code changes for end-to-end testing:
```json
{
  "mcpServers": {
    "atlan-local": {
      "command": "uv",
      "args": [
        "run",
        "/path/to/agent-toolkit/modelcontextprotocol/.venv/bin/atlan-mcp-server"
      ],
      "cwd": "/path/to/agent-toolkit/modelcontextprotocol",
      "env": {
        "ATLAN_API_KEY": "your_api_key",
        "ATLAN_BASE_URL": "https://your-instance.atlan.com",
        "ATLAN_AGENT_ID": "your_agent_id"
      }
    }
  }
}
```

```

--------------------------------------------------------------------------------
/modelcontextprotocol/utils/assets.py:
--------------------------------------------------------------------------------

```python
"""
Asset utilities for the Atlan MCP server.

This module provides reusable functions for asset operations
that are commonly used across different MCP tools.
"""

import logging
from typing import Any, Dict, List

from pyatlan.model.assets import Asset

from client import get_atlan_client

logger = logging.getLogger(__name__)


def save_assets(assets: List[Asset]) -> List[Dict[str, Any]]:
    """
    Common bulk save and response processing for any asset type.

    Args:
        assets (List[Asset]): List of Asset objects to save.

    Returns:
        List[Dict[str, Any]]: List of dictionaries with details for each created
            or updated asset.

    Raises:
        Exception: If there's an error saving the assets.
    """
    logger.info("Starting bulk save operation")
    client = get_atlan_client()
    try:
        response = client.asset.save(assets)
    except Exception as e:
        logger.error(f"Error saving assets: {e}")
        raise

    created_assets = response.mutated_entities.CREATE or []
    updated_assets = response.mutated_entities.UPDATE or []

    logger.info(
        f"Save operation completed: {len(created_assets)} created, "
        f"{len(updated_assets)} updated"
    )

    results = []

    # Process created assets
    for asset in created_assets:
        results.append(
            {
                "guid": asset.guid,
                "name": asset.name,
                "qualified_name": asset.qualified_name,
                "operation": "CREATE",
            }
        )

    # Process updated assets
    for asset in updated_assets:
        results.append(
            {
                "guid": asset.guid,
                "name": asset.name,
                "qualified_name": asset.qualified_name,
                "operation": "UPDATE",
            }
        )

    logger.info(f"Bulk save completed successfully for {len(results)} assets")
    return results

```

--------------------------------------------------------------------------------
/modelcontextprotocol/tools/dsl.py:
--------------------------------------------------------------------------------

```python
import logging
import json
from typing import Dict, Any, Union

from client import get_atlan_client
from pyatlan.model.search import DSL, IndexSearchRequest
from utils.search import SearchUtils

# Configure logging
logger = logging.getLogger(__name__)


def get_assets_by_dsl(dsl_query: Union[str, Dict[str, Any]]) -> Dict[str, Any]:
    """
    Execute the search with the given query
    Args:
        dsl_query (Union[str, Dict[str, Any]]): The DSL query as either a string or dictionary
    Returns:
        Dict[str, Any]: A dictionary containing the results and aggregations
    """
    logger.info("Starting DSL-based asset search")
    try:
        # Parse string to dict if needed
        if isinstance(dsl_query, str):
            logger.debug("Converting DSL string to JSON")
            try:
                dsl_dict = json.loads(dsl_query)
            except json.JSONDecodeError as e:
                logger.error(f"Invalid JSON in DSL query: {e}")
                return {
                    "results": [],
                    "aggregations": {},
                    "error": "Invalid JSON in DSL query",
                }
        else:
            logger.debug("Using provided DSL dictionary")
            dsl_dict = dsl_query

        logger.debug("Creating IndexSearchRequest")
        index_request = IndexSearchRequest(
            dsl=DSL(**dsl_dict),
            suppress_logs=True,
            show_search_score=True,
            exclude_meanings=False,
            exclude_atlan_tags=False,
        )

        logger.info("Executing DSL search request")
        client = get_atlan_client()
        search_response = client.asset.search(index_request)
        processed_results = SearchUtils.process_results(search_response)
        return processed_results
    except Exception as e:
        logger.error(f"Error in DSL search: {str(e)}")
        return {"results": [], "aggregations": {}, "error": str(e)}

```

--------------------------------------------------------------------------------
/modelcontextprotocol/docs/DEPLOYMENT.md:
--------------------------------------------------------------------------------

```markdown
# Atlan MCP Server Deployment Guide

This guide covers transport modes and basic deployment options for the Atlan MCP Server.

## Transport Modes

The Atlan MCP Server supports three transport modes. For more details about MCP transport modes, see the [official MCP documentation](https://modelcontextprotocol.io/specification/2025-06-18/basic/transports).

| Transport Mode | Use Case | Benefits | When to Use |
|---|---|---|---|
| **stdio** (Default) | Local development, IDE integrations | Simple, direct communication | Claude Desktop, Cursor IDE |
| **SSE** (Server-Sent Events) | Remote deployments, web browsers | Real-time streaming, web-compatible | Cloud deployments, web clients |
| **streamable-http** | HTTP-based remote connections | Standard HTTP, load balancer friendly | Kubernetes, containerized deployments |

## Basic Deployment Examples

### Local Development (stdio)
```bash
# Default stdio mode
python server.py

# Or explicitly specify stdio
python server.py --transport stdio
```

### Cloud Deployment (SSE)
```bash
# Docker with SSE
docker run -d \
  -p 8000:8000 \
  -e ATLAN_API_KEY="<YOUR_API_KEY>" \
  -e ATLAN_BASE_URL="https://<YOUR_INSTANCE>.atlan.com" \
  -e MCP_TRANSPORT="sse" \
  ghcr.io/atlanhq/atlan-mcp-server:latest

# Python with SSE
python server.py --transport sse --host 0.0.0.0 --port 8000
```

### HTTP Deployment (streamable-http)
```bash
# Docker with HTTP
docker run -d \
  -p 8000:8000 \
  -e ATLAN_API_KEY="<YOUR_API_KEY>" \
  -e ATLAN_BASE_URL="https://<YOUR_INSTANCE>.atlan.com" \
  -e MCP_TRANSPORT="streamable-http" \
  ghcr.io/atlanhq/atlan-mcp-server:latest

# Python with HTTP
python server.py --transport streamable-http --host 0.0.0.0 --port 8000
```

## Environment Variables

### Required
- `ATLAN_API_KEY`: Your Atlan API key
- `ATLAN_BASE_URL`: Your Atlan instance URL

### Transport Configuration
- `MCP_TRANSPORT`: Transport mode (stdio/sse/streamable-http)
- `MCP_HOST`: Host address for network transports (default: 0.0.0.0)
- `MCP_PORT`: Port number for network transports (default: 8000)
- `MCP_PATH`: Path for streamable-http transport (default: /)

### Optional
- `ATLAN_AGENT_ID`: Agent identifier
- `RESTRICTED_TOOLS`: Comma-separated list of tools to restrict (see the combined example below)
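
As an illustrative example, these variables can be combined in a single SSE deployment (all values below are placeholders):

```bash
docker run -d \
  -p 8000:8000 \
  -e ATLAN_API_KEY="<YOUR_API_KEY>" \
  -e ATLAN_BASE_URL="https://<YOUR_INSTANCE>.atlan.com" \
  -e ATLAN_AGENT_ID="<YOUR_AGENT_ID>" \
  -e MCP_TRANSPORT="sse" \
  -e MCP_PORT="8000" \
  -e RESTRICTED_TOOLS="get_assets_by_dsl_tool,update_assets_tool" \
  ghcr.io/atlanhq/atlan-mcp-server:latest
```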

For additional support, refer to the main [README](../README.md) or contact [email protected].

```

--------------------------------------------------------------------------------
/modelcontextprotocol/tools/query.py:
--------------------------------------------------------------------------------

```python
"""
Query tool for executing SQL queries on table/view assets.

This module provides functionality to execute SQL queries on data sources
using the Atlan client.
"""

import logging
from typing import Dict, Any, Optional

from client import get_atlan_client
from pyatlan.model.query import QueryRequest

# Configure logging
logger = logging.getLogger(__name__)


def query_asset(
    sql: str,
    connection_qualified_name: str,
    default_schema: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Execute a SQL query on a table/view asset.

    Note:
        Use read-only queries to retrieve data.
        Please add reasonable LIMIT clauses to your SQL queries to avoid
        overwhelming the client or causing timeouts. Large result sets can
        cause performance issues or crash the client application.

    Args:
        sql (str): The SQL query to execute (read-only queries)
        connection_qualified_name (str): Connection qualified name to use for the query
            (e.g., "default/snowflake/1705755637")
        default_schema (str, optional): Default schema name to use for unqualified
            objects in the SQL, in the form "DB.SCHEMA"
            (e.g., "RAW.WIDEWORLDIMPORTERS_WAREHOUSE")

    Returns:
        Dict[str, Any]: Dictionary containing:
            - success: Boolean indicating if the query was successful
            - data: Query result data (rows, columns) if successful
            - error: Error message if query failed
            - query_info: Additional query execution information

    Raises:
        Exception: If there's an error executing the query
    """
    logger.info(
        f"Starting SQL query execution on connection: {connection_qualified_name}"
    )
    logger.debug(f"SQL query: {sql}")
    logger.debug(f"Parameters - default_schema: {default_schema}")

    try:
        # Validate required parameters
        if not sql or not sql.strip():
            error_msg = "SQL query cannot be empty"
            logger.error(error_msg)
            return {
                "success": False,
                "data": None,
                "error": error_msg,
                "query_info": {},
            }

        if not connection_qualified_name or not connection_qualified_name.strip():
            error_msg = "Connection qualified name cannot be empty"
            logger.error(error_msg)
            return {
                "success": False,
                "data": None,
                "error": error_msg,
                "query_info": {},
            }

        # Get Atlan client
        logger.debug("Getting Atlan client")
        client = get_atlan_client()

        # Build query request
        logger.debug("Building QueryRequest object")
        query_request = QueryRequest(
            sql=sql,
            data_source_name=connection_qualified_name,
            default_schema=default_schema,
        )

        # Execute query
        logger.info("Executing SQL query")
        query_response = client.queries.stream(request=query_request)

        logger.info("Query executed successfully, returning response")

        return {
            "success": True,
            "data": query_response,
            "error": None,
            "query_info": {
                "data_source": connection_qualified_name,
                "default_schema": default_schema,
                "sql": sql,
            },
        }

    except Exception as e:
        error_msg = f"Error executing SQL query: {str(e)}"
        logger.error(error_msg)
        logger.exception("Exception details:")

        return {
            "success": False,
            "data": None,
            "error": error_msg,
            "query_info": {
                "data_source": connection_qualified_name,
                "default_schema": default_schema,
                "sql": sql,
            },
        }

```

--------------------------------------------------------------------------------
/modelcontextprotocol/tools/lineage.py:
--------------------------------------------------------------------------------

```python
import logging
from typing import Dict, Any, List, Optional, Union

from client import get_atlan_client
from pyatlan.model.enums import LineageDirection
from pyatlan.model.lineage import FluentLineage
from pyatlan.model.fields.atlan_fields import AtlanField
from utils.search import SearchUtils
from utils.constants import DEFAULT_SEARCH_ATTRIBUTES

# Configure logging
logger = logging.getLogger(__name__)


def traverse_lineage(
    guid: str,
    direction: LineageDirection,
    depth: int = 1000000,
    size: int = 10,
    immediate_neighbors: bool = False,
    include_attributes: Optional[List[Union[str, AtlanField]]] = None,
) -> Dict[str, Any]:
    """
    Traverse asset lineage in specified direction.

    By default, essential attributes used in search operations are included.
    Additional attributes can be specified via include_attributes parameter.

    Args:
        guid (str): GUID of the starting asset
        direction (LineageDirection): Direction to traverse (UPSTREAM or DOWNSTREAM)
        depth (int, optional): Maximum depth to traverse. Defaults to 1000000.
        size (int, optional): Maximum number of results to return. Defaults to 10.
        immediate_neighbors (bool, optional): Only return immediate neighbors. Defaults to False.
        include_attributes (Optional[List[Union[str, AtlanField]]], optional): List of additional
            attributes to include in results. Can be string attribute names or AtlanField objects.
            These will be added to the default set. Defaults to None.

    Returns:
        Dict[str, Any]: Dictionary containing:
            - assets: List of assets in the lineage with processed attributes
            - error: None if no error occurred, otherwise the error message

    Raises:
        Exception: If there's an error executing the lineage request
    """
    logger.info(
        f"Starting lineage traversal from {guid} in direction {direction}, "
        f"depth={depth}, size={size}, immediate_neighbors={immediate_neighbors}"
    )
    logger.debug(f"Include attributes parameter: {include_attributes}")

    try:
        # Initialize base request
        logger.debug("Initializing FluentLineage object")
        lineage_builder = (
            FluentLineage(starting_guid=guid)
            .direction(direction)
            .depth(depth)
            .size(size)
            .immediate_neighbors(immediate_neighbors)
        )

        # Prepare attributes to include: default attributes + additional user-specified attributes
        all_attributes = DEFAULT_SEARCH_ATTRIBUTES.copy()

        if include_attributes:
            logger.debug(f"Adding user-specified attributes: {include_attributes}")
            for attr in include_attributes:
                if isinstance(attr, str) and attr not in all_attributes:
                    all_attributes.append(attr)
                elif isinstance(attr, AtlanField):
                    # AtlanField objects can be added to the request directly
                    lineage_builder = lineage_builder.include_on_results(attr)

        logger.debug(f"Total attributes to include: {all_attributes}")

        # Include all string attributes in results
        for attr_name in all_attributes:
            attr_obj = SearchUtils._get_asset_attribute(attr_name)
            if attr_obj is None:
                logger.warning(
                    f"Unknown attribute for inclusion: {attr_name}, skipping"
                )
                continue
            logger.debug(f"Including attribute: {attr_name}")
            lineage_builder = lineage_builder.include_on_results(attr_obj)

        # Execute request
        logger.debug("Converting FluentLineage to request object")
        request = lineage_builder.request

        logger.info("Executing lineage request")
        client = get_atlan_client()
        response = client.asset.get_lineage_list(request)

        # Process results using same pattern as search
        logger.info("Processing lineage results")
        if response is None:
            logger.info("No lineage results found")
            return {"assets": [], "error": None}

        # Convert results to list and process using Pydantic serialization
        results_list = [
            result.dict(by_alias=True, exclude_unset=True)
            for result in response
            if result is not None
        ]

        logger.info(
            f"Lineage traversal completed, returned {len(results_list)} results"
        )
        return {"assets": results_list, "error": None}

    except Exception as e:
        logger.error(f"Error traversing lineage: {str(e)}")
        return {"assets": [], "error": str(e)}

```
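For context, a hedged usage sketch of `traverse_lineage` as defined above; the GUID, attribute names, and import path are assumptions, and the exact shape of each serialized asset dict depends on pyatlan's Pydantic models.

```python
# Illustrative caller for traverse_lineage (import path assumed).
from pyatlan.model.enums import LineageDirection
from tools.lineage import traverse_lineage

result = traverse_lineage(
    guid="<starting-asset-guid>",          # placeholder GUID
    direction=LineageDirection.UPSTREAM,   # walk upstream lineage
    depth=3,
    size=25,
    include_attributes=["owner_users", "source_updated_at"],
)

if result["error"] is None:
    print(f"Found {len(result['assets'])} upstream assets")
else:
    print("Lineage traversal failed:", result["error"])
```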

--------------------------------------------------------------------------------
/modelcontextprotocol/middleware.py:
--------------------------------------------------------------------------------

```python
"""
Tool restriction middleware for FastMCP to control tool access.

This middleware restricts access to specified tools based on configuration.
Tools can be restricted globally by providing a list during initialization.
"""

from typing import List, Set, Optional
from fastmcp.server.middleware import Middleware, MiddlewareContext
from fastmcp.exceptions import ToolError
import logging

logger = logging.getLogger(__name__)


class ToolRestrictionMiddleware(Middleware):
    """
    Middleware to restrict tool access based on configuration.

    Allows specifying which tools should be restricted during initialization.
    Restricted tools will be hidden from the tools list and blocked from execution.
    """

    def __init__(self, restricted_tools: Optional[List[str]] = None):
        """
        Initialize the Tool Restriction Middleware.

        Args:
            restricted_tools: List of tool names to restrict. If None, no tools are restricted.
        """
        self.restricted_tools: Set[str] = set(restricted_tools or [])
        self._log_initialization()

    def _log_initialization(self) -> None:
        """Log middleware initialization details."""
        logger.info(
            f"Tool Restriction Middleware initialized with {len(self.restricted_tools)} "
            f"restricted tools: {sorted(self.restricted_tools)}"
        )

    def _is_tool_restricted(self, tool_name: str) -> bool:
        """
        Check if a tool is restricted.

        Args:
            tool_name: Name of the tool being called.

        Returns:
            True if the tool is restricted, False otherwise.
        """
        is_restricted = tool_name in self.restricted_tools

        if is_restricted:
            logger.info(f"Tool {tool_name} is restricted", tool=tool_name)

        return is_restricted

    def _get_error_message(self, tool_name: str) -> str:
        """
        Get appropriate error message for a restricted tool.

        Args:
            tool_name: Name of the restricted tool.

        Returns:
            Error message string.
        """
        return f"Tool '{tool_name}' is not available due to access restrictions"

    async def on_call_tool(self, context: MiddlewareContext, call_next):
        """
        Hook called when a tool is being executed.

        Checks if the tool is restricted and either allows execution or raises an error.

        Args:
            context: The middleware context containing request information.
            call_next: Function to call the next middleware/handler in the chain.

        Returns:
            The result from the next handler if allowed.

        Raises:
            ToolError: If the tool is restricted.
        """
        tool_name = context.message.name

        try:
            # Check if tool is restricted
            if self._is_tool_restricted(tool_name):
                error_message = self._get_error_message(tool_name)

                logger.warning(
                    f"Tool access denied: {tool_name} ({error_message})"
                )

                raise ToolError(error_message)

            # Tool is allowed, proceed with execution
            logger.debug(f"Tool access granted: {tool_name}", tool=tool_name)

            return await call_next(context)

        except ToolError:
            # Re-raise ToolError as-is
            raise
        except Exception as e:
            # Handle unexpected errors
            logger.error(
                f"Error in tool restriction middleware for tool '{tool_name}': {str(e)}",
                exc_info=True,
            )
            # Re-raise the original exception
            raise

    async def on_list_tools(self, context: MiddlewareContext, call_next):
        """
        Hook called when listing available tools.

        Filters the tool list to hide restricted tools.

        Args:
            context: The middleware context.
            call_next: Function to call the next handler.

        Returns:
            Filtered list of tools.
        """
        # Get the full list of tools
        all_tools = await call_next(context)

        try:
            # If no tools are restricted, return all tools
            if not self.restricted_tools:
                return all_tools

            # Filter out restricted tools
            filtered_tools = [
                tool for tool in all_tools if tool.name not in self.restricted_tools
            ]

            logger.debug(
                f"Filtered tool list: {len(filtered_tools)} of {len(all_tools)} tools visible; "
                f"restricted: {sorted(self.restricted_tools)}"
            )

            return filtered_tools

        except Exception as e:
            logger.error(
                f"Error filtering tool list: {str(e)}",
                exc_info=True,
            )
            # On error, return the original list to avoid breaking functionality
            return all_tools

```
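A short sketch of how this middleware might be attached to a FastMCP server. It assumes FastMCP's `add_middleware()` registration hook (available in recent FastMCP 2.x releases); the server name and restricted tool names are placeholders.

```python
# Hedged wiring example; the restricted tool names are placeholders.
from fastmcp import FastMCP
from middleware import ToolRestrictionMiddleware

mcp = FastMCP("atlan-mcp-server")
mcp.add_middleware(
    ToolRestrictionMiddleware(restricted_tools=["update_assets_tool", "create_dq_rules"])
)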

--------------------------------------------------------------------------------
/modelcontextprotocol/tools/domain.py:
--------------------------------------------------------------------------------

```python
from __future__ import annotations

import logging
from typing import Any, Dict, List, Union

from pyatlan.model.assets import Asset, DataDomain, DataProduct
from pyatlan.model.fluent_search import CompoundQuery, FluentSearch

from utils import save_assets
from .models import DataDomainSpec, DataProductSpec

logger = logging.getLogger(__name__)


def create_data_domain_assets(
    domains: Union[Dict[str, Any], List[Dict[str, Any]]],
) -> List[Dict[str, Any]]:
    """
    Create one or multiple Data Domain or Sub Domain assets in Atlan.

    Args:
        domains (Union[Dict[str, Any], List[Dict[str, Any]]]): Either a single domain
            specification (dict) or a list of domain specifications. Each specification
            can be a dictionary containing:
            - name (str): Name of the domain (required)
            - parent_domain_qualified_name (str, optional): Qualified name of the parent
              domain. If provided, creates a Sub Domain under that parent.
            - user_description (str, optional): Detailed description of the domain
            - certificate_status (str, optional): Certification status
              ("VERIFIED", "DRAFT", or "DEPRECATED")

    Returns:
        List[Dict[str, Any]]: List of dictionaries, each with details for a created domain:
            - guid: The GUID of the created domain
            - name: The name of the domain
            - qualified_name: The qualified name of the created domain

    Raises:
        Exception: If there's an error creating the domain assets.
    """
    data = domains if isinstance(domains, list) else [domains]
    logger.info(f"Creating {len(data)} data domain asset(s)")
    logger.debug(f"Domain specifications: {data}")

    specs = [DataDomainSpec(**item) for item in data]

    assets: List[DataDomain] = []
    for spec in specs:
        logger.debug(
            f"Creating DataDomain: {spec.name}"
            + (
                f" under {spec.parent_domain_qualified_name}"
                if spec.parent_domain_qualified_name
                else ""
            )
        )
        domain = DataDomain.creator(
            name=spec.name,
            parent_domain_qualified_name=spec.parent_domain_qualified_name,
        )
        domain.user_description = spec.user_description
        domain.certificate_status = (
            spec.certificate_status.value if spec.certificate_status else None
        )

        if spec.certificate_status:
            logger.debug(
                f"Set certificate status for {spec.name}: {spec.certificate_status.value}"
            )

        assets.append(domain)

    return save_assets(assets)


def create_data_product_assets(
    products: Union[Dict[str, Any], List[Dict[str, Any]]],
) -> List[Dict[str, Any]]:
    """
    Create one or multiple Data Product assets in Atlan.

    Args:
        products (Union[Dict[str, Any], List[Dict[str, Any]]]): Either a single product
            specification (dict) or a list of product specifications. Each specification
            can be a dictionary containing:
            - name (str): Name of the product (required)
            - domain_qualified_name (str): Qualified name of the domain this product
              belongs to (required)
            - asset_guids (List[str]): List of asset GUIDs to link to this product
              (required, at least one)
            - user_description (str, optional): Detailed description of the product
            - certificate_status (str, optional): Certification status
              ("VERIFIED", "DRAFT", or "DEPRECATED")

    Returns:
        List[Dict[str, Any]]: List of dictionaries, each with details for a created product:
            - guid: The GUID of the created product
            - name: The name of the product
            - qualified_name: The qualified name of the created product

    Raises:
        Exception: If there's an error creating the product assets.
        ValueError: If no asset_guids are provided (validated in DataProductSpec model).
    """
    data = products if isinstance(products, list) else [products]
    logger.info(f"Creating {len(data)} data product asset(s)")
    logger.debug(f"Product specifications: {data}")

    # Validation for asset_guids is now handled by DataProductSpec model
    specs = [DataProductSpec(**item) for item in data]

    assets: List[DataProduct] = []
    for spec in specs:
        logger.debug(
            f"Creating DataProduct: {spec.name} under {spec.domain_qualified_name}"
        )
        logger.debug(f"Linking {len(spec.asset_guids)} asset(s) to product")

        # Build FluentSearch to select assets by their GUIDs
        asset_selection = (
            FluentSearch()
            .where(CompoundQuery.active_assets())
            .where(Asset.GUID.within(spec.asset_guids))
        ).to_request()

        product = DataProduct.creator(
            name=spec.name,
            domain_qualified_name=spec.domain_qualified_name,
            asset_selection=asset_selection,
        )
        product.user_description = spec.user_description
        product.certificate_status = (
            spec.certificate_status.value if spec.certificate_status else None
        )

        if spec.certificate_status:
            logger.debug(
                f"Set certificate status for {spec.name}: {spec.certificate_status.value}"
            )

        assets.append(product)

    return save_assets(assets)

```
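An illustrative invocation of the two creators above, following their docstrings; the import path, description text, and asset GUIDs are placeholders.

```python
# Hedged usage sketch for domain/product creation (import path assumed).
from tools.domain import create_data_domain_assets, create_data_product_assets

domains = create_data_domain_assets(
    {
        "name": "Finance",
        "user_description": "All finance-owned data assets",
        "certificate_status": "DRAFT",
    }
)

products = create_data_product_assets(
    {
        "name": "Revenue Reporting",
        "domain_qualified_name": domains[0]["qualified_name"],
        "asset_guids": ["<table-guid-1>", "<table-guid-2>"],  # placeholder GUIDs
    }
)
```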

--------------------------------------------------------------------------------
/CHANGELOG.md:
--------------------------------------------------------------------------------

```markdown
# Changelog

All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.3.0] - 2025-12-03

### Added
- New tools:
  - `create_data_domain_assets`
  - `create_data_product_assets`
  - `create_dq_rules`

## [0.2.12] - 2025-11-14

### Changed
- Added explicit uvicorn>=0.35.0 dependency to ensure compatibility with FastMCP 2.13.0.2

## [0.2.11] - 2025-11-07

### Changed
- Upgraded FastMCP dependency from 2.11.0 to 2.13.0.2

## [0.2.10] - 2025-10-13

### Fixed
- Fixed a bug in `update_assets_tool` where glossary terms and categories could not be updated because the glossary GUID parameter was not being sent - now properly includes the glossary GUID as a required parameter for these asset types

## [0.2.9] - 2025-09-22

### Fixed
- Transport configuration not working when installed via PyPI and executed using `uvx atlan-mcp-server` - server would ignore environment variables and command-line arguments, always defaulting to stdio mode

## [0.2.8] - 2025-09-15

### Added
- Term linking functionality for improved glossary term relationships (#138)
- Enhanced search tool with popularity attributes context and examples (#132)
- Comprehensive MCP transport mode documentation (#124)

### Changed
- Implemented client singleton pattern for improved connection pool reuse and performance (#131)
- Enhanced Docker configuration with improved .dockerignore settings (#127)

## [0.2.7] - 2025-09-02

### Added
- Configurable tool access control via `RESTRICTED_TOOLS` environment variable
- Organizations can now restrict any combination of tools for MCP clients


## [0.2.6] - 2025-08-19

### Added
- Glossary management tools to streamline glossary creation and management:
  - `create_glossaries`: Create top-level glossaries with metadata (name, `user_description`, optional `certificate_status`)
  - `create_glossary_terms`: Add individual terms to an existing glossary; supports `user_description`, optional `certificate_status`, and `category_guids`
  - `create_glossary_categories`: Add categories (and nested subcategories) anchored to a glossary or parent category; supports `user_description` and optional `certificate_status`
- Bulk creation support across glossaries, terms, and categories to enable scalable glossary builds
- Foundation for automated, structured glossary generation from unstructured content


## [0.2.5] - 2025-08-05

### Changed
- Enhanced `search_assets_tool` documentation and usage examples for `connection_qualified_name` parameter
- Added `connection_qualified_name` parameter to example function calls for missing descriptions and multiple asset types searches

## [0.2.4] - 2025-07-24

### Added
- Enhanced lineage traversal tool with configurable attribute inclusion support
- `include_attributes` parameter in `traverse_lineage_tool` allowing users to specify additional attributes beyond defaults
- Default attributes are now always included: name, display_name, description, qualified_name, user_description, certificate_status, owner_users, owner_groups, connector_name, has_lineage, source_created_at, source_updated_at, readme, asset_tags

### Changed
- Improved lineage tool return format to standardized dictionary structure with `assets` and `error` keys
- Enhanced lineage processing using Pydantic serialization with `dict(by_alias=True, exclude_unset=True)` for consistent API responses
- Updated `immediate_neighbors` default value from `True` to `False` to align with underlying FluentLineage behavior
- Better error handling and logging throughout lineage traversal operations

### Fixed
- Lineage tool now returns richer attribute information instead of just default minimal attributes
- Resolved issue where lineage results only contained basic metadata without requested additional attributes

## [0.2.3] - 2025-07-16

### Added
- Expanded docstring attributes for LLM context in `server.py` for improved clarity and developer experience

### Changed
- Major documentation and README refactoring for easier setup and integration with Claude Desktop and Cursor, including clearer configuration examples and troubleshooting guidance

### Fixed
- Made `ATLAN_MCP_USER_AGENT` dynamic in `settings.py` to always reflect the current MCP server version in API requests

## [0.2.2] - 2025-06-23

### Added
- Multi-architecture build support for Docker images (ARM64 and AMD64)
- README support for asset updates - allows updating asset documentation/readme using markdown content
- Enhanced parameter parsing utilities for better Claude Desktop integration

### Fixed
- Search and Update Assets Tool compatibility issues with Claude Desktop
- String input parsing from Claude Desktop for better tool interaction
- Parameter validation and error handling improvements

### Changed
- Upgraded FastMCP dependency version for improved performance and stability
- Enhanced tool parameter processing with better error handling
- Improved asset update functionality with support for README content management

## [0.2.1] - 2025-05-24

### Added
- Advanced search operators support in `search_assets` including `contains`, `between`, and case-insensitive comparisons
- Default attributes for search results via `DEFAULT_SEARCH_ATTRIBUTES` constant with dynamic user-specified attribute support
- Enhanced "some conditions" handling with support for advanced operators and case-insensitive logic
- New search examples demonstrating OR logic for multiple type names and glossary term searches by specific attributes

### Changed
- Integrated `SearchUtils` for centralized and consistent search result processing
- Improved search API flexibility and precision with advanced query capabilities

### Fixed
- Release workflow changelog generation issues that previously caused empty release notes
- Improved commit range calculation and error handling in GitHub Actions workflow

## [0.2.0] - 2025-05-17

### Added
- Support for new transport modes: streamable HTTP and SSE
- MCP server executable script (`atlan-mcp-server`)
- Improved Docker image with non-root user and security enhancements

### Changed
- Made MCP server an installable package
- Updated dependencies and bumped versions
- Improved build process for faster Docker builds
- Restructured release workflow for better isolation and PR-based releases

### Fixed
- Various minor bugs and stability issues

### Documentation
- Updated setup and usage instructions
- Added more comprehensive examples


## [0.1.0] - 2024-05-05

### Added
- Initial release of Atlan MCP Server
- Basic functionality for integrating with Atlan

```

--------------------------------------------------------------------------------
/modelcontextprotocol/utils/search.py:
--------------------------------------------------------------------------------

```python
from typing import Dict, Any
import logging
from pyatlan.model.assets import Asset

logger = logging.getLogger(__name__)


class SearchUtils:
    @staticmethod
    def process_results(results: Any) -> Dict[str, Any]:
        """
        Process the results from the search index using Pydantic serialization.

        This method uses Pydantic's .dict(by_alias=True, exclude_unset=True) to:
        - Convert field names to their API-friendly camelCase format (by_alias=True)
        - Exclude any fields that weren't explicitly set (exclude_unset=True)

        Args:
            results: The search results from Atlan

        Returns:
            Dict[str, Any]: Dictionary containing:
                - results: List of processed results
                - aggregations: Search aggregations if available
                - error: None if no error occurred, otherwise the error message
        """
        current_page_results = (
            results.current_page()
            if hasattr(results, "current_page") and callable(results.current_page)
            else []
        )
        aggregations = getattr(results, "aggregations", None)

        logger.info(f"Processing {len(current_page_results)} search results")
        results_list = [
            result.dict(by_alias=True, exclude_unset=True)
            for result in current_page_results
            if result is not None
        ]

        return {"results": results_list, "aggregations": aggregations, "error": None}

    @staticmethod
    def _get_asset_attribute(attr_name: str):
        """
        Get Asset attribute by name.
        """
        return getattr(Asset, attr_name.upper(), None)

    @staticmethod
    def _apply_operator_condition(
        attr, operator: str, value: Any, case_insensitive: bool = False
    ):
        """
        Apply an operator condition to an attribute.

        Args:
            attr: The Asset attribute object
            operator (str): The operator to apply
            value: The value for the condition
            case_insensitive (bool): Whether to apply case insensitive matching

        Returns:
            The condition object to be used with where/where_not/where_some

        Raises:
            ValueError: If the operator is unknown or value format is invalid
        """
        logger.debug(
            f"Applying operator '{operator}' with value '{value}' (case_insensitive={case_insensitive})"
        )

        if operator == "startswith":
            return attr.startswith(value, case_insensitive=case_insensitive)
        elif operator == "match":
            return attr.match(value)
        elif operator == "eq":
            return attr.eq(value, case_insensitive=case_insensitive)
        elif operator == "neq":
            return attr.neq(value, case_insensitive=case_insensitive)
        elif operator == "gte":
            return attr.gte(value)
        elif operator == "lte":
            return attr.lte(value)
        elif operator == "gt":
            return attr.gt(value)
        elif operator == "lt":
            return attr.lt(value)
        elif operator == "has_any_value":
            return attr.has_any_value()
        elif operator == "contains":
            return attr.contains(value, case_insensitive=case_insensitive)
        elif operator == "between":
            # Expecting value to be a list/tuple with [start, end]
            if isinstance(value, (list, tuple)) and len(value) == 2:
                return attr.between(value[0], value[1])
            else:
                raise ValueError(
                    f"Invalid value format for 'between' operator: {value}, expected [start, end]"
                )
        else:
            # Try to get the operator method from the attribute
            op_method = getattr(attr, operator, None)
            if op_method is None:
                raise ValueError(f"Unknown operator: {operator}")

            # Try to pass case_insensitive if the method supports it
            try:
                return op_method(value, case_insensitive=case_insensitive)
            except TypeError:
                # Fallback if case_insensitive is not supported
                return op_method(value)

    @staticmethod
    def _process_condition(
        search, attr, condition, attr_name: str, search_method_name: str
    ):
        """
        Process a single condition and apply it to the search using the specified method.

        Args:
            search: The FluentSearch object
            attr: The Asset attribute object
            condition: The condition value (dict, list, or simple value)
            attr_name (str): The attribute name for logging
            search_method_name (str): The search method to use ('where', 'where_not', 'where_some')

        Returns:
            FluentSearch: The updated search object
        """
        search_method = getattr(search, search_method_name)

        if isinstance(condition, dict):
            operator = condition.get("operator", "eq")
            value = condition.get("value")
            case_insensitive = condition.get("case_insensitive", False)

            try:
                condition_obj = SearchUtils._apply_operator_condition(
                    attr, operator, value, case_insensitive
                )
                search = search_method(condition_obj)
                return search
            except ValueError as e:
                logger.warning(f"Skipping condition for {attr_name}: {e}")
                return search
        elif isinstance(condition, list):
            if search_method_name == "where_some":
                # Handle multiple values for where_some
                logger.debug(
                    f"Adding multiple '{search_method_name}' values for {attr_name}: {condition}"
                )
                for value in condition:
                    search = search_method(attr.eq(value))
                return search
            else:
                # Handle list of values with OR logic using .within()
                logger.debug(f"Applying multiple values for {attr_name}: {condition}")
                search = search_method(attr.within(condition))
                return search
        elif condition == "has_any_value" and search_method_name == "where_not":
            # Special case for has_any_value in negative conditions
            logger.debug(f"Excluding assets where {attr_name} has any value")
            search = search_method(attr.has_any_value())
            return search
        else:
            # Default to equality operator
            logger.debug(
                f"Applying {search_method_name} equality condition {attr_name}={condition}"
            )
            search = search_method(attr.eq(condition))
            return search

```
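To make the accepted shapes concrete, here is an illustrative map of attribute names to the condition forms `_process_condition` handles (operator dict, list, and plain equality); the attribute names and values are placeholders.

```python
# Illustrative condition shapes only; these are the per-attribute values a
# caller would pass, not a complete search request.
conditions = {
    # dict form: explicit operator, value, and optional case_insensitive flag
    "name": {"operator": "startswith", "value": "fact_", "case_insensitive": True},
    # dict form with "between": value must be a [start, end] pair
    "source_updated_at": {"operator": "between", "value": [1700000000000, 1710000000000]},
    # list form: OR logic via .within() (or repeated where_some conditions)
    "type_name": ["Table", "View"],
    # plain value: defaults to an equality condition
    "certificate_status": "VERIFIED",
}
```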

--------------------------------------------------------------------------------
/modelcontextprotocol/tools/glossary.py:
--------------------------------------------------------------------------------

```python
from __future__ import annotations
import logging
from typing import Dict, Any, List, Union

from pyatlan.model.assets import (
    AtlasGlossary,
    AtlasGlossaryCategory,
    AtlasGlossaryTerm,
)

from utils import parse_list_parameter, save_assets
from .models import (
    Glossary,
    GlossaryCategory,
    GlossaryTerm,
)

logger = logging.getLogger(__name__)


def create_glossary_assets(
    glossaries: Union[Dict[str, Any], List[Dict[str, Any]]],
) -> List[Dict[str, Any]]:
    """
    Create one or multiple AtlasGlossary assets in Atlan.

    Args:
        glossaries (Union[Dict[str, Any], List[Dict[str, Any]]]): Either a single glossary
            specification (dict) or a list of glossary specifications. Each specification
            can be a dictionary containing:
            - name (str): Name of the glossary (required)
            - user_description (str, optional): Detailed description of the glossary
              proposed by the user
            - certificate_status (str, optional): Certification status
              ("VERIFIED", "DRAFT", or "DEPRECATED")

    Returns:
        List[Dict[str, Any]]: List of dictionaries, each with details for a created glossary:
            - guid: The GUID of the created glossary
            - name: The name of the glossary
            - qualified_name: The qualified name of the created glossary

    Raises:
        Exception: If there's an error creating the glossary assets.
    """
    data = glossaries if isinstance(glossaries, list) else [glossaries]
    logger.info(f"Creating {len(data)} glossary asset(s)")
    logger.debug(f"Glossary specifications: {data}")

    specs = [Glossary(**item) for item in data]

    assets: List[AtlasGlossary] = []
    for spec in specs:
        logger.debug(f"Creating AtlasGlossary for: {spec.name}")
        glossary = AtlasGlossary.creator(name=spec.name)
        glossary.user_description = spec.user_description
        if spec.certificate_status:
            glossary.certificate_status = spec.certificate_status.value
            logger.debug(
                f"Set certificate status for {spec.name}: {spec.certificate_status.value}"
            )
        assets.append(glossary)

    return save_assets(assets)


def create_glossary_category_assets(
    categories: Union[Dict[str, Any], List[Dict[str, Any]]],
) -> List[Dict[str, Any]]:
    """
    Create one or multiple AtlasGlossaryCategory assets in Atlan.

    Args:
        categories (Union[Dict[str, Any], List[Dict[str, Any]]]): Either a single category
            specification (dict) or a list of category specifications. Each specification
            can be a dictionary containing:
            - name (str): Name of the category (required)
            - glossary_guid (str): GUID of the glossary this category belongs to (required)
            - user_description (str, optional): Detailed description of the category
              proposed by the user
            - certificate_status (str, optional): Certification status
              ("VERIFIED", "DRAFT", or "DEPRECATED")
            - parent_category_guid (str, optional): GUID of the parent category if this
              is a subcategory

    Returns:
        List[Dict[str, Any]]: List of dictionaries, each with details for a created category:
            - guid: The GUID of the created category
            - name: The name of the category
            - qualified_name: The qualified name of the created category

    Raises:
        Exception: If there's an error creating the glossary category assets.
    """
    data = categories if isinstance(categories, list) else [categories]
    logger.info(f"Creating {len(data)} glossary category asset(s)")
    logger.debug(f"Category specifications: {data}")

    specs = [GlossaryCategory(**item) for item in data]

    assets: List[AtlasGlossaryCategory] = []
    for spec in specs:
        logger.debug(f"Creating AtlasGlossaryCategory for: {spec.name}")
        anchor = AtlasGlossary.ref_by_guid(spec.glossary_guid)
        category = AtlasGlossaryCategory.creator(
            name=spec.name,
            anchor=anchor,
            parent_category=(
                AtlasGlossaryCategory.ref_by_guid(spec.parent_category_guid)
                if spec.parent_category_guid
                else None
            ),
        )
        category.user_description = spec.user_description
        if spec.certificate_status:
            category.certificate_status = spec.certificate_status.value
            logger.debug(
                f"Set certificate status for {spec.name}: {spec.certificate_status.value}"
            )

        assets.append(category)

    return save_assets(assets)


def create_glossary_term_assets(
    terms: Union[Dict[str, Any], List[Dict[str, Any]]],
) -> List[Dict[str, Any]]:
    """
    Create one or multiple AtlasGlossaryTerm assets in Atlan.

    Args:
        terms (Union[Dict[str, Any], List[Dict[str, Any]]]): Either a single term
            specification (dict) or a list of term specifications. Each specification
            can be a dictionary containing:
            - name (str): Name of the term (required)
            - glossary_guid (str): GUID of the glossary this term belongs to (required)
            - user_description (str, optional): Detailed description of the term
              proposed by the user
            - certificate_status (str, optional): Certification status
              ("VERIFIED", "DRAFT", or "DEPRECATED")
            - category_guids (List[str], optional): List of category GUIDs this term
              belongs to

    Returns:
        List[Dict[str, Any]]: List of dictionaries, each with details for a created term:
            - guid: The GUID of the created term
            - name: The name of the term
            - qualified_name: The qualified name of the created term

    Raises:
        ValueError: If any provided category_guids are not found.
        Exception: If there's an error creating the glossary term assets.
    """
    data = terms if isinstance(terms, list) else [terms]
    logger.info(f"Creating {len(data)} glossary term asset(s)")
    logger.debug(f"Term specifications: {data}")

    specs = [GlossaryTerm(**item) for item in data]
    per_term_guids = [set(parse_list_parameter(s.category_guids) or []) for s in specs]

    assets: List[AtlasGlossaryTerm] = []
    for spec, guids in zip(specs, per_term_guids):
        term = AtlasGlossaryTerm.creator(
            name=spec.name,
            anchor=AtlasGlossary.ref_by_guid(spec.glossary_guid),
            categories=[AtlasGlossaryCategory.ref_by_guid(g) for g in guids] or None,
        )
        term.user_description = spec.user_description
        if spec.certificate_status:
            term.certificate_status = spec.certificate_status.value
        assets.append(term)

    return save_assets(assets)

```
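A hedged usage sketch chaining the glossary and term creators above; the names are placeholders and the import path is assumed.

```python
# Illustrative caller: create a glossary, then add terms anchored to it.
from tools.glossary import create_glossary_assets, create_glossary_term_assets

glossaries = create_glossary_assets(
    {"name": "Business Metrics", "certificate_status": "DRAFT"}
)

terms = create_glossary_term_assets(
    [
        {"name": "ARR", "glossary_guid": glossaries[0]["guid"]},
        {"name": "Churn Rate", "glossary_guid": glossaries[0]["guid"]},
    ]
)
```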

--------------------------------------------------------------------------------
/.github/workflows/mcp-server-release.yml:
--------------------------------------------------------------------------------

```yaml
name: MCP-Release

on:
  pull_request:
    types: [closed]
    branches:
      - main

jobs:
  prepare-release:
    # Only run when a PR with the "release" label is merged
    if: github.event.pull_request.merged == true && contains(github.event.pull_request.labels.*.name, 'release')
    runs-on: ubuntu-latest
    permissions:
      contents: write
    outputs:
      version: ${{ steps.get_version.outputs.version }}
      should_release: ${{ steps.check_tag.outputs.exists == 'false' }}
    steps:
      - name: Checkout
        uses: actions/checkout@v4
        with:
          fetch-depth: 0

      - name: Get version
        id: get_version
        run: |
          VERSION=$(grep -m 1 "__version__" modelcontextprotocol/version.py | cut -d'"' -f2)
          echo "version=$VERSION" >> $GITHUB_OUTPUT
          echo "Found version: $VERSION"

      - name: Check if tag exists
        id: check_tag
        run: |
          TAG_NAME="v${{ steps.get_version.outputs.version }}"
          if git rev-parse "$TAG_NAME" >/dev/null 2>&1; then
            echo "Tag $TAG_NAME already exists, stopping workflow"
            echo "exists=true" >> $GITHUB_OUTPUT
          else
            echo "Tag $TAG_NAME does not exist, continuing workflow"
            echo "exists=false" >> $GITHUB_OUTPUT
          fi

      - name: Generate changelog entry
        id: changelog
        if: steps.check_tag.outputs.exists == 'false'
        run: |
          set +e

          VERSION="${{ steps.get_version.outputs.version }}"
          RELEASE_DATE=$(date +"%Y-%m-%d")

          echo "Generating changelog for version $VERSION ($RELEASE_DATE)"

          # Get the previous version tag
          PREV_TAG=$(git describe --tags --abbrev=0 HEAD~1 2>/dev/null || echo "")

          if [ -z "$PREV_TAG" ]; then
            # If no previous tag, get the first commit
            FIRST_COMMIT=$(git rev-list --max-parents=0 HEAD)
            RANGE="$FIRST_COMMIT..HEAD"
            echo "Using range from first commit to HEAD"
          else
            RANGE="$PREV_TAG..HEAD"
            echo "Using range from $PREV_TAG to HEAD"
          fi

          # Create temporary changelog entry for RELEASE_NOTES.md
          echo "## [$VERSION] - $RELEASE_DATE" > RELEASE_NOTES.md
          echo "" >> RELEASE_NOTES.md

          # Add features
          git log $RANGE --format="* %s (%h)" --grep="^feat" --perl-regexp --no-merges 2>/dev/null > features.txt || touch features.txt

          if [ -s features.txt ]; then
            echo "### Added" >> RELEASE_NOTES.md
            echo "" >> RELEASE_NOTES.md
            sed 's/^\* feat[[:space:]]*\([^:]*\):[[:space:]]*/* /' features.txt >> RELEASE_NOTES.md
            echo "" >> RELEASE_NOTES.md
          fi

          # Add fixes
          git log $RANGE --format="* %s (%h)" --grep="^fix" --perl-regexp --no-merges 2>/dev/null > fixes.txt || touch fixes.txt

          if [ -s fixes.txt ]; then
            echo "### Fixed" >> RELEASE_NOTES.md
            echo "" >> RELEASE_NOTES.md
            sed 's/^\* fix[[:space:]]*\([^:]*\):[[:space:]]*/* /' fixes.txt >> RELEASE_NOTES.md
            echo "" >> RELEASE_NOTES.md
          fi

          # Add other changes (excluding merge commits, chore, docs, style, refactor, test, ci)
          git log $RANGE --format="* %s (%h)" --no-merges 2>/dev/null | \
            grep -v -E "^\* (feat|fix|chore|docs|style|refactor|test|ci)(\(.*\))?:" > others.txt || touch others.txt

          if [ -s others.txt ]; then
            echo "### Changed" >> RELEASE_NOTES.md
            echo "" >> RELEASE_NOTES.md
            cat others.txt >> RELEASE_NOTES.md
            echo "" >> RELEASE_NOTES.md
          fi

          # If no specific changes found, add a simple entry
          if [ ! -s features.txt ] && [ ! -s fixes.txt ] && [ ! -s others.txt ]; then
            echo "### Changed" >> RELEASE_NOTES.md
            echo "" >> RELEASE_NOTES.md
            echo "* Release version $VERSION" >> RELEASE_NOTES.md
            echo "" >> RELEASE_NOTES.md
          fi

          # Clean up temporary files
          rm -f features.txt fixes.txt others.txt

          echo "Release notes generated successfully"
          echo "================================"
          cat RELEASE_NOTES.md
          echo "================================"

      - name: Create Tag
        if: steps.check_tag.outputs.exists == 'false'
        run: |
          git tag v${{ steps.get_version.outputs.version }}
          git push --tags

      - name: Create GitHub Release
        if: steps.check_tag.outputs.exists == 'false'
        uses: softprops/action-gh-release@v2
        with:
          tag_name: v${{ steps.get_version.outputs.version }}
          body_path: RELEASE_NOTES.md
          token: ${{ secrets.GITHUB_TOKEN }}
          draft: false
          prerelease: false

      # Upload release notes for other jobs to use
      - name: Upload release notes
        if: steps.check_tag.outputs.exists == 'false'
        uses: actions/upload-artifact@v4
        with:
          name: release-notes
          path: RELEASE_NOTES.md
          retention-days: 1

  publish-pypi:
    needs: prepare-release
    if: needs.prepare-release.outputs.should_release == 'true'
    runs-on: ubuntu-latest
    permissions:
      contents: read
    steps:
      - name: Checkout
        uses: actions/checkout@v4
        with:
          ref: v${{ needs.prepare-release.outputs.version }}

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install build dependencies
        run: |
          python -m pip install --upgrade pip
          pip install build wheel twine

      - name: Build package
        run: |
          cd modelcontextprotocol
          python -m build

      - name: Publish to PyPI
        env:
          TWINE_USERNAME: ${{ secrets.PYPI_USERNAME }}
          TWINE_PASSWORD: ${{ secrets.PYPI_PASSWORD }}
        run: |
          cd modelcontextprotocol
          twine upload dist/*

  publish-docker:
    needs: prepare-release
    if: needs.prepare-release.outputs.should_release == 'true'
    runs-on: ubuntu-latest
    permissions:
      contents: read
      packages: write
    steps:
      - name: Checkout
        uses: actions/checkout@v4
        with:
          ref: v${{ needs.prepare-release.outputs.version }}

      - name: Set up QEMU for Cross-Platform Builds
        uses: docker/setup-qemu-action@v3

      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v3

      - name: Login to GitHub Container Registry
        uses: docker/login-action@v3
        with:
          registry: ghcr.io
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}

      - name: Build and push Docker image
        uses: docker/build-push-action@v5
        with:
          context: ./modelcontextprotocol/
          push: true
          tags: |
            ghcr.io/atlanhq/atlan-mcp-server:latest
            ghcr.io/atlanhq/atlan-mcp-server:${{ needs.prepare-release.outputs.version }}
          platforms: |
            linux/amd64
            linux/arm64

```

--------------------------------------------------------------------------------
/modelcontextprotocol/tools/assets.py:
--------------------------------------------------------------------------------

```python
import logging
from typing import List, Union, Dict, Any
from client import get_atlan_client
from .models import (
    UpdatableAsset,
    UpdatableAttribute,
    CertificateStatus,
    TermOperation,
    TermOperations,
)
from pyatlan.model.assets import Readme, AtlasGlossaryTerm, AtlasGlossaryCategory
from pyatlan.model.fluent_search import CompoundQuery, FluentSearch

# Initialize logging
logger = logging.getLogger(__name__)


def update_assets(
    updatable_assets: Union[UpdatableAsset, List[UpdatableAsset]],
    attribute_name: UpdatableAttribute,
    attribute_values: List[Union[str, CertificateStatus, TermOperations]],
) -> Dict[str, Any]:
    """
    Update one or multiple assets with different values for attributes or term operations.

    Args:
        updatable_assets (Union[UpdatableAsset, List[UpdatableAsset]]): Asset(s) to update.
            Can be a single UpdatableAsset or a list of UpdatableAssets.
            For assets with type_name=AtlasGlossaryTerm or type_name=AtlasGlossaryCategory, each asset MUST include a "glossary_guid" key, which is the GUID of the glossary the term or category belongs to.
        attribute_name (UpdatableAttribute): Name of the attribute to update.
            Supports user_description, certificate_status, readme, and term.
        attribute_values (List[Union[str, CertificateStatus, TermOperations]]): List of values to set for the attribute.
            For certificateStatus, only VERIFIED, DRAFT, or DEPRECATED are allowed.
            For readme, the value must be a valid Markdown string.
            For term, the value must be a TermOperations object with operation and term_guids.

    Returns:
        Dict[str, Any]: Dictionary containing:
            - updated_count: Number of assets successfully updated
            - errors: List of any errors encountered
            - operation: The operation that was performed (for term operations)
    """
    try:
        # Convert single asset to list for consistent handling
        if not isinstance(updatable_assets, list):
            updatable_assets = [updatable_assets]

        logger.info(
            f"Updating {len(updatable_assets)} assets with attribute '{attribute_name}'"
        )

        # Validate attribute values
        if len(updatable_assets) != len(attribute_values):
            error_msg = "Number of asset GUIDs must match number of attribute values"
            logger.error(error_msg)
            return {"updated_count": 0, "errors": [error_msg]}

        # Initialize result tracking
        result = {"updated_count": 0, "errors": []}

        # Validate certificate status values if applicable
        if attribute_name == UpdatableAttribute.CERTIFICATE_STATUS:
            for value in attribute_values:
                if value not in CertificateStatus.__members__.values():
                    error_msg = f"Invalid certificate status: {value}"
                    logger.error(error_msg)
                    result["errors"].append(error_msg)

        # Get Atlan client
        client = get_atlan_client()

        # Create assets with updated values
        assets = []
        # readme_update_parent_assets: Assets that were updated with readme.
        readme_update_parent_assets = []
        for index, updatable_asset in enumerate(updatable_assets):
            type_name = updatable_asset.type_name
            qualified_name = updatable_asset.qualified_name
            asset_cls = getattr(
                __import__("pyatlan.model.assets", fromlist=[type_name]), type_name
            )

            # Special handling for Glossary Term updates
            if (
                updatable_asset.type_name == AtlasGlossaryTerm.__name__
                or updatable_asset.type_name == AtlasGlossaryCategory.__name__
            ):
                asset = asset_cls.updater(
                    qualified_name=updatable_asset.qualified_name,
                    name=updatable_asset.name,
                    glossary_guid=updatable_asset.glossary_guid,
                )
            else:
                asset = asset_cls.updater(
                    qualified_name=updatable_asset.qualified_name,
                    name=updatable_asset.name,
                )

            # Special handling for README updates
            if attribute_name == UpdatableAttribute.README:
                # Get the current readme content for the asset
                # The below query is used to get the asset based on the qualified name and include the readme content.
                asset_readme_response = (
                    FluentSearch()
                    .select()
                    .where(CompoundQuery.asset_type(asset_cls))
                    .where(asset_cls.QUALIFIED_NAME.eq(qualified_name))
                    .include_on_results(asset_cls.README)
                    .include_on_relations(Readme.DESCRIPTION)
                    .execute(client=client)
                )

                if first := asset_readme_response.current_page():
                    updated_content = attribute_values[index]
                    # We replace the existing readme content with the new content.
                    # If the existing readme content is not present, we create a new readme asset.
                    updated_readme = Readme.creator(
                        asset=first[0], content=updated_content
                    )
                    # Save the readme asset
                    assets.append(updated_readme)
                    # Add the parent/actual asset to the list of assets that were updated with readme.
                    readme_update_parent_assets.append(asset)
            elif attribute_name == UpdatableAttribute.TERM:
                # Special handling for term operations
                term_value = attribute_values[index]
                if not isinstance(term_value, TermOperations):
                    error_msg = f"Term value must be a TermOperations object for asset {updatable_asset.qualified_name}"
                    logger.error(error_msg)
                    result["errors"].append(error_msg)
                    continue

                term_operation = TermOperation(term_value.operation.lower())
                term_guids = term_value.term_guids

                # Create term references
                term_refs = [
                    AtlasGlossaryTerm.ref_by_guid(guid=guid) for guid in term_guids
                ]

                try:
                    # Perform the appropriate term operation
                    if term_operation == TermOperation.APPEND:
                        client.asset.append_terms(
                            asset_type=asset_cls,
                            qualified_name=updatable_asset.qualified_name,
                            terms=term_refs,
                        )
                    elif term_operation == TermOperation.REPLACE:
                        client.asset.replace_terms(
                            asset_type=asset_cls,
                            qualified_name=updatable_asset.qualified_name,
                            terms=term_refs,
                        )
                    elif term_operation == TermOperation.REMOVE:
                        client.asset.remove_terms(
                            asset_type=asset_cls,
                            qualified_name=updatable_asset.qualified_name,
                            terms=term_refs,
                        )

                    result["updated_count"] += 1
                    logger.info(
                        f"Successfully {term_operation.value}d terms on asset: {updatable_asset.qualified_name}"
                    )

                except Exception as e:
                    error_msg = f"Error updating terms on asset {updatable_asset.qualified_name}: {str(e)}"
                    logger.error(error_msg)
                    result["errors"].append(error_msg)
            else:
                # Regular attribute update flow
                setattr(asset, attribute_name.value, attribute_values[index])
                assets.append(asset)

        if len(readme_update_parent_assets) > 0:
            result["readme_updated"] = len(readme_update_parent_assets)
            # Collect qualified names or other identifiers for assets that were updated with readme
            result["updated_readme_assets"] = [
                asset.qualified_name
                for asset in readme_update_parent_assets
                if hasattr(asset, "qualified_name")
            ]
            logger.info(
                f"Successfully updated {result['readme_updated']} readme assets: {result['updated_readme_assets']}"
            )

        # Process response
        if len(assets) > 0:
            response = client.asset.save(assets)
            result["updated_count"] = len(response.guid_assignments)
        logger.info(f"Successfully updated {result['updated_count']} assets")

        return result

    except Exception as e:
        error_msg = f"Error updating assets: {str(e)}"
        logger.error(error_msg)
        return {"updated_count": 0, "errors": [error_msg]}

```
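An illustrative call to `update_assets` performing a term append, built from the models in `tools/models.py`; the GUIDs and qualified name are placeholders and the import paths are assumed.

```python
# Hedged usage sketch for a term operation on a single asset.
from tools.assets import update_assets
from tools.models import TermOperations, UpdatableAsset, UpdatableAttribute

asset = UpdatableAsset(
    guid="<asset-guid>",
    name="customers",
    qualified_name="default/snowflake/1234567890/DB/SCHEMA/customers",
    type_name="Table",
)

result = update_assets(
    updatable_assets=asset,
    attribute_name=UpdatableAttribute.TERM,
    attribute_values=[
        TermOperations(operation="append", term_guids=["<term-guid>"])
    ],
)
print(result["updated_count"], result["errors"])
```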

--------------------------------------------------------------------------------
/modelcontextprotocol/tools/models.py:
--------------------------------------------------------------------------------

```python
import logging
from enum import Enum
from typing import Optional, List, Union, Dict, Any

from pydantic import BaseModel, field_validator, model_validator

logger = logging.getLogger(__name__)


class CertificateStatus(str, Enum):
    """Enum for allowed certificate status values."""

    VERIFIED = "VERIFIED"
    DRAFT = "DRAFT"
    DEPRECATED = "DEPRECATED"


class UpdatableAttribute(str, Enum):
    """Enum for attributes that can be updated."""

    USER_DESCRIPTION = "user_description"
    CERTIFICATE_STATUS = "certificate_status"
    README = "readme"
    TERM = "term"


class TermOperation(str, Enum):
    """Enum for term operations on assets."""

    APPEND = "append"
    REPLACE = "replace"
    REMOVE = "remove"


class TermOperations(BaseModel):
    """Model for term operations on assets."""

    operation: TermOperation
    term_guids: List[str]


class UpdatableAsset(BaseModel):
    """Class representing an asset that can be updated."""

    guid: str
    name: str
    qualified_name: str
    type_name: str
    user_description: Optional[str] = None
    certificate_status: Optional[CertificateStatus] = None
    glossary_guid: Optional[str] = None


class Glossary(BaseModel):
    """Payload model for creating a glossary asset."""

    name: str
    user_description: Optional[str] = None
    certificate_status: Optional[CertificateStatus] = None


class GlossaryCategory(BaseModel):
    """Payload model for creating a glossary category asset."""

    name: str
    glossary_guid: str
    user_description: Optional[str] = None
    certificate_status: Optional[CertificateStatus] = None
    parent_category_guid: Optional[str] = None


class GlossaryTerm(BaseModel):
    """Payload model for creating a glossary term asset."""

    name: str
    glossary_guid: str
    user_description: Optional[str] = None
    certificate_status: Optional[CertificateStatus] = None
    category_guids: Optional[List[str]] = None


class DataDomainSpec(BaseModel):
    """Payload model for creating a Data Domain or Sub Domain asset."""

    name: str
    parent_domain_qualified_name: Optional[str] = (
        None  # if passed, will be created as a sub domain
    )
    user_description: Optional[str] = None
    certificate_status: Optional[CertificateStatus] = None


class DataProductSpec(BaseModel):
    """Payload model for creating a Data Product asset."""

    name: str
    domain_qualified_name: str
    asset_guids: List[str]  # Required: at least one asset GUID for data products
    user_description: Optional[str] = None
    certificate_status: Optional[CertificateStatus] = None

    @field_validator("asset_guids")
    @classmethod
    def validate_asset_guids(cls, v: List[str]) -> List[str]:
        """Validate that asset_guids is not empty."""
        if not v:
            raise ValueError(
                "Data products require at least one asset GUID. "
                "Please provide asset_guids to link assets to this product."
            )
        return v


class DQRuleCondition(BaseModel):
    """Model representing a single data quality rule condition."""

    type: (
        str  # Condition type (e.g., "STRING_LENGTH_BETWEEN", "REGEX_MATCH", "IN_LIST")
    )
    value: Optional[Union[str, List[str]]] = None  # Single value or list of values
    min_value: Optional[Union[int, float]] = None  # Minimum value for range conditions
    max_value: Optional[Union[int, float]] = None  # Maximum value for range conditions


class DQAssetType(str, Enum):
    """Enum for supported asset types for data quality rules."""

    TABLE = "Table"
    VIEW = "View"
    MATERIALIZED_VIEW = "MaterialisedView"
    SNOWFLAKE_DYNAMIC_TABLE = "SnowflakeDynamicTable"


class DQRuleType(str, Enum):
    """Enum for supported data quality rule types."""

    # Completeness checks
    NULL_COUNT = "Null Count"
    NULL_PERCENTAGE = "Null Percentage"
    BLANK_COUNT = "Blank Count"
    BLANK_PERCENTAGE = "Blank Percentage"

    # Statistical checks
    MIN_VALUE = "Min Value"
    MAX_VALUE = "Max Value"
    AVERAGE = "Average"
    STANDARD_DEVIATION = "Standard Deviation"

    # Uniqueness checks
    UNIQUE_COUNT = "Unique Count"
    DUPLICATE_COUNT = "Duplicate Count"

    # Validity checks
    REGEX = "Regex"
    STRING_LENGTH = "String Length"
    VALID_VALUES = "Valid Values"

    # Timeliness checks
    FRESHNESS = "Freshness"

    # Volume checks
    ROW_COUNT = "Row Count"

    # Custom checks
    CUSTOM_SQL = "Custom SQL"

    def get_rule_config(self) -> Dict[str, Any]:
        """
        Get complete configuration for this rule type.

        Returns:
            Dict containing:
                - creator_method: Name of the DataQualityRule creator method to use
                - requires_column: Whether this rule requires column_qualified_name
                - supports_conditions: Whether this rule supports conditional logic
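
        Example (illustrative):
            DQRuleType.REGEX.get_rule_config()
            # -> {"creator_method": "column_level_rule_creator",
            #     "requires_column": True, "supports_conditions": True}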
        """
        # Custom SQL rules
        if self == DQRuleType.CUSTOM_SQL:
            return {
                "creator_method": "custom_sql_creator",
                "requires_column": False,
                "supports_conditions": False,
            }

        # Table-level rules
        if self == DQRuleType.ROW_COUNT:
            return {
                "creator_method": "table_level_rule_creator",
                "requires_column": False,
                "supports_conditions": False,
            }

        # Column-level rules with conditions
        if self in {
            DQRuleType.STRING_LENGTH,
            DQRuleType.REGEX,
            DQRuleType.VALID_VALUES,
        }:
            return {
                "creator_method": "column_level_rule_creator",
                "requires_column": True,
                "supports_conditions": True,
            }

        # Standard column-level rules
        return {
            "creator_method": "column_level_rule_creator",
            "requires_column": True,
            "supports_conditions": False,
        }


class DQRuleSpecification(BaseModel):
    """
    Comprehensive model for creating any type of data quality rule.

    Different rule types require different fields:
    - Column-level rules: require column_qualified_name
    - Table-level rules: only require asset_qualified_name
    - Custom SQL rules: require custom_sql, rule_name, dimension
    - Rules with conditions: require rule_conditions (String Length, Regex, Valid Values)
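
    Example (illustrative payload; the qualified names below are hypothetical):
        {
            "rule_type": "String Length",
            "asset_qualified_name": "default/snowflake/123456/DB/SCHEMA/ORDERS",
            "column_qualified_name": "default/snowflake/123456/DB/SCHEMA/ORDERS/ORDER_ID",
            "threshold_value": 0,
            "rule_conditions": [
                {"type": "STRING_LENGTH_BETWEEN", "min_value": 1, "max_value": 10}
            ]
        }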
    """

    # Core identification
    rule_type: DQRuleType
    asset_qualified_name: str
    asset_type: Optional[DQAssetType] = DQAssetType.TABLE  # Default to Table

    # Column-level specific (required for most rule types except Row Count and Custom SQL)
    column_qualified_name: Optional[str] = None

    # Threshold configuration
    threshold_value: Optional[Union[int, float]] = None
    threshold_compare_operator: Optional[str] = None  # "EQUAL", "GREATER_THAN", etc.
    threshold_unit: Optional[str] = None  # "DAYS", "HOURS", "MINUTES"

    # Alert configuration
    alert_priority: Optional[str] = "NORMAL"  # "LOW", "NORMAL", "URGENT"

    # Custom SQL specific
    custom_sql: Optional[str] = None
    rule_name: Optional[str] = None
    dimension: Optional[str] = None  # "COMPLETENESS", "VALIDITY", etc.

    # Advanced configuration
    rule_conditions: Optional[List[DQRuleCondition]] = None
    row_scope_filtering_enabled: Optional[bool] = False
    description: Optional[str] = None

    @model_validator(mode="after")
    def validate_rule_requirements(self) -> "DQRuleSpecification":
        """
        Validate rule specification based on rule type requirements.

        Raises:
            ValueError: If required fields are missing for the rule type
        """
        errors = []
        config = self.rule_type.get_rule_config()

        # Check if column is required but missing
        if config["requires_column"] and not self.column_qualified_name:
            errors.append(f"{self.rule_type.value} requires column_qualified_name")

        # Custom SQL rules require specific fields
        if self.rule_type == DQRuleType.CUSTOM_SQL:
            if not self.custom_sql:
                errors.append("Custom SQL rules require custom_sql field")
            if not self.rule_name:
                errors.append("Custom SQL rules require rule_name field")
            if not self.dimension:
                errors.append("Custom SQL rules require dimension field")

        # Conditional rules should have conditions (warning only)
        if config["supports_conditions"] and not self.rule_conditions:
            logger.warning(f"{self.rule_type.value} rule created without conditions")

        # Freshness rules require threshold_unit
        if self.rule_type == DQRuleType.FRESHNESS and not self.threshold_unit:
            errors.append(
                "Freshness rules require threshold_unit (DAYS, HOURS, or MINUTES)"
            )

        # All rules require threshold_value
        if self.threshold_value is None:
            errors.append(f"{self.rule_type.value} requires threshold_value")

        if errors:
            raise ValueError("; ".join(errors))

        return self


class CreatedRuleInfo(BaseModel):
    """Model representing information about a created data quality rule."""

    guid: str
    qualified_name: str
    rule_type: Optional[str] = None


class DQRuleCreationResponse(BaseModel):
    """Response model for data quality rule creation operations."""

    created_count: int = 0
    created_rules: List[CreatedRuleInfo] = []
    errors: List[str] = []


class DQRuleScheduleSpecification(BaseModel):
    """
    Specification model for scheduling data quality rules on an asset.

    This model defines the required parameters for scheduling DQ rule
    execution on a table, view, or other supported asset types.

    """

    asset_type: DQAssetType
    asset_name: str
    asset_qualified_name: str
    schedule_crontab: str
    schedule_time_zone: str

    @field_validator("schedule_crontab")
    @classmethod
    def validate_crontab(cls, v: str) -> str:
        """
        Validate the crontab expression format.

        A valid cron expression should have 5 fields:
        minute, hour, day of month, month, day of week.
        """
        parts = v.strip().split()
        if len(parts) != 5:
            raise ValueError(
                f"Invalid cron expression '{v}'. Expected 5 fields "
                "(minute hour day-of-month month day-of-week), got {len(parts)}."
            )
        return v.strip()

    @field_validator("schedule_time_zone")
    @classmethod
    def validate_timezone(cls, v: str) -> str:
        """Validate that a non-empty timezone string is provided."""
        if not v or not v.strip():
            raise ValueError("schedule_time_zone cannot be empty")
        return v.strip()


class ScheduledAssetInfo(BaseModel):
    """
    Model representing information about a successfully scheduled asset.

    This is returned as part of the response to indicate which assets
    had their DQ rule schedules configured successfully.
    """

    asset_name: str
    asset_qualified_name: str
    schedule_crontab: str
    schedule_time_zone: str


class DQRuleScheduleResponse(BaseModel):
    """Response model for data quality rule scheduling operations."""

    scheduled_count: int = 0
    scheduled_assets: List[ScheduledAssetInfo] = []
    errors: List[str] = []


class DQRuleInfo(BaseModel):
    """Model representing a data quality rule identifier.

    Used for both delete operations (input) and deleted rule tracking (output).
    """

    rule_guid: str


class DQRuleDeleteResponse(BaseModel):
    """Response model for data quality rule deletion operations."""

    deleted_count: int = 0
    deleted_rules: List[DQRuleInfo] = []
    errors: List[str] = []


class DQRuleUpdateSpecification(BaseModel):
    """
    Model for updating an existing data quality rule.

    Only necessary and updatable fields are included. All fields except
    qualified_name, rule_type, and asset_qualified_name are optional.
    """

    # Required fields for identification and validation
    qualified_name: str  # The qualified name of the rule to update
    rule_type: DQRuleType  # Type of rule (required for validation)
    asset_qualified_name: (
        str  # Qualified name of the table/view (required for validation)
    )

    # Optional updatable fields
    threshold_value: Optional[Union[int, float]] = None
    threshold_compare_operator: Optional[str] = None  # "EQUAL", "GREATER_THAN", etc.
    threshold_unit: Optional[str] = (
        None  # "DAYS", "HOURS", "MINUTES" (for Freshness rules)
    )
    alert_priority: Optional[str] = None  # "LOW", "NORMAL", "URGENT"

    # Custom SQL specific fields
    custom_sql: Optional[str] = None
    rule_name: Optional[str] = None
    dimension: Optional[str] = None  # "COMPLETENESS", "VALIDITY", etc.

    # Advanced configuration
    rule_conditions: Optional[List[DQRuleCondition]] = None
    row_scope_filtering_enabled: Optional[bool] = None
    description: Optional[str] = None


class UpdatedRuleInfo(BaseModel):
    """Model representing information about an updated data quality rule."""

    guid: str
    qualified_name: str
    rule_type: Optional[str] = None


class DQRuleUpdateResponse(BaseModel):
    """Response model for data quality rule update operations."""

    updated_count: int = 0
    updated_rules: List[UpdatedRuleInfo] = []
    errors: List[str] = []

```

--------------------------------------------------------------------------------
/modelcontextprotocol/tools/search.py:
--------------------------------------------------------------------------------

```python
import logging
from typing import Type, List, Optional, Union, Dict, Any

from client import get_atlan_client
from pyatlan.model.assets import Asset, AtlasGlossaryTerm
from pyatlan.model.fluent_search import CompoundQuery, FluentSearch
from pyatlan.model.fields.atlan_fields import AtlanField
from utils.search import SearchUtils
from utils.constants import DEFAULT_SEARCH_ATTRIBUTES, VALID_RELATIONSHIPS

# Configure logging
logger = logging.getLogger(__name__)


def search_assets(
    conditions: Optional[Union[Dict[str, Any], str]] = None,
    negative_conditions: Optional[Dict[str, Any]] = None,
    some_conditions: Optional[Dict[str, Any]] = None,
    min_somes: int = 1,
    include_attributes: Optional[List[Union[str, AtlanField]]] = None,
    asset_type: Optional[Union[Type[Asset], str]] = None,
    include_archived: bool = False,
    limit: int = 10,
    offset: int = 0,
    sort_by: Optional[str] = None,
    sort_order: str = "ASC",
    connection_qualified_name: Optional[str] = None,
    tags: Optional[List[str]] = None,
    directly_tagged: bool = True,
    domain_guids: Optional[List[str]] = None,
    date_range: Optional[Dict[str, Dict[str, Any]]] = None,
    guids: Optional[List[str]] = None,
) -> Dict[str, Any]:
    """
    Advanced asset search using FluentSearch with flexible conditions.

    By default, only essential attributes used in result processing are included.
    Additional attributes can be specified via include_attributes parameter.

    Args:
        conditions (Dict[str, Any], optional): Dictionary of attribute conditions to match.
            Format: {"attribute_name": value} or {"attribute_name": {"operator": operator, "value": value}}
        negative_conditions (Dict[str, Any], optional): Dictionary of attribute conditions to exclude.
            Format: {"attribute_name": value} or {"attribute_name": {"operator": operator, "value": value}}
        some_conditions (Dict[str, Any], optional): Conditions for where_some() queries that require min_somes of them to match.
            Format: {"attribute_name": value} or {"attribute_name": {"operator": operator, "value": value}}
        min_somes (int): Minimum number of some_conditions that must match. Defaults to 1.
        include_attributes (List[Union[str, AtlanField]], optional): List of additional attributes to include in results.
            Can be string attribute names or AtlanField objects. These will be added to the default set.
        asset_type (Union[Type[Asset], str], optional): Type of asset to search for.
            Either a class (e.g., Table, Column) or a string type name (e.g., "Table", "Column")
        include_archived (bool): Whether to include archived assets. Defaults to False.
        limit (int, optional): Maximum number of results to return. Defaults to 10.
        offset (int, optional): Offset for pagination. Defaults to 0.
        sort_by (str, optional): Attribute to sort by. Defaults to None.
        sort_order (str, optional): Sort order, "ASC" or "DESC". Defaults to "ASC".
        connection_qualified_name (str, optional): Connection qualified name to filter by.
        tags (List[str], optional): List of tags to filter by.
        directly_tagged (bool): Whether to filter for directly tagged assets only. Defaults to True.
        domain_guids (List[str], optional): List of domain GUIDs to filter by.
        date_range (Dict[str, Dict[str, Any]], optional): Date range filters.
            Format: {"attribute_name": {"gte": start_timestamp, "lte": end_timestamp}}
        guids (List[str], optional): List of GUIDs to filter by.


    Returns:
        Dict[str, Any]: Dictionary containing:
            - results: List of assets matching the search criteria
            - aggregations: Search aggregations if available
            - error: None if no error occurred, otherwise the error message
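
    Example (illustrative; the connection qualified name is hypothetical):
        results = search_assets(
            asset_type="Table",
            conditions={"name": {"operator": "startswith", "value": "fact_"}},
            connection_qualified_name="default/snowflake/123456",
            limit=5,
        )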
    """
    logger.info(
        f"Starting asset search with parameters: asset_type={asset_type}, "
        f"limit={limit}, include_archived={include_archived}"
    )
    logger.debug(
        f"Full search parameters: conditions={conditions}, "
        f"negative_conditions={negative_conditions}, some_conditions={some_conditions}, "
        f"include_attributes={include_attributes}, "
        f"connection_qualified_name={connection_qualified_name}, "
        f"tags={tags}, domain_guids={domain_guids}"
    )

    try:
        # Initialize FluentSearch
        logger.debug("Initializing FluentSearch object")
        search = FluentSearch()

        # Apply asset type filter if provided
        if asset_type:
            if isinstance(asset_type, str):
                # Handle string type name
                logger.debug(f"Filtering by asset type name: {asset_type}")
                search = search.where(Asset.TYPE_NAME.eq(asset_type))
            else:
                # Handle class type
                logger.debug(f"Filtering by asset class: {asset_type.__name__}")
                search = search.where(CompoundQuery.asset_type(asset_type))

        # Filter for active assets unless archived are explicitly included
        if not include_archived:
            logger.debug("Filtering for active assets only")
            search = search.where(CompoundQuery.active_assets())

        # Apply connection qualified name filter if provided
        if connection_qualified_name:
            logger.debug(
                f"Filtering by connection qualified name: {connection_qualified_name}"
            )
            search = search.where(
                Asset.QUALIFIED_NAME.startswith(connection_qualified_name)
            )

        # Apply tags filter if provided
        if tags and len(tags) > 0:
            logger.debug(
                f"Filtering by tags: {tags}, directly_tagged={directly_tagged}"
            )
            search = search.where(
                CompoundQuery.tagged(with_one_of=tags, directly=directly_tagged)
            )

        # Apply domain GUIDs filter if provided
        if domain_guids and len(domain_guids) > 0:
            logger.debug(f"Filtering by domain GUIDs: {domain_guids}")
            for guid in domain_guids:
                search = search.where(Asset.DOMAIN_GUIDS.eq(guid))

        # Apply positive conditions
        if conditions:
            if not isinstance(conditions, dict):
                error_msg = f"Conditions parameter must be a dictionary, got {type(conditions).__name__}"
                logger.error(error_msg)
                return {"results": [], "aggregations": {}, "error": error_msg}

            logger.debug(f"Applying positive conditions: {conditions}")
            for attr_name, condition in conditions.items():
                attr = SearchUtils._get_asset_attribute(attr_name)
                if attr is None:
                    logger.warning(
                        f"Unknown attribute: {attr_name}, skipping condition"
                    )
                    continue

                logger.debug(f"Processing condition for attribute: {attr_name}")

                search = SearchUtils._process_condition(
                    search, attr, condition, attr_name, "where"
                )

        # Apply negative conditions
        if negative_conditions:
            logger.debug(f"Applying negative conditions: {negative_conditions}")
            for attr_name, condition in negative_conditions.items():
                attr = SearchUtils._get_asset_attribute(attr_name)
                if attr is None:
                    logger.warning(
                        f"Unknown attribute for negative condition: {attr_name}, skipping"
                    )
                    continue

                logger.debug(
                    f"Processing negative condition for attribute: {attr_name}"
                )

                search = SearchUtils._process_condition(
                    search, attr, condition, attr_name, "where_not"
                )

        # Apply where_some conditions with min_somes
        if some_conditions:
            logger.debug(
                f"Applying 'some' conditions: {some_conditions} with min_somes={min_somes}"
            )
            for attr_name, condition in some_conditions.items():
                attr = SearchUtils._get_asset_attribute(attr_name)
                if attr is None:
                    logger.warning(
                        f"Unknown attribute for 'some' condition: {attr_name}, skipping"
                    )
                    continue

                logger.debug(f"Processing 'some' condition for attribute: {attr_name}")

                search = SearchUtils._process_condition(
                    search, attr, condition, attr_name, "where_some"
                )
            search = search.min_somes(min_somes)

        # Apply date range filters
        if date_range:
            logger.debug(f"Applying date range filters: {date_range}")
            date_range_count = 0
            for attr_name, range_cond in date_range.items():
                attr = SearchUtils._get_asset_attribute(attr_name)
                if attr is None:
                    logger.warning(
                        f"Unknown attribute for date range: {attr_name}, skipping"
                    )
                    continue

                logger.debug(f"Processing date range for attribute: {attr_name}")

                if "gte" in range_cond:
                    logger.debug(f"Adding {attr_name} >= {range_cond['gte']}")
                    search = search.where(attr.gte(range_cond["gte"]))
                    date_range_count += 1
                if "lte" in range_cond:
                    logger.debug(f"Adding {attr_name} <= {range_cond['lte']}")
                    search = search.where(attr.lte(range_cond["lte"]))
                    date_range_count += 1
                if "gt" in range_cond:
                    logger.debug(f"Adding {attr_name} > {range_cond['gt']}")
                    search = search.where(attr.gt(range_cond["gt"]))
                    date_range_count += 1
                if "lt" in range_cond:
                    logger.debug(f"Adding {attr_name} < {range_cond['lt']}")
                    search = search.where(attr.lt(range_cond["lt"]))
                    date_range_count += 1

            logger.debug(f"Applied {date_range_count} date range conditions")

        if guids and len(guids) > 0:
            logger.debug(f"Applying GUID filter: {guids}")
            search = search.where(Asset.GUID.within(guids))

        # Prepare attributes to include: default attributes + additional user-specified attributes
        all_attributes = DEFAULT_SEARCH_ATTRIBUTES.copy()

        if include_attributes:
            logger.debug(f"Adding user-specified attributes: {include_attributes}")
            for attr in include_attributes:
                if isinstance(attr, str):
                    if attr not in all_attributes:
                        all_attributes.append(attr)
                else:
                    # For AtlanField objects, we'll add them directly to the search
                    # They can't be easily compared for duplicates
                    pass

        logger.debug(f"Total attributes to include: {all_attributes}")

        # Include all attributes in results
        for attr_name in all_attributes:
            attr_obj = SearchUtils._get_asset_attribute(attr_name)
            if attr_obj is None:
                logger.warning(
                    f"Unknown attribute for inclusion: {attr_name}, skipping"
                )
                continue
            logger.debug(f"Including attribute: {attr_name}")
            search = search.include_on_results(attr_obj)

        # Include additional AtlanField objects specified by user
        if include_attributes:
            for attr in include_attributes:
                if not isinstance(attr, str):
                    # Assume it's already an AtlanField object
                    logger.debug(f"Including attribute object: {attr}")
                    search = search.include_on_results(attr)
                elif attr in VALID_RELATIONSHIPS:
                    search = search.include_on_results(attr)
        try:
            search = search.include_on_results(Asset.ASSIGNED_TERMS)
            search = search.include_on_relations(AtlasGlossaryTerm.NAME)
        except Exception as e:
            logger.warning(f"Error including assigned terms: {e}")

        # Set pagination
        logger.debug(f"Setting pagination: limit={limit}, offset={offset}")
        search = search.page_size(limit)
        if offset > 0:
            search = search.from_offset(offset)

        # Set sorting
        if sort_by:
            sort_attr = SearchUtils._get_asset_attribute(sort_by)
            if sort_attr is not None:
                if sort_order.upper() == "DESC":
                    logger.debug(f"Setting sort order: {sort_by} DESC")
                    search = search.sort_by_desc(sort_attr)
                else:
                    logger.debug(f"Setting sort order: {sort_by} ASC")
                    search = search.sort_by_asc(sort_attr)
            else:
                logger.warning(
                    f"Unknown attribute for sorting: {sort_by}, skipping sort"
                )

        # Execute search
        logger.debug("Converting FluentSearch to request object")
        request = search.to_request()

        logger.info("Executing search request")
        client = get_atlan_client()
        search_response = client.asset.search(request)
        processed_results = SearchUtils.process_results(search_response)
        logger.info(
            f"Search completed, returned {len(processed_results['results'])} results"
        )
        return processed_results

    except Exception as e:
        logger.error(f"Error searching assets: {str(e)}")
        return [{"results": [], "aggregations": {}, "error": str(e)}]

```

--------------------------------------------------------------------------------
/modelcontextprotocol/tools/dq_rules.py:
--------------------------------------------------------------------------------

```python
"""
Data Quality Rules creation and update tools for Atlan MCP server.

This module provides functionality to create and update data quality rules in Atlan,
supporting column-level, table-level, and custom SQL rules.
"""

from __future__ import annotations
import logging
from typing import Dict, Any, List, Union

from pyatlan.model.assets import (
    DataQualityRule,
    Table,
    Column,
    View,
    MaterialisedView,
    SnowflakeDynamicTable,
)
from pyatlan.model.enums import (
    DataQualityRuleAlertPriority,
    DataQualityRuleThresholdCompareOperator,
    DataQualityDimension,
    DataQualityRuleThresholdUnit,
    DataQualityRuleTemplateConfigRuleConditions,
)
from pyatlan.model.dq_rule_conditions import DQRuleConditionsBuilder

from client import get_atlan_client
from .models import (
    DQRuleSpecification,
    DQRuleType,
    DQRuleCreationResponse,
    CreatedRuleInfo,
    DQRuleCondition,
    DQAssetType,
    DQRuleScheduleSpecification,
    DQRuleScheduleResponse,
    ScheduledAssetInfo,
    DQRuleInfo,
    DQRuleDeleteResponse,
    DQRuleUpdateSpecification,
    DQRuleUpdateResponse,
    UpdatedRuleInfo,
)

logger = logging.getLogger(__name__)


# Asset type class mapping for DQ rule operations
_ASSET_TYPE_MAP = {
    DQAssetType.TABLE: Table,
    DQAssetType.VIEW: View,
    DQAssetType.MATERIALIZED_VIEW: MaterialisedView,
    DQAssetType.SNOWFLAKE_DYNAMIC_TABLE: SnowflakeDynamicTable,
}


def create_dq_rules(
    rules: Union[Dict[str, Any], List[Dict[str, Any]]],
) -> DQRuleCreationResponse:
    """
    Create one or multiple data quality rules in Atlan.

    Args:
        rules (Union[Dict[str, Any], List[Dict[str, Any]]]): Either a single rule
            specification or a list of rule specifications.

    Returns:
        DQRuleCreationResponse: Response containing:
            - created_count: Number of rules successfully created
            - created_rules: List of created rule details (guid, qualified_name, rule_type)
            - errors: List of any errors encountered

    Raises:
        Exception: If there's an error creating the rules.
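
    Example (illustrative; the asset qualified name is hypothetical):
        # Create a single table-level Row Count rule
        result = create_dq_rules({
            "rule_type": "Row Count",
            "asset_qualified_name": "default/snowflake/123456/DB/SCHEMA/ORDERS",
            "threshold_value": 1000,
            "threshold_compare_operator": "GREATER_THAN",
        })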
    """
    # Convert single rule to list for consistent handling
    data = rules if isinstance(rules, list) else [rules]
    logger.info(f"Creating {len(data)} data quality rule(s)")

    result = DQRuleCreationResponse()

    try:
        # Validate and parse specifications
        specs = []
        for idx, item in enumerate(data):
            try:
                # Pydantic model validation happens automatically
                spec = DQRuleSpecification(**item)
                specs.append(spec)
            except ValueError as e:
                # Pydantic validation errors
                result.errors.append(f"Rule {idx + 1} validation error: {str(e)}")
                logger.error(f"Error validating rule specification {idx + 1}: {e}")
            except Exception as e:
                result.errors.append(f"Rule {idx + 1} error: {str(e)}")
                logger.error(f"Error parsing rule specification {idx + 1}: {e}")

        if not specs:
            logger.warning("No valid rule specifications to create")
            return result

        # Get Atlan client
        client = get_atlan_client()

        # Create rules
        created_assets = []
        for spec in specs:
            try:
                rule = _create_dq_rule(spec, client)
                created_assets.append(rule)

            except Exception as e:
                error_msg = f"Error creating {spec.rule_type.value} rule: {str(e)}"
                result.errors.append(error_msg)
                logger.error(error_msg)

        if not created_assets:
            return result

        # Bulk save all created rules
        logger.info(f"Saving {len(created_assets)} data quality rules")
        response = client.asset.save(created_assets)

        # Process response
        for created_rule in response.mutated_entities.CREATE:
            result.created_rules.append(
                CreatedRuleInfo(
                    guid=created_rule.guid,
                    qualified_name=created_rule.qualified_name,
                    rule_type=created_rule.dq_rule_type
                    if hasattr(created_rule, "dq_rule_type")
                    else None,
                )
            )

        result.created_count = len(result.created_rules)
        logger.info(f"Successfully created {result.created_count} data quality rules")

        return result

    except Exception as e:
        error_msg = f"Error in bulk rule creation: {str(e)}"
        logger.error(error_msg)
        result.errors.append(error_msg)
        return result


def _create_dq_rule(spec: DQRuleSpecification, client) -> DataQualityRule:
    """
    Create a data quality rule based on specification.

    This unified method handles all rule types by using the rule's configuration
    to determine the appropriate creator method and required parameters.

    Args:
        spec (DQRuleSpecification): Rule specification
        client: Atlan client instance

    Returns:
        DataQualityRule: Created rule asset
    """
    # Get rule configuration
    config = spec.rule_type.get_rule_config()

    # Determine asset class based on asset type
    asset_class = _ASSET_TYPE_MAP.get(spec.asset_type, Table)

    # Base parameters common to all rule types
    params = {
        "client": client,
        "asset": asset_class.ref_by_qualified_name(
            qualified_name=spec.asset_qualified_name
        ),
        "threshold_value": spec.threshold_value,
        "alert_priority": DataQualityRuleAlertPriority[spec.alert_priority],
    }

    # Add rule-type specific parameters based on config
    if spec.rule_type == DQRuleType.CUSTOM_SQL:
        params.update(
            {
                "rule_name": spec.rule_name,
                "custom_sql": spec.custom_sql,
                "dimension": DataQualityDimension[spec.dimension],
            }
        )
    else:
        params["rule_type"] = spec.rule_type.value

        # Add column reference if required
        if config["requires_column"]:
            params["column"] = Column.ref_by_qualified_name(
                qualified_name=spec.column_qualified_name
            )

    # Add optional parameters
    if spec.threshold_compare_operator:
        params["threshold_compare_operator"] = DataQualityRuleThresholdCompareOperator[
            spec.threshold_compare_operator
        ]

    if spec.threshold_unit:
        params["threshold_unit"] = DataQualityRuleThresholdUnit[spec.threshold_unit]

    if spec.row_scope_filtering_enabled:
        params["row_scope_filtering_enabled"] = spec.row_scope_filtering_enabled

    # Add rule conditions if supported and provided
    if config["supports_conditions"] and spec.rule_conditions:
        params["rule_conditions"] = _build_rule_conditions(spec.rule_conditions)

    # Create rule based on type using explicit creator methods
    if spec.rule_type == DQRuleType.CUSTOM_SQL:
        dq_rule = DataQualityRule.custom_sql_creator(**params)
    elif spec.rule_type == DQRuleType.ROW_COUNT:
        dq_rule = DataQualityRule.table_level_rule_creator(**params)
    elif spec.rule_type in {
        DQRuleType.NULL_COUNT,
        DQRuleType.NULL_PERCENTAGE,
        DQRuleType.BLANK_COUNT,
        DQRuleType.BLANK_PERCENTAGE,
        DQRuleType.MIN_VALUE,
        DQRuleType.MAX_VALUE,
        DQRuleType.AVERAGE,
        DQRuleType.STANDARD_DEVIATION,
        DQRuleType.UNIQUE_COUNT,
        DQRuleType.DUPLICATE_COUNT,
        DQRuleType.REGEX,
        DQRuleType.STRING_LENGTH,
        DQRuleType.VALID_VALUES,
        DQRuleType.FRESHNESS,
    }:
        dq_rule = DataQualityRule.column_level_rule_creator(**params)
    else:
        raise ValueError(f"Unsupported rule type: {spec.rule_type}")

    # Add description if provided
    if spec.description:
        dq_rule.description = spec.description

    return dq_rule


def _build_rule_conditions(conditions: List[DQRuleCondition]) -> Any:
    """
    Build DQRuleConditionsBuilder from condition specifications.

    Args:
        conditions (List[DQRuleCondition]): List of rule condition models

    Returns:
        Built rule conditions object
    """
    builder = DQRuleConditionsBuilder()

    for condition in conditions:
        condition_type = DataQualityRuleTemplateConfigRuleConditions[condition.type]

        # Build condition parameters dynamically
        condition_params = {"type": condition_type}

        for key in ["value", "min_value", "max_value"]:
            value = getattr(condition, key)
            if value is not None:
                condition_params[key] = value

        builder.add_condition(**condition_params)

    return builder.build()


def schedule_dq_rules(
    schedules: Union[Dict[str, Any], List[Dict[str, Any]]],
) -> DQRuleScheduleResponse:
    """
    Schedule data quality rule execution for one or multiple assets.

    Args:
        schedules: Either a single schedule specification or a list of specifications.

    Returns:
        DQRuleScheduleResponse: Response containing scheduled_count, scheduled_assets, and errors.
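
    Example (illustrative; the asset name, qualified name, and schedule are hypothetical):
        result = schedule_dq_rules({
            "asset_type": "Table",
            "asset_name": "ORDERS",
            "asset_qualified_name": "default/snowflake/123456/DB/SCHEMA/ORDERS",
            "schedule_crontab": "0 6 * * *",
            "schedule_time_zone": "Europe/London",
        })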
    """
    # Convert single schedule to list for consistent handling
    data = schedules if isinstance(schedules, list) else [schedules]

    result = DQRuleScheduleResponse()

    # Validate and parse specifications
    specs = []
    for idx, item in enumerate(data):
        try:
            spec = DQRuleScheduleSpecification(**item)
            specs.append(spec)
        except Exception as e:
            result.errors.append(f"Schedule {idx + 1} error: {str(e)}")
            logger.error(f"Error parsing schedule specification {idx + 1}: {e}")

    if not specs:
        logger.warning("No valid schedule specifications to create")
        return result

    # Get Atlan client
    client = get_atlan_client()

    # Schedule rules for each asset
    for spec in specs:
        try:
            asset_cls = _ASSET_TYPE_MAP.get(spec.asset_type)
            if not asset_cls:
                raise ValueError(f"Unsupported asset type: {spec.asset_type.value}")

            client.asset.add_dq_rule_schedule(
                asset_type=asset_cls,
                asset_name=spec.asset_name,
                asset_qualified_name=spec.asset_qualified_name,
                schedule_crontab=spec.schedule_crontab,
                schedule_time_zone=spec.schedule_time_zone,
            )

            result.scheduled_assets.append(
                ScheduledAssetInfo(
                    asset_name=spec.asset_name,
                    asset_qualified_name=spec.asset_qualified_name,
                    schedule_crontab=spec.schedule_crontab,
                    schedule_time_zone=spec.schedule_time_zone,
                )
            )
            result.scheduled_count += 1

        except Exception as e:
            error_msg = f"Error scheduling {spec.asset_name}: {str(e)}"
            result.errors.append(error_msg)
            logger.error(error_msg)

    return result


def delete_dq_rules(
    rule_guids: Union[str, List[str]],
) -> DQRuleDeleteResponse:
    """
    Delete one or multiple data quality rules in Atlan.

    Args:
        rule_guids: Single rule GUID or list of rule GUIDs to delete.

    Returns:
        DQRuleDeleteResponse with deletion results and any errors.

    Example:
        # Delete single rule
        result = delete_dq_rules("rule-guid-123")

        # Delete multiple rules
        result = delete_dq_rules(["rule-guid-1", "rule-guid-2"])
    """
    # Convert single GUID to list for consistent handling
    data = rule_guids if isinstance(rule_guids, list) else [rule_guids]

    result = DQRuleDeleteResponse()

    # Validate and parse specifications
    specs = []
    for idx, item in enumerate(data):
        try:
            if isinstance(item, str):
                spec = DQRuleInfo(rule_guid=item)
            else:
                spec = DQRuleInfo(**item)
            specs.append(spec)
        except Exception as e:
            result.errors.append(f"Rule {idx + 1} error: {str(e)}")
            logger.error(f"Error parsing rule specification {idx + 1}: {e}")

    if not specs:
        logger.warning("No valid rule specifications to delete")
        return result

    # Get Atlan client
    client = get_atlan_client()

    # Delete each rule
    for spec in specs:
        try:
            response = client.asset.delete_by_guid(guid=spec.rule_guid)
            deleted_assets = response.assets_deleted(asset_type=DataQualityRule)

            if deleted_assets:
                result.deleted_rules.append(DQRuleInfo(rule_guid=spec.rule_guid))
                result.deleted_count += 1
                logger.info(f"Successfully deleted rule: {spec.rule_guid}")
            else:
                error_msg = f"No rule found with GUID: {spec.rule_guid}"
                result.errors.append(error_msg)
                logger.warning(error_msg)

        except Exception as e:
            error_msg = f"Error deleting rule {spec.rule_guid}: {str(e)}"
            result.errors.append(error_msg)
            logger.error(error_msg)

    return result


def update_dq_rules(
    rules: Union[Dict[str, Any], List[Dict[str, Any]]],
) -> DQRuleUpdateResponse:
    """
    Update one or multiple existing data quality rules in Atlan.

    To update a rule, you only need to provide the qualified name, rule_type, and
    asset_qualified_name. All other parameters are optional and will only be updated
    if provided.

    Args:
        rules (Union[Dict[str, Any], List[Dict[str, Any]]]): Either a single rule
            specification or a list of rule specifications. Each specification must include:
            - qualified_name (str): The qualified name of the rule to update (required)
            - rule_type (str): Type of rule (required for validation)
            - asset_qualified_name (str): Qualified name of the table/view (required)
            - Additional optional fields to update (see examples)

    Returns:
        DQRuleUpdateResponse: Response containing:
            - updated_count: Number of rules successfully updated
            - updated_rules: List of updated rule details (guid, qualified_name, rule_type)
            - errors: List of any errors encountered

    Raises:
        Exception: If there's an error updating the rules.
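
    Example (illustrative; the qualified names are hypothetical):
        result = update_dq_rules({
            "qualified_name": "default/dq/rule/abc123",
            "rule_type": "Null Count",
            "asset_qualified_name": "default/snowflake/123456/DB/SCHEMA/ORDERS",
            "threshold_value": 5,
            "alert_priority": "URGENT",
        })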
    """
    # Convert single rule to list for consistent handling
    data = rules if isinstance(rules, list) else [rules]
    logger.info(f"Updating {len(data)} data quality rule(s)")

    result = DQRuleUpdateResponse()

    try:
        # Validate and parse specifications
        specs = []
        for idx, item in enumerate(data):
            try:
                # Pydantic model validation happens automatically
                spec = DQRuleUpdateSpecification(**item)
                specs.append(spec)
            except ValueError as e:
                # Pydantic validation errors
                result.errors.append(f"Rule {idx + 1} validation error: {str(e)}")
                logger.error(
                    f"Error validating rule update specification {idx + 1}: {e}"
                )
            except Exception as e:
                result.errors.append(f"Rule {idx + 1} error: {str(e)}")
                logger.error(f"Error parsing rule update specification {idx + 1}: {e}")

        if not specs:
            logger.warning("No valid rule update specifications to process")
            return result

        # Get Atlan client
        client = get_atlan_client()

        # Update rules
        updated_assets = []
        for spec in specs:
            try:
                logger.debug(
                    f"Updating {spec.rule_type.value} rule: {spec.qualified_name}"
                )
                rule = _update_dq_rule(spec, client)
                updated_assets.append(rule)

            except Exception as e:
                error_msg = f"Error updating rule {spec.qualified_name}: {str(e)}"
                result.errors.append(error_msg)
                logger.error(error_msg)

        if not updated_assets:
            return result

        # Bulk save all updated rules
        logger.info(f"Saving {len(updated_assets)} updated data quality rules")
        response = client.asset.save(updated_assets)

        # Process response
        for updated_rule in response.mutated_entities.UPDATE:
            result.updated_rules.append(
                UpdatedRuleInfo(
                    guid=updated_rule.guid,
                    qualified_name=updated_rule.qualified_name,
                    rule_type=updated_rule.dq_rule_type
                    if hasattr(updated_rule, "dq_rule_type")
                    else None,
                )
            )

        result.updated_count = len(result.updated_rules)
        logger.info(f"Successfully updated {result.updated_count} data quality rules")

        return result

    except Exception as e:
        error_msg = f"Error in bulk rule update: {str(e)}"
        logger.error(error_msg)
        result.errors.append(error_msg)
        return result


def _update_dq_rule(spec: DQRuleUpdateSpecification, client) -> DataQualityRule:
    """
    Update a data quality rule based on specification.

    Args:
        spec (DQRuleUpdateSpecification): Rule update specification
        client: Atlan client instance

    Returns:
        DataQualityRule: Updated rule asset
    """
    logger.debug(f"Updating {spec.rule_type.value} rule: {spec.qualified_name}")

    # Base parameters - only qualified_name and client are required
    params = {
        "client": client,
        "qualified_name": spec.qualified_name,
    }

    # Add optional threshold parameters if provided
    if spec.threshold_value is not None:
        params["threshold_value"] = spec.threshold_value

    if spec.threshold_compare_operator:
        params["threshold_compare_operator"] = DataQualityRuleThresholdCompareOperator[
            spec.threshold_compare_operator
        ]

    if spec.threshold_unit:
        params["threshold_unit"] = DataQualityRuleThresholdUnit[spec.threshold_unit]

    if spec.alert_priority:
        params["alert_priority"] = DataQualityRuleAlertPriority[spec.alert_priority]

    # Add Custom SQL specific parameters if provided
    if spec.custom_sql:
        params["custom_sql"] = spec.custom_sql

    if spec.rule_name:
        params["rule_name"] = spec.rule_name

    if spec.dimension:
        params["dimension"] = DataQualityDimension[spec.dimension]

    # Add rule conditions if provided
    if spec.rule_conditions:
        params["rule_conditions"] = _build_rule_conditions(spec.rule_conditions)

    if spec.row_scope_filtering_enabled is not None:
        params["row_scope_filtering_enabled"] = spec.row_scope_filtering_enabled

    # Use the updater method from DataQualityRule
    updated_rule = DataQualityRule.updater(**params)

    # Add description if provided
    if spec.description:
        updated_rule.description = spec.description

    return updated_rule

```

--------------------------------------------------------------------------------
/modelcontextprotocol/server.py:
--------------------------------------------------------------------------------

```python
import argparse
import json
import os
from typing import Any, Dict, List
from fastmcp import FastMCP
from tools import (
    search_assets,
    get_assets_by_dsl,
    traverse_lineage,
    update_assets,
    query_asset,
    create_glossary_category_assets,
    create_glossary_assets,
    create_glossary_term_assets,
    create_data_domain_assets,
    create_data_product_assets,
    create_dq_rules,
    schedule_dq_rules,
    delete_dq_rules,
    update_dq_rules,
    UpdatableAttribute,
    CertificateStatus,
    UpdatableAsset,
    TermOperations,
)
from pyatlan.model.lineage import LineageDirection
from utils.parameters import (
    parse_json_parameter,
    parse_list_parameter,
)
from middleware import ToolRestrictionMiddleware
from settings import get_settings


mcp = FastMCP("Atlan MCP Server", dependencies=["pyatlan", "fastmcp"])

# Get restricted tools from environment variable or use default
restricted_tools_env = os.getenv("RESTRICTED_TOOLS", "")
if restricted_tools_env:
    restricted_tools = [
        tool.strip() for tool in restricted_tools_env.split(",") if tool.strip()
    ]
else:
    # Default configuration - modify this list to restrict specific tools
    restricted_tools = []
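
# Example (illustrative): setting RESTRICTED_TOOLS="update_assets_tool,get_assets_by_dsl_tool"
# in the environment would restrict those tools by name via the middleware below.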

tool_restriction = ToolRestrictionMiddleware(restricted_tools=restricted_tools)
mcp.add_middleware(tool_restriction)


@mcp.tool()
def search_assets_tool(
    conditions=None,
    negative_conditions=None,
    some_conditions=None,
    min_somes=1,
    include_attributes=None,
    asset_type=None,
    include_archived=False,
    limit=10,
    offset=0,
    sort_by=None,
    sort_order="ASC",
    connection_qualified_name=None,
    tags=None,
    directly_tagged=True,
    domain_guids=None,
    date_range=None,
    guids=None,
):
    """
    Advanced asset search using FluentSearch with flexible conditions.

    Args:
        conditions (Dict[str, Any], optional): Dictionary of attribute conditions to match.
            Format: {"attribute_name": value} or {"attribute_name": {"operator": operator, "value": value}}
        negative_conditions (Dict[str, Any], optional): Dictionary of attribute conditions to exclude.
            Format: {"attribute_name": value} or {"attribute_name": {"operator": operator, "value": value}}
        some_conditions (Dict[str, Any], optional): Conditions for where_some() queries that require min_somes of them to match.
            Format: {"attribute_name": value} or {"attribute_name": {"operator": operator, "value": value}}
        min_somes (int): Minimum number of some_conditions that must match. Defaults to 1.
        include_attributes (List[Union[str, AtlanField]], optional): List of additional attributes to include in results.
            Can be string attribute names or AtlanField objects. These are added to the default attribute set.
        asset_type (Union[Type[Asset], str], optional): Type of asset to search for.
            Either a class (e.g., Table, Column) or a string type name (e.g., "Table", "Column")
        include_archived (bool): Whether to include archived assets. Defaults to False.
        limit (int, optional): Maximum number of results to return. Defaults to 10.
        offset (int, optional): Offset for pagination. Defaults to 0.
        sort_by (str, optional): Attribute to sort by. Defaults to None.
        sort_order (str, optional): Sort order, "ASC" or "DESC". Defaults to "ASC".
        connection_qualified_name (str, optional): Connection qualified name to filter by. ex: default/snowflake/123456/abc
        tags (List[str], optional): List of tags to filter by.
        directly_tagged (bool): Whether to filter for directly tagged assets only. Defaults to True.
        domain_guids (List[str], optional): List of domain GUIDs to filter by.
        date_range (Dict[str, Dict[str, Any]], optional): Date range filters.
            Format: {"attribute_name": {"gte": start_timestamp, "lte": end_timestamp}}
        guids (List[str], optional): List of asset GUIDs to filter by.

    Returns:
        Dict[str, Any]: Dictionary containing:
            - results: List of assets matching the search criteria
            - aggregations: Search aggregations if available
            - error: Error message if the search or parameter parsing failed, otherwise None

    Raises:
        Exception: If there's an error executing the search

    Examples:
        # Search for verified tables
        tables = search_assets(
            asset_type="Table",
            conditions={"certificate_status": CertificateStatus.VERIFIED.value}
        )

        # Search for assets missing descriptions from the database/connection default/snowflake/123456/abc
        missing_desc = search_assets(
            connection_qualified_name="default/snowflake/123456/abc",
            negative_conditions={
                "description": "has_any_value",
                "user_description": "has_any_value"
            },
            include_attributes=["owner_users", "owner_groups"]
        )

        # Search for columns with specific certificate status
        columns = search_assets(
            asset_type="Column",
            some_conditions={
                "certificate_status": [CertificateStatus.DRAFT.value, CertificateStatus.VERIFIED.value]
            },
            tags=["PRD"],
            conditions={"created_by": "username"},
            date_range={"create_time": {"gte": 1641034800000, "lte": 1672570800000}}
        )
        # Search for assets with a specific search text
        assets = search_assets(
            conditions = {
                "name": {
                    "operator": "match",
                    "value": "search_text"
                },
                "description": {
                    "operator": "match",
                    "value": "search_text"
                }
            }
        )


        # Search for assets using advanced operators
        assets = search_assets(
            conditions={
                "name": {
                    "operator": "startswith",
                    "value": "prefix_",
                    "case_insensitive": True
                },
                "description": {
                    "operator": "contains",
                    "value": "important data",
                    "case_insensitive": True
                },
                "create_time": {
                    "operator": "between",
                    "value": [1640995200000, 1643673600000]
                }
            }
        )

        # For queries spanning multiple asset types, e.g. search for Table, Column, or View assets from the database/connection default/snowflake/123456/abc
        assets = search_assets(
            connection_qualified_name="default/snowflake/123456/abc",
            conditions={
                "type_name": ["Table", "Column", "View"],
            }
        )

        # Search for assets with compliant business policy
        assets = search_assets(
            conditions={
                "asset_policy_guids": "business_policy_guid"
            },
            include_attributes=["asset_policy_guids"]
        )

        # Search for assets with non-compliant business policy
        assets = search_assets(
            conditions={
                "non_compliant_asset_policy_guids": "business_policy_guid"
            },
            include_attributes=["non_compliant_asset_policy_guids"]
        )

        # get non-compliant business policies for an asset
        assets = search_assets(
            conditions={
                "name": "has_any_value",
                "displayName": "has_any_value",
                "guid": "has_any_value"
            },
            include_attributes=["non_compliant_asset_policy_guids"]
        )

        # get compliant business policies for an asset
        assets = search_assets(
            conditions={
                "name": "has_any_value",
                "displayName": "has_any_value",
                "guid": "has_any_value"
            },
            include_attributes=["asset_policy_guids"]
        )

        # get incidents for a business policy
        assets = search_assets(
            conditions={
                "asset_type": "BusinessPolicyIncident",
                "business_policy_incident_related_policy_guids": "business_policy_guid"
            },
            some_conditions={
                "certificate_status": [CertificateStatus.DRAFT.value, CertificateStatus.VERIFIED.value]
            }
        )

        # Search for glossary terms by name and status
        glossary_terms = search_assets(
            asset_type="AtlasGlossaryTerm",
            conditions={
                "certificate_status": CertificateStatus.VERIFIED.value,
                "name": {
                    "operator": "contains",
                    "value": "customer",
                    "case_insensitive": True
                }
            },
            include_attributes=["categories"]
        )

        # Find popular but expensive assets (cost optimization)
        search_assets(
            conditions={
                "popularityScore": {"operator": "gte", "value": 0.8},
                "sourceReadQueryCost": {"operator": "gte", "value": 1000}
            },
            include_attributes=["sourceReadExpensiveQueryRecordList", "sourceCostUnit"]
        )

        # Find unused assets accessed before 2024
        search_assets(
            conditions={"sourceLastReadAt": {"operator": "lt", "value": 1704067200000}}, # Unix epoch in milliseconds
            include_attributes=["sourceReadCount", "sourceLastReadAt"]
        )

        # Get top users for a specific table
        # Note: Can't directly filter by user, but can retrieve the list
        search_assets(
            conditions={"name": "customer_transactions"},
            include_attributes=["sourceReadTopUserList", "sourceReadUserCount"]
        )

        # Find frequently accessed uncertified assets (governance gap)
        search_assets(
            conditions={
                "sourceReadUserCount": {"operator": "gte", "value": 10},
                "certificate_status": {"operator": "ne", "value": "VERIFIED"}
            }
        )

        # Query assets in specific connection with cost filters
        search_assets(
            connection_qualified_name="default/snowflake/123456",
            conditions={"sourceTotalCost": {"operator": "gte", "value": 500}},
            sort_by="sourceTotalCost",
            sort_order="DESC",
            include_attributes=[
                "sourceReadQueryComputeCostRecordList",  # Shows breakdown by warehouse
                "sourceQueryComputeCostList",  # List of warehouses used
                "sourceCostUnit"
            ]
        )

    The search supports various analytics attributes following similar patterns:
    - Usage Metrics:
        - `sourceReadCount`, `sourceReadUserCount` - Filter by read frequency or user diversity
        - `sourceLastReadAt`, `lastRowChangedAt` - Time-based filtering (Unix timestamp in ms)
        - `popularityScore` - Float value 0-1 indicating asset popularity

    - Cost Metrics:
        - `sourceReadQueryCost`, `sourceTotalCost` - Filter by cost thresholds
        - Include `sourceCostUnit` in attributes to get cost units
        - Include `sourceReadExpensiveQueryRecordList` for detailed breakdowns

    - User Analytics:
        - `sourceReadTopUserList`, `sourceReadRecentUserList` - Get user lists
        - `sourceReadTopUserRecordList`, `sourceReadRecentUserRecordList` - Get detailed records

    - Query Analytics:
        - `sourceReadPopularQueryRecordList` - Popular queries for the asset
        - `lastRowChangedQuery` - Query that last modified the asset

    Additional attributes you can request via include_attributes to extract more metadata from an asset:
        - columns
        - column_count
        - row_count
        - readme
        - owner_users
    """
    try:
        # Parse JSON string parameters if needed
        conditions = parse_json_parameter(conditions)
        negative_conditions = parse_json_parameter(negative_conditions)
        some_conditions = parse_json_parameter(some_conditions)
        date_range = parse_json_parameter(date_range)
        include_attributes = parse_list_parameter(include_attributes)
        tags = parse_list_parameter(tags)
        domain_guids = parse_list_parameter(domain_guids)
        guids = parse_list_parameter(guids)

        return search_assets(
            conditions,
            negative_conditions,
            some_conditions,
            min_somes,
            include_attributes,
            asset_type,
            include_archived,
            limit,
            offset,
            sort_by,
            sort_order,
            connection_qualified_name,
            tags,
            directly_tagged,
            domain_guids,
            date_range,
            guids,
        )
    except (json.JSONDecodeError, ValueError) as e:
        return {"error": f"Parameter parsing error: {str(e)}"}


@mcp.tool()
def get_assets_by_dsl_tool(dsl_query):
    """
    Execute the search with the given query
    dsl_query : Union[str, Dict[str, Any]] (required):
        The DSL query used to search the index.

    Example:
    dsl_query = '''{
    "query": {
        "function_score": {
            "boost_mode": "sum",
            "functions": [
                {"filter": {"match": {"starredBy": "john.doe"}}, "weight": 10},
                {"filter": {"match": {"certificateStatus": "VERIFIED"}}, "weight": 15},
                {"filter": {"match": {"certificateStatus": "DRAFT"}}, "weight": 10},
                {"filter": {"bool": {"must_not": [{"exists": {"field": "certificateStatus"}}]}}, "weight": 8},
                {"filter": {"bool": {"must_not": [{"terms": {"__typeName.keyword": ["Process", "DbtProcess"]}}]}}, "weight": 20}
            ],
            "query": {
                "bool": {
                    "filter": [
                        {
                            "bool": {
                                "minimum_should_match": 1,
                                "must": [
                                    {"bool": {"should": [{"terms": {"certificateStatus": ["VERIFIED"]}}]}},
                                    {"term": {"__state": "ACTIVE"}}
                                ],
                                "must_not": [
                                    {"term": {"isPartial": "true"}},
                                    {"terms": {"__typeName.keyword": ["Procedure", "DbtColumnProcess", "BIProcess", "MatillionComponent", "SnowflakeTag", "DbtTag", "BigqueryTag", "AIApplication", "AIModel"]}},
                                    {"terms": {"__typeName.keyword": ["MCIncident", "AnomaloCheck"]}}
                                ],
                                "should": [
                                    {"terms": {"__typeName.keyword": ["Query", "Collection", "AtlasGlossary", "AtlasGlossaryCategory", "AtlasGlossaryTerm", "Connection", "File"]}},
                                ]
                            }
                        }
                    ]
                }
            },
            "score_mode": "sum"
        }
    },
    "post_filter": {
        "bool": {
            "filter": [
                {
                    "bool": {
                        "must": [{"terms": {"__typeName.keyword": ["Table", "Column"]}}],
                        "must_not": [{"exists": {"field": "termType"}}]
                    }
                }
            ]
        }
    },
    "sort": [
        {"_score": {"order": "desc"}},
        {"popularityScore": {"order": "desc"}},
        {"starredCount": {"order": "desc"}},
        {"name.keyword": {"order": "asc"}}
    ],
    "track_total_hits": true,
    "size": 10,
    "include_meta": false
    }'''
    response = get_assets_by_dsl(dsl_query)
    """
    return get_assets_by_dsl(dsl_query)
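

# Note: get_assets_by_dsl_tool also accepts the DSL query as a dict instead of a
# JSON string. A minimal sketch (values are illustrative):
#   get_assets_by_dsl_tool({"query": {"term": {"__state": "ACTIVE"}}, "size": 5})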


@mcp.tool()
def traverse_lineage_tool(
    guid,
    direction,
    depth=1000000,
    size=10,
    immediate_neighbors=True,
    include_attributes=None,
):
    """
    Traverse asset lineage in specified direction.

    By default, essential attributes are included in results. Additional attributes can be
    specified via include_attributes parameter for richer lineage information.

    Args:
        guid (str): GUID of the starting asset
        direction (str): Direction to traverse ("UPSTREAM" or "DOWNSTREAM")
        depth (int, optional): Maximum depth to traverse. Defaults to 1000000.
        size (int, optional): Maximum number of results to return. Defaults to 10.
        immediate_neighbors (bool, optional): Only return immediate neighbors. Defaults to True.
        include_attributes (List[str], optional): List of additional attribute names to include in results.
            These will be added to the default set.

    Default Attributes (always included):
        - name, display_name, description, qualified_name, user_description
        - certificate_status, owner_users, owner_groups
        - connector_name, has_lineage, source_created_at, source_updated_at
        - readme, asset_tags

    Returns:
        Dict[str, Any]: Dictionary containing:
            - assets: List of assets in the lineage with processed attributes
            - error: None if no error occurred, otherwise the error message

    Examples:
        # Get lineage with default attributes
        lineage = traverse_lineage_tool(
            guid="asset-guid-here",
            direction="DOWNSTREAM",
            depth=1000,
            size=10
        )
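
        # Get lineage with additional attributes (attribute names are illustrative)
        lineage = traverse_lineage_tool(
            guid="asset-guid-here",
            direction="UPSTREAM",
            depth=1000,
            size=10,
            include_attributes=["column_count", "row_count"]
        )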
    """
    try:
        direction_enum = LineageDirection[direction.upper()]
    except KeyError:
        raise ValueError(
            f"Invalid direction: {direction}. Must be either 'UPSTREAM' or 'DOWNSTREAM'"
        )

    # Parse include_attributes parameter if provided
    parsed_include_attributes = parse_list_parameter(include_attributes)

    return traverse_lineage(
        guid=guid,
        direction=direction_enum,
        depth=int(depth),
        size=int(size),
        immediate_neighbors=bool(immediate_neighbors),
        include_attributes=parsed_include_attributes,
    )


@mcp.tool()
def update_assets_tool(
    assets,
    attribute_name,
    attribute_values,
):
    """
    Update one or multiple assets with different values for attributes or term operations.

    Args:
        assets (Union[Dict[str, Any], List[Dict[str, Any]]]): Asset(s) to update.
            Can be a single UpdatableAsset or a list of UpdatableAsset objects.
            For assets of type_name=AtlasGlossaryTerm or type_name=AtlasGlossaryCategory, each asset dictionary
            MUST include a "glossary_guid" key: the GUID of the glossary the term or category belongs to.
        attribute_name (str): Name of the attribute to update.
            Supports "user_description", "certificate_status", "readme", and "term".
        attribute_values (List[Union[str, Dict[str, Any]]]): List of values to set for the attribute.
            For certificate_status, only "VERIFIED", "DRAFT", or "DEPRECATED" are allowed.
            For readme, the value must be a valid Markdown string.
            For term, the value must be a dict with "operation" and "term_guids" keys.

    Returns:
        Dict[str, Any]: Dictionary containing:
            - updated_count: Number of assets successfully updated
            - errors: List of any errors encountered
            - operation: The operation that was performed (for term operations)

    Examples:
        # Update certificate status for a single asset
        result = update_assets_tool(
            assets={
                "guid": "asset-guid-here",
                "name": "Asset Name",
                "type_name": "Asset Type Name",
                "qualified_name": "Asset Qualified Name"
            },
            attribute_name="certificate_status",
            attribute_values=["VERIFIED"]
        )

        # Update user description for multiple assets
        result = update_assets_tool(
            assets=[
                {
                    "guid": "asset-guid-1",
                    "name": "Asset Name 1",
                    "type_name": "Asset Type Name 1",
                    "qualified_name": "Asset Qualified Name 1"
                },
                {
                    "guid": "asset-guid-2",
                    "name": "Asset Name 2",
                    "type_name": "Asset Type Name 2",
                    "qualified_name": "Asset Qualified Name 2"
                }
            ],
            attribute_name="user_description",
            attribute_values=[
                "New description for asset 1", "New description for asset 2"
            ]
        )

        # Update readme for a single asset with Markdown
        result = update_assets_tool(
            assets={
                "guid": "asset-guid-here",
                "name": "Asset Name",
                "type_name": "Asset Type Name",
                "qualified_name": "Asset Qualified Name"
            },
            attribute_name="readme",
            attribute_values=['''# Customer Data Table
            Contains customer transaction records for analytics.
            **Key Info:**
            - Updated daily at 2 AM
            - Contains PII data
            - [Documentation](https://docs.example.com)''']
        )

        # Append terms to a single asset
        result = update_assets_tool(
            assets={
                "guid": "asset-guid-here",
                "name": "Customer Name Column",
                "type_name": "Column",
                "qualified_name": "default/snowflake/123456/abc/CUSTOMER_NAME"
            },
            attribute_name="term",
            attribute_values=[{
                "operation": "append",
                "term_guids": ["term-guid-1", "term-guid-2"]
            }]
        )

        # Replace all terms on multiple assets
        result = update_assets_tool(
            assets=[
                {
                    "guid": "asset-guid-1",
                    "name": "Table 1",
                    "type_name": "Table",
                    "qualified_name": "default/snowflake/123456/abc/TABLE_1"
                },
                {
                    "guid": "asset-guid-2",
                    "name": "Table 2",
                    "type_name": "Table",
                    "qualified_name": "default/snowflake/123456/abc/TABLE_2"
                }
            ],
            attribute_name="term",
            attribute_values=[
                {
                    "operation": "replace",
                    "term_guids": ["new-term-for-table-1-guid-1", "new-term-for-table-1-guid-2"]
                },
                {
                    "operation": "replace",
                    "term_guids": ["new-term-for-table-2-guid-1", "new-term-for-table-2-guid-2"]
                }
            ]
        )

        # Remove specific terms from an asset
        result = update_assets_tool(
            assets={
                "guid": "asset-guid-here",
                "name": "Customer Data Table",
                "type_name": "Table",
                "qualified_name": "default/snowflake/123456/abc/CUSTOMER_DATA"
            },
            attribute_name="term",
            attribute_values=[{
                "operation": "remove",
                "term_guids": ["term-guid-to-remove"]
            }]
        )
    """
    try:
        # Parse JSON parameters
        parsed_assets = parse_json_parameter(assets)
        parsed_attribute_values = parse_list_parameter(attribute_values)

        # Convert string attribute name to enum
        attr_enum = UpdatableAttribute(attribute_name)

        # Handle term operations - convert dict to TermOperations object
        if attr_enum == UpdatableAttribute.TERM:
            term_operations = []
            for value in parsed_attribute_values:
                if isinstance(value, dict):
                    term_operations.append(TermOperations(**value))
                else:
                    return {
                        "error": "Term attribute values must be dictionaries with 'operation' and 'term_guids' keys",
                        "updated_count": 0,
                    }
            parsed_attribute_values = term_operations
        # For certificate status, convert values to enum
        elif attr_enum == UpdatableAttribute.CERTIFICATE_STATUS:
            parsed_attribute_values = [
                CertificateStatus(val) for val in parsed_attribute_values
            ]

        # Convert assets to UpdatableAsset objects
        if isinstance(parsed_assets, dict):
            updatable_assets = [UpdatableAsset(**parsed_assets)]
        else:
            updatable_assets = [UpdatableAsset(**asset) for asset in parsed_assets]

        return update_assets(
            updatable_assets=updatable_assets,
            attribute_name=attr_enum,
            attribute_values=parsed_attribute_values,
        )
    except (json.JSONDecodeError, ValueError, TypeError) as e:
        return {
            "error": f"Parameter parsing/conversion error: {str(e)}",
            "updated_count": 0,
        }


@mcp.tool()
def query_asset_tool(
    sql: str, connection_qualified_name: str, default_schema: str | None = None
):
    """
    Execute a SQL query on a table/view asset.

    This tool enables querying table/view assets on the source similar to
    what's available in the insights table. It uses the Atlan query capabilities
    to execute SQL against connected data sources.

    CRITICAL: Use READ-ONLY queries to retrieve data. Write and modify queries are not supported by this tool.


    Args:
        sql (str): The SQL query to execute (read-only queries allowed)
        connection_qualified_name (str): Connection qualified name to use for the query.
            This is the same parameter used in search_assets_tool.
            You can find this value by searching for Table/View assets using search_assets_tool
            and extracting the first part of the 'qualifiedName' attribute.
            Example: from "default/snowflake/1657275059/LANDING/FRONTEND_PROD/PAGES"
            use "default/snowflake/1657275059"
        default_schema (str, optional): Default schema name to use for unqualified
            objects in the SQL, in the form "DB.SCHEMA"
            (e.g., "RAW.WIDEWORLDIMPORTERS_WAREHOUSE")

    Examples:
        # Use case: How to query the PAGES table and retrieve the first 10 rows
        # Find tables to query using search_assets_tool
        tables = search_assets_tool(
            asset_type="Table",
            conditions={"name": "PAGES"},
            limit=5
        )
        # Extract connection info from the table's qualifiedName
        # Example qualifiedName: "default/snowflake/1657275059/LANDING/FRONTEND_PROD/PAGES"
        # connection_qualified_name: "default/snowflake/1657275059"
        # database.schema: "LANDING.FRONTEND_PROD"

        # Query the table using extracted connection info
        result = query_asset_tool(
            sql='SELECT * FROM PAGES LIMIT 10',
            connection_qualified_name="default/snowflake/1657275059",
            default_schema="LANDING.FRONTEND_PROD"
        )

        # Query without specifying default schema (fully qualified table names)
        result = query_asset_tool(
            sql='SELECT COUNT(*) FROM "LANDING"."FRONTEND_PROD"."PAGES"',
            connection_qualified_name="default/snowflake/1657275059"
        )

        # Complex analytical query on PAGES table
        result = query_asset_tool(
            sql='''
            SELECT
                page_type,
                COUNT(*) AS page_count,
                AVG(load_time) AS avg_load_time,
                MAX(views) AS max_views
            FROM PAGES
            WHERE created_date >= '2024-01-01'
            GROUP BY page_type
            ORDER BY page_count DESC
            ''',
            connection_qualified_name="default/snowflake/1657275059",
            default_schema="LANDING.FRONTEND_PROD"
        )
    """
    return query_asset(sql, connection_qualified_name, default_schema)


@mcp.tool()
def create_glossaries(glossaries) -> List[Dict[str, Any]]:
    """
    Create one or multiple AtlasGlossary assets in Atlan.

    IMPORTANT BUSINESS RULES & CONSTRAINTS:
    - Check for duplicate names within the same request and ask user to choose different names
    - Do NOT use search tool before creating glossaries - Atlan will handle existence validation
    - If user gives ambiguous instructions, ask clarifying questions

    Args:
        glossaries (Union[Dict[str, Any], List[Dict[str, Any]]]): Either a single glossary
            specification (dict) or a list of glossary specifications. Each specification
            can be a dictionary containing:
            - name (str): Name of the glossary (required)
            - user_description (str, optional): Detailed description of the glossary
              proposed by the user
            - certificate_status (str, optional): Certification status
              ("VERIFIED", "DRAFT", or "DEPRECATED")

    Returns:
        List[Dict[str, Any]]: List of dictionaries, each with details for a created glossary:
            - guid: The GUID of the created glossary
            - name: The name of the glossary
            - qualified_name: The qualified name of the created glossary


    Examples:
        Multiple glossaries creation:
        [
            {
                "name": "Business Terms",
                "user_description": "Common business terminology",
                "certificate_status": "VERIFIED"
            },
            {
                "name": "Technical Dictionary",
                "user_description": "Technical terminology and definitions",
                "certificate_status": "DRAFT"
            }
        ]
    """

    # Parse parameters to handle JSON strings using shared utility
    try:
        glossaries = parse_json_parameter(glossaries)
    except json.JSONDecodeError as e:
        return {"error": f"Invalid JSON format for glossaries parameter: {str(e)}"}

    return create_glossary_assets(glossaries)


@mcp.tool()
def create_glossary_terms(terms) -> List[Dict[str, Any]]:
    """
    Create one or multiple AtlasGlossaryTerm assets in Atlan.

    IMPORTANT BUSINESS RULES & CONSTRAINTS:
    - Within a glossary, a term (single GUID) can be associated with many categories
    - Two terms with the same name CANNOT exist within the same glossary (regardless of categories)
    - A term is always anchored to a glossary and may also be associated with one or more categories inside the same glossary
    - Before creating a term, perform a single search to check if the glossary, categories, or term with the same name already exist. Search for all relevant glossaries, categories, and terms in one call. Skip this step if you already have the required GUIDs.
    - Example search call before term creation (query: create a term "fighterz" under the categories "Characters" and "Locations" in the "Marvel Cinematic Universe (MCU)" glossary):
        {
            "limit": 10,
            "conditions": {
                "type_name": ["AtlasGlossary", "AtlasGlossaryCategory","AtlasGlossaryTerm"],
                "name": ["Marvel Cinematic Universe (MCU)", "Characters", "Locations","fighterz"]
            }
        }

    Args:
        terms (Union[Dict[str, Any], List[Dict[str, Any]]]): Either a single term
            specification (dict) or a list of term specifications. Each specification
            can be a dictionary containing:
            - name (str): Name of the term (required)
            - glossary_guid (str): GUID of the glossary this term belongs to (required)
            - user_description (str, optional): Detailed description of the term
              proposed by the user
            - certificate_status (str, optional): Certification status
              ("VERIFIED", "DRAFT", or "DEPRECATED")
            - category_guids (List[str], optional): List of category GUIDs this term
              belongs to.

    Returns:
        List[Dict[str, Any]]: List of dictionaries, each with details for a created term:
            - guid: The GUID of the created term
            - name: The name of the term
            - qualified_name: The qualified name of the created term

    Examples:
        Multiple terms creation:
        [
            {
                "name": "Customer",
                "glossary_guid": "glossary-guid-here",
                "user_description": "An individual or organization that purchases goods or services",
                "certificate_status": "VERIFIED"
            },
            {
                "name": "Annual Recurring Revenue",
                "glossary_guid": "glossary-guid-here",
                "user_description": "The yearly value of recurring revenue from customers",
                "certificate_status": "DRAFT",
                "category_guids": ["category-guid-1"]
            }
        ]
    """
    # Parse parameters to handle JSON strings using shared utility
    try:
        terms = parse_json_parameter(terms)
    except json.JSONDecodeError as e:
        return {"error": f"Invalid JSON format for terms parameter: {str(e)}"}

    return create_glossary_term_assets(terms)


@mcp.tool()
def create_glossary_categories(categories) -> List[Dict[str, Any]]:
    """
    Create one or multiple AtlasGlossaryCategory assets in Atlan.

    IMPORTANT BUSINESS RULES & CONSTRAINTS:
    - There cannot be two categories with the same name under the same glossary (at the same level)
    - Under a parent category, there cannot be subcategories with the same name (at the same level)
    - Categories with the same name can exist under different glossaries (this is allowed)
    - Cross-level naming is allowed: category "a" can have subcategory "b", and category "b" can have subcategory "a"
    - Example allowed structure: Glossary "bui" → category "a" → subcategory "b" AND category "b" → subcategory "a"
    - Always check for duplicate names at the same level and ask user to choose different names
    - Before creating a category, perform a single search to check if the glossary or categories with the same name already exist. Skip this step if you already have the required GUIDs.
    - Example search call before category creation (query: create categories "Locations" and "Characters" under the "Marvel Cinematic Universe (MCU)" glossary):
        {
            "limit": 10,
            "conditions": {
                "type_name": ["AtlasGlossary", "AtlasGlossaryCategory"],
                "name": ["Marvel Cinematic Universe (MCU)", "Characters", "Locations"]
            }
        }
    - If user gives ambiguous instructions, ask clarifying questions

    Args:
        categories (Union[Dict[str, Any], List[Dict[str, Any]]]): Either a single category
            specification (dict) or a list of category specifications. Each specification
            can be a dictionary containing:
            - name (str): Name of the category (required)
            - glossary_guid (str): GUID of the glossary this category belongs to (required)
            - user_description (str, optional): Detailed description of the category
              proposed by the user
            - certificate_status (str, optional): Certification status
              ("VERIFIED", "DRAFT", or "DEPRECATED")
            - parent_category_guid (str, optional): GUID of the parent category if this
              is a subcategory

    Returns:
        List[Dict[str, Any]]: List of dictionaries, each with details for a created category:
            - guid: The GUID of the created category
            - name: The name of the category
            - qualified_name: The qualified name of the created category

    Examples:
        Multiple categories creation:
        [
            {
                "name": "Customer Data",
                "glossary_guid": "glossary-guid-here",
                "user_description": "Terms related to customer information and attributes",
                "certificate_status": "VERIFIED"
            },
            {
                "name": "PII",
                "glossary_guid": "glossary-guid-here",
                "parent_category_guid": "parent-category-guid-here",
                "user_description": "Subcategory for PII terms",
                "certificate_status": "DRAFT"
            }
        ]
    """
    # Parse parameters to handle JSON strings using shared utility
    try:
        categories = parse_json_parameter(categories)
    except json.JSONDecodeError as e:
        return {"error": f"Invalid JSON format for categories parameter: {str(e)}"}

    return create_glossary_category_assets(categories)


@mcp.tool()
def create_domains(domains) -> List[Dict[str, Any]]:
    """
    Create Data Domains or Sub Domains in Atlan.

    IMPORTANT BUSINESS RULES & CONSTRAINTS:
    - Before creating a domain/subdomain, you may want to search for existing
      domains to avoid duplicates or to get the qualified_name for parent relationships
    - Domain names must be unique at the top level
    - Subdomain names must be unique within the same parent domain

    Args:
        domains (Union[Dict[str, Any], List[Dict[str, Any]]]): Either a single domain
            specification (dict) or a list of domain specifications.

    For Data Domain:
        - name (str): Name of the domain (required)
        - user_description (str, optional): Detailed description
        - certificate_status (str, optional): "VERIFIED", "DRAFT", or "DEPRECATED"

    For Sub Domain:
        - name (str): Name of the subdomain (required)
        - parent_domain_qualified_name (str): Qualified name of parent domain (required)
        - user_description (str, optional): Detailed description
        - certificate_status (str, optional): "VERIFIED", "DRAFT", or "DEPRECATED"

    Returns:
        List[Dict[str, Any]]: List of dictionaries, each with details for a created asset:
            - guid: The GUID of the created asset
            - name: The name of the asset
            - qualified_name: The qualified name of the created asset

    Examples:
        # Create a single Data Domain
        create_domains({
            "name": "Marketing",
            "user_description": "Marketing data domain",
            "certificate_status": "VERIFIED"
        })

        # Create a Sub Domain under an existing domain
        create_domains({
            "name": "Social Marketing",
            "parent_domain_qualified_name": "default/domain/marketing",
            "user_description": "Social media marketing subdomain",
            "certificate_status": "DRAFT"
        })

        # Create multiple domains in one call
        create_domains([
            {
                "name": "Sales",
                "user_description": "Sales data domain"
            },
            {
                "name": "E-commerce Sales",
                "parent_domain_qualified_name": "default/domain/sales",
                "user_description": "E-commerce sales subdomain"
            }
        ])
    """
    # Parse parameters to handle JSON strings using shared utility
    try:
        domains = parse_json_parameter(domains)
    except json.JSONDecodeError as e:
        return {"error": f"Invalid JSON format for domains parameter: {str(e)}"}

    return create_data_domain_assets(domains)


@mcp.tool()
def create_data_products(products) -> List[Dict[str, Any]]:
    """
    Create Data Products in Atlan.

    IMPORTANT BUSINESS RULES & CONSTRAINTS:
    - Before creating a product, you may want to search for existing domains
      to get the qualified_name for the domain relationship
    - Product names must be unique within the same domain
    - At least one asset GUID must be provided for each product

    Args:
        products (Union[Dict[str, Any], List[Dict[str, Any]]]): Either a single product
            specification (dict) or a list of product specifications.

    For Data Product:
        - name (str): Name of the product (required)
        - domain_qualified_name (str): Qualified name of the domain (required)
        - asset_guids (List[str]): List of asset GUIDs to link to this product (required).
          At least one asset GUID must be provided. Use search_assets_tool to find asset GUIDs.
        - user_description (str, optional): Detailed description
        - certificate_status (str, optional): "VERIFIED", "DRAFT", or "DEPRECATED"

    Returns:
        List[Dict[str, Any]]: List of dictionaries, each with details for a created asset:
            - guid: The GUID of the created asset
            - name: The name of the asset
            - qualified_name: The qualified name of the created asset

    Examples:
        # Create a Data Product with linked assets (asset_guids required)
        # First, search for assets to get their GUIDs using search_assets_tool
        create_data_products({
            "name": "Marketing Influence",
            "domain_qualified_name": "default/domain/marketing",
            "user_description": "Product for marketing influence analysis",
            "asset_guids": ["asset-guid-1", "asset-guid-2"]  # GUIDs from search_assets_tool
        })

        # Create multiple products in one call
        create_data_products([
            {
                "name": "Sales Analytics",
                "domain_qualified_name": "default/domain/sales",
                "user_description": "Sales analytics product",
                "asset_guids": ["table-guid-1", "table-guid-2"]
            },
            {
                "name": "Customer Insights",
                "domain_qualified_name": "default/domain/marketing",
                "user_description": "Customer insights product",
                "asset_guids": ["view-guid-1"]
            }
        ])
    """
    # Parse parameters to handle JSON strings using shared utility
    try:
        products = parse_json_parameter(products)
    except json.JSONDecodeError as e:
        return {"error": f"Invalid JSON format for products parameter: {str(e)}"}

    return create_data_product_assets(products)


@mcp.tool()
def create_dq_rules_tool(rules):
    """
    Create one or multiple data quality rules in Atlan.

    Supports all rule types: column-level, table-level, and custom SQL rules.
    Rules can be created individually or in bulk for efficient setup.

    Args:
        rules (Union[Dict[str, Any], List[Dict[str, Any]]]): Either a single rule
            specification or a list of rule specifications. Each specification
            must include:
            - rule_type (str): Type of rule (see Supported Rule Types) [REQUIRED]
            - asset_qualified_name (str): Qualified name of the asset (Table, View, MaterialisedView, or SnowflakeDynamicTable) [REQUIRED]
            - asset_type (str): Type of asset - "Table" | "View" | "MaterialisedView" | "SnowflakeDynamicTable" [OPTIONAL, default: "Table"]
            - threshold_value (int/float): Threshold value for comparison [REQUIRED]
            - column_qualified_name (str): Column qualified name [REQUIRED for column-level rules, NOT for Row Count/Custom SQL]
            - threshold_compare_operator (str): Comparison operator (EQUAL, GREATER_THAN, etc.) [OPTIONAL, default varies by rule]
            - threshold_unit (str): Time unit for Freshness rules (DAYS, HOURS, MINUTES) [REQUIRED for Freshness, N/A for others]
            - alert_priority (str): Alert priority level (LOW, NORMAL, URGENT) [OPTIONAL, default: NORMAL]
            - row_scope_filtering_enabled (bool): Enable row-level filtering [OPTIONAL]
            - rule_conditions (List[Dict]): Conditions for String Length/Regex/Valid Values [REQUIRED for conditional rules]
            - custom_sql (str): SQL query [REQUIRED for Custom SQL rules]
            - rule_name (str): Name for the rule [REQUIRED for Custom SQL rules]
            - dimension (str): DQ dimension [REQUIRED for Custom SQL rules]
            - description (str): Rule description [OPTIONAL]

    Returns:
        Dict[str, Any]: Dictionary containing:
            - created_count: Number of rules successfully created
            - created_rules: List of created rules with guid, qualified_name, rule_type
            - errors: List of any errors encountered

    Examples:
        # Column-level rules (Null Count, Min/Max Value, Unique/Duplicate Count, etc.)
        rule = create_dq_rules_tool({
            "rule_type": "Null Count",  # or "Min Value", "Max Value", "Unique Count", etc.
            "asset_qualified_name": "default/snowflake/123/DB/SCHEMA/TABLE",
            "column_qualified_name": "default/snowflake/123/DB/SCHEMA/TABLE/EMAIL",
            "threshold_compare_operator": "LESS_THAN_EQUAL",  # EQUAL, GREATER_THAN, etc.
            "threshold_value": 5,
            "alert_priority": "URGENT",  # LOW, NORMAL, URGENT
            "row_scope_filtering_enabled": True,
            "description": "Email column should have minimal nulls"
        })

        # Conditional rules (String Length, Regex, Valid Values)
        rule = create_dq_rules_tool({
            "rule_type": "String Length",  # or "Regex", "Valid Values"
            "asset_qualified_name": "default/snowflake/123/DB/SCHEMA/TABLE",
            "column_qualified_name": "default/snowflake/123/DB/SCHEMA/TABLE/PHONE",
            "threshold_value": 10,
            "alert_priority": "URGENT",
            "rule_conditions": [{
                "type": "STRING_LENGTH_BETWEEN",  # See Rule Condition Types below
                "min_value": 10,
                "max_value": 15
            }],
            # For Regex: {"type": "REGEX_NOT_MATCH", "value": "pattern"}
            # For Valid Values: {"type": "IN_LIST", "value": ["ACTIVE", "INACTIVE"]}
            "row_scope_filtering_enabled": True
        })

        # Table-level (Row Count) and Time-based (Freshness)
        rule = create_dq_rules_tool({
            "rule_type": "Row Count",  # No column_qualified_name needed
            "asset_qualified_name": "default/snowflake/123/DB/SCHEMA/TABLE",
            "asset_type": "Table",  # Optional: "Table" (default), "View", "MaterialisedView", "SnowflakeDynamicTable"
            "threshold_compare_operator": "GREATER_THAN_EQUAL",
            "threshold_value": 1000,
            "alert_priority": "URGENT"
        })
        # For Freshness: Add "column_qualified_name" + "threshold_unit": "DAYS"/"HOURS"/"MINUTES"

        # Custom SQL rule
        rule = create_dq_rules_tool({
            "rule_type": "Custom SQL",
            "asset_qualified_name": "default/snowflake/123/DB/SCHEMA/TABLE",
            "rule_name": "Revenue Consistency Check",
            "custom_sql": "SELECT COUNT(*) FROM TABLE WHERE revenue < 0 OR revenue > 1000000",
            "threshold_compare_operator": "EQUAL",
            "threshold_value": 0,
            "alert_priority": "URGENT",
            "dimension": "CONSISTENCY",  # See Data Quality Dimensions below
            "description": "Ensure revenue values are within expected range"
        })

        # Bulk creation - Pass array instead of single dict
        rules = create_dq_rules_tool([
            {"rule_type": "Null Count", "column_qualified_name": "...EMAIL", ...},
            {"rule_type": "Duplicate Count", "column_qualified_name": "...USER_ID", ...},
            {"rule_type": "Row Count", "asset_qualified_name": "...", ...}
        ])

    Supported Rule Types:
        Completeness: "Null Count", "Null Percentage", "Blank Count", "Blank Percentage"
        Statistical: "Min Value", "Max Value", "Average", "Standard Deviation"
        Uniqueness: "Unique Count", "Duplicate Count"
        Validity: "Regex", "String Length", "Valid Values"
        Timeliness: "Freshness"
        Volume: "Row Count"
        Custom: "Custom SQL"

    Supported Asset Types:
        "Table", "View", "MaterialisedView", "SnowflakeDynamicTable"

    Valid Alert Priority Levels:
        "LOW", "NORMAL" (default), "URGENT"

    Threshold Operators:
        "EQUAL", "GREATER_THAN", "GREATER_THAN_EQUAL", "LESS_THAN", "LESS_THAN_EQUAL", "BETWEEN"

    Threshold Units (Freshness only):
        "DAYS", "HOURS", "MINUTES"

    Data Quality Dimensions (Custom SQL only):
        "COMPLETENESS", "VALIDITY", "UNIQUENESS", "TIMELINESS", "VOLUME", "ACCURACY", "CONSISTENCY"

    Rule Condition Types:
        String Length: "STRING_LENGTH_EQUALS", "STRING_LENGTH_BETWEEN",
                      "STRING_LENGTH_GREATER_THAN", "STRING_LENGTH_LESS_THAN"
        Regex: "REGEX_MATCH", "REGEX_NOT_MATCH"
        Valid Values: "IN_LIST", "NOT_IN_LIST"
    """
    try:
        parsed_rules = parse_json_parameter(rules)
        return create_dq_rules(parsed_rules)
    except (json.JSONDecodeError, ValueError) as e:
        return {
            "created_count": 0,
            "created_rules": [],
            "errors": [f"Parameter parsing error: {str(e)}"],
        }


@mcp.tool()
def schedule_dq_rules_tool(schedules):
    """
    Schedule data quality rule execution for one or multiple assets.

    Args:
        schedules: Single schedule or list of schedules. Each schedule requires:
            - asset_type (str): "Table", "View", "MaterialisedView", or "SnowflakeDynamicTable"
            - asset_name (str): Name of the asset
            - asset_qualified_name (str): Qualified name of the asset
            - schedule_crontab (str): Cron expression (5 fields: min hour day month weekday)
            - schedule_time_zone (str): Timezone (e.g., "UTC", "America/New_York")

    Returns:
        Dict with scheduled_count, scheduled_assets, and errors.

    Example:
        schedule_dq_rules_tool({
            "asset_type": "Table",
            "asset_name": "CUSTOMERS",
            "asset_qualified_name": "default/snowflake/123/DB/SCHEMA/CUSTOMERS",
            "schedule_crontab": "0 2 * * *",
            "schedule_time_zone": "UTC"
        })
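
        # Schedule multiple assets in one call (asset names and cron values are illustrative)
        schedule_dq_rules_tool([
            {
                "asset_type": "Table",
                "asset_name": "CUSTOMERS",
                "asset_qualified_name": "default/snowflake/123/DB/SCHEMA/CUSTOMERS",
                "schedule_crontab": "0 2 * * *",
                "schedule_time_zone": "UTC"
            },
            {
                "asset_type": "View",
                "asset_name": "ORDERS_V",
                "asset_qualified_name": "default/snowflake/123/DB/SCHEMA/ORDERS_V",
                "schedule_crontab": "30 2 * * *",
                "schedule_time_zone": "UTC"
            }
        ])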
    """
    try:
        parsed_schedules = parse_json_parameter(schedules)
        return schedule_dq_rules(parsed_schedules)
    except (json.JSONDecodeError, ValueError) as e:
        return {
            "scheduled_count": 0,
            "scheduled_assets": [],
            "errors": [f"Parameter parsing error: {str(e)}"],
        }


@mcp.tool()
def delete_dq_rules_tool(rule_guids):
    """
    Delete one or multiple data quality rules in Atlan.

    Args:
        rule_guids: Single rule GUID (string) or list of rule GUIDs to delete.

    Returns:
        Dict with deleted_count, deleted_rules (list of GUIDs), and errors.

    Example:
        # Delete single rule
        delete_dq_rules_tool("rule-guid-123")

        # Delete multiple rules
        delete_dq_rules_tool(["rule-guid-1", "rule-guid-2"])
    """
    try:
        parsed_guids = parse_json_parameter(rule_guids)
        return delete_dq_rules(parsed_guids)
    except (json.JSONDecodeError, ValueError) as e:
        return {
            "deleted_count": 0,
            "deleted_rules": [],
            "errors": [f"Parameter parsing error: {str(e)}"],
        }


@mcp.tool()
def update_dq_rules_tool(rules):
    """
    Update existing data quality rules in Atlan.

    Args:
        rules: Single rule dict or list of rule dicts. Required fields:
            - qualified_name: Rule's qualified name
            - rule_type: Rule type (e.g., "Null Count", "Row Count", "Custom SQL")
            - asset_qualified_name: Table/view qualified name
        Optional fields: threshold_value, threshold_compare_operator, threshold_unit,
        alert_priority, custom_sql, rule_name, dimension, rule_conditions,
        row_scope_filtering_enabled, description

    Returns:
        Dict with updated_count, updated_rules, and errors.

    Examples:
        # Single rule update
        update_dq_rules_tool({
            "qualified_name": "default/snowflake/123/DB/SCHEMA/TABLE/rule/abc-123",
            "rule_type": "Null Count",
            "asset_qualified_name": "default/snowflake/123/DB/SCHEMA/TABLE",
            "threshold_value": 10,
            "alert_priority": "URGENT"
        })

        # Bulk update with conditions
        update_dq_rules_tool([
            {"qualified_name": "...", "rule_type": "Null Count", "threshold_value": 5},
            {"qualified_name": "...", "rule_type": "String Length",
             "rule_conditions": [{"type": "STRING_LENGTH_BETWEEN", "min_value": 10, "max_value": 100}]}
        ])

    Rule Types: "Null Count", "Null Percentage", "Blank Count", "Blank Percentage",
    "Min Value", "Max Value", "Average", "Standard Deviation", "Unique Count",
    "Duplicate Count", "Regex", "String Length", "Valid Values", "Freshness",
    "Row Count", "Custom SQL"

    Alert Priority: "LOW", "NORMAL", "URGENT"
    Operators: "EQUAL", "GREATER_THAN", "GREATER_THAN_EQUAL", "LESS_THAN",
               "LESS_THAN_EQUAL", "BETWEEN"
    Threshold Units: "DAYS", "HOURS", "MINUTES" (Freshness only)
    Dimensions: "COMPLETENESS", "VALIDITY", "UNIQUENESS", "TIMELINESS", "VOLUME",
                "ACCURACY", "CONSISTENCY" (Custom SQL only)
    Condition Types: "STRING_LENGTH_EQUALS", "STRING_LENGTH_BETWEEN",
                     "STRING_LENGTH_GREATER_THAN", "STRING_LENGTH_LESS_THAN",
                     "REGEX_MATCH", "REGEX_NOT_MATCH", "IN_LIST", "NOT_IN_LIST"
    """
    try:
        parsed_rules = parse_json_parameter(rules)
        return update_dq_rules(parsed_rules)
    except (json.JSONDecodeError, ValueError) as e:
        return {
            "updated_count": 0,
            "updated_rules": [],
            "errors": [f"Parameter parsing error: {str(e)}"],
        }


def main():
    """Main entry point for the Atlan MCP Server."""

    settings = get_settings()

    parser = argparse.ArgumentParser(description="Atlan MCP Server")
    parser.add_argument(
        "--transport",
        type=str,
        default=settings.MCP_TRANSPORT,
        choices=["stdio", "sse", "streamable-http"],
        help="Transport protocol (stdio/sse/streamable-http)",
    )
    parser.add_argument(
        "--host",
        type=str,
        default=settings.MCP_HOST,
        help="Host to run the server on",
    )
    parser.add_argument(
        "--port",
        type=int,
        default=settings.MCP_PORT,
        help="Port to run the server on",
    )
    parser.add_argument(
        "--path",
        type=str,
        default=settings.MCP_PATH,
        help="Path of the streamable HTTP server",
    )
    args = parser.parse_args()
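
    # Example invocations (flag values are illustrative):
    #   python server.py --transport stdio
    #   python server.py --transport streamable-http --host 0.0.0.0 --port 8000 --path "/"
    # "stdio" communicates over the parent process's pipes, so host/port/path are
    # only passed through for the network transports ("sse" and "streamable-http").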

    kwargs = {"transport": args.transport}
    if args.transport == "streamable-http" or args.transport == "sse":
        kwargs = {
            "transport": args.transport,
            "host": args.host,
            "port": args.port,
            "path": args.path,
        }
    # Run the server with the specified transport and host/port/path
    mcp.run(**kwargs)


if __name__ == "__main__":
    main()

```