This is page 1 of 2. Use http://codebase.md/atlanhq/agent-toolkit?lines=true&page={x} to view the full context.
# Directory Structure
```
├── .github
│ ├── CODEOWNERS
│ ├── dependabot.yml
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.md
│ │ ├── custom.md
│ │ └── feature_request.md
│ ├── SECURITY.md
│ └── workflows
│ ├── checks.yml
│ └── mcp-server-release.yml
├── .gitignore
├── .pre-commit-config.yaml
├── CHANGELOG.md
├── CODE_OF_CONDUCT.md
├── commitlint.config.js
├── CONTRIBUTING.md
├── LICENSE
├── modelcontextprotocol
│ ├── .cursor
│ │ └── rules
│ │ ├── mcp-guidelines.mdc
│ │ ├── project-structure.mdc
│ │ ├── python.mdc
│ │ └── tool-development-guide.mdc
│ ├── .dockerignore
│ ├── .env.template
│ ├── .python-version
│ ├── client.py
│ ├── Dockerfile
│ ├── docs
│ │ ├── DEPLOYMENT.md
│ │ └── LOCAL_BUILD.md
│ ├── middleware.py
│ ├── pyproject.toml
│ ├── README.md
│ ├── server.py
│ ├── settings.py
│ ├── tools
│ │ ├── __init__.py
│ │ ├── assets.py
│ │ ├── domain.py
│ │ ├── dq_rules.py
│ │ ├── dsl.py
│ │ ├── glossary.py
│ │ ├── lineage.py
│ │ ├── models.py
│ │ ├── query.py
│ │ └── search.py
│ ├── utils
│ │ ├── __init__.py
│ │ ├── assets.py
│ │ ├── constants.py
│ │ ├── parameters.py
│ │ └── search.py
│ ├── uv.lock
│ └── version.py
└── README.md
```
# Files
--------------------------------------------------------------------------------
/modelcontextprotocol/.python-version:
--------------------------------------------------------------------------------
```
1 | 3.11
2 |
```
--------------------------------------------------------------------------------
/modelcontextprotocol/.env.template:
--------------------------------------------------------------------------------
```
1 | ATLAN_BASE_URL=https://domain.atlan.com
2 | ATLAN_API_KEY=your_api_key
3 | ATLAN_AGENT_ID=your_agent_id
4 | MCP_TRANSPORT="stdio" # "stdio", "sse", or "streamable-http"
5 | MCP_HOST=0.0.0.0
6 | MCP_PORT=8000
7 | MCP_PATH="/"
8 |
```
--------------------------------------------------------------------------------
/modelcontextprotocol/.dockerignore:
--------------------------------------------------------------------------------
```
1 | # Environment files
2 | .env
3 | .env.*
4 |
5 | # Python virtual environments
6 | .venv
7 | venv/
8 |
9 | # Python cache and compiled files
10 | __pycache__/
11 | *.pyc
12 | *.pyo
13 | *.pyd
14 | .Python
15 | *.so
16 |
17 | # IDE and editor files
18 | .cursor/
19 | .vscode/
20 | .vscodeignore
21 | .idea/
22 | .DS_Store
23 |
24 | # Git
25 | .git/
26 | .gitignore
27 |
28 | # Python development and testing
29 | .pytest_cache/
30 | .ruff_cache/
31 | .mypy_cache/
32 | .coverage
33 | .tox/
34 | .nox/
35 |
36 | # Python build artifacts
37 | *.egg-info/
38 | dist/
39 | build/
40 |
41 |
42 | # Development configuration
43 | .pre-commit-config.yaml
44 |
```
--------------------------------------------------------------------------------
/.pre-commit-config.yaml:
--------------------------------------------------------------------------------
```yaml
1 | repos:
2 | - repo: https://github.com/pre-commit/pre-commit-hooks
3 | rev: v4.5.0
4 | hooks:
5 | - id: trailing-whitespace
6 | - id: end-of-file-fixer
7 | - id: check-yaml
8 | - id: check-added-large-files
9 | - id: check-ast
10 | - id: check-json
11 | - id: check-merge-conflict
12 | - id: detect-private-key
13 |
14 | - repo: https://github.com/alessandrojcm/commitlint-pre-commit-hook
15 | rev: v9.11.0
16 | hooks:
17 | - id: commitlint
18 | stages: [commit-msg]
19 | additional_dependencies: ['@commitlint/config-conventional']
20 |
21 | - repo: https://github.com/astral-sh/ruff-pre-commit
22 | rev: v0.3.0
23 | hooks:
24 | - id: ruff
25 | args: [--fix, --exit-non-zero-on-fix]
26 | - id: ruff-format
27 |
```
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
1 | # Byte-compiled / optimized / DLL files
2 | __pycache__/
3 | *.py[cod]
4 | *$py.class
5 |
6 | # C extensions
7 | *.so
8 |
9 | # Distribution / packaging
10 | .Python
11 | build/
12 | develop-eggs/
13 | dist/
14 | downloads/
15 | eggs/
16 | .eggs/
17 | lib/
18 | lib64/
19 | parts/
20 | sdist/
21 | var/
22 | wheels/
23 | share/python-wheels/
24 | *.egg-info/
25 | .installed.cfg
26 | *.egg
27 | MANIFEST
28 |
29 | # PyInstaller
30 | # Usually these files are written by a python script from a template
31 | # before PyInstaller builds the exe, so as to inject date/other infos into it.
32 | *.manifest
33 | *.spec
34 |
35 | # Installer logs
36 | pip-log.txt
37 | pip-delete-this-directory.txt
38 |
39 | # Unit test / coverage reports
40 | htmlcov/
41 | .tox/
42 | .nox/
43 | .coverage
44 | .coverage.*
45 | .cache
46 | nosetests.xml
47 | coverage.xml
48 | *.cover
49 | *.py,cover
50 | .hypothesis/
51 | .pytest_cache/
52 | cover/
53 |
54 | # Translations
55 | *.mo
56 | *.pot
57 |
58 | # Django stuff:
59 | *.log
60 | local_settings.py
61 | db.sqlite3
62 | db.sqlite3-journal
63 |
64 | # Flask stuff:
65 | instance/
66 | .webassets-cache
67 |
68 | # Scrapy stuff:
69 | .scrapy
70 |
71 | # Sphinx documentation
72 | docs/_build/
73 |
74 | # PyBuilder
75 | .pybuilder/
76 | target/
77 |
78 | # Jupyter Notebook
79 | .ipynb_checkpoints
80 |
81 | # IPython
82 | profile_default/
83 | ipython_config.py
84 |
85 | # pyenv
86 | # For a library or package, you might want to ignore these files since the code is
87 | # intended to run in multiple environments; otherwise, check them in:
88 | # .python-version
89 |
90 | # pipenv
91 | # According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
92 | # However, in case of collaboration, if having platform-specific dependencies or dependencies
93 | # having no cross-platform support, pipenv may install dependencies that don't work, or not
94 | # install all needed dependencies.
95 | #Pipfile.lock
96 |
97 | # poetry
98 | # Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
99 | # This is especially recommended for binary packages to ensure reproducibility, and is more
100 | # commonly ignored for libraries.
101 | # https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
102 | #poetry.lock
103 |
104 | # pdm
105 | # Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
106 | #pdm.lock
107 | # pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
108 | # in version control.
109 | # https://pdm.fming.dev/#use-with-ide
110 | .pdm.toml
111 |
112 | # PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
113 | __pypackages__/
114 |
115 | # Celery stuff
116 | celerybeat-schedule
117 | celerybeat.pid
118 |
119 | # SageMath parsed files
120 | *.sage.py
121 |
122 | # Environments
123 | .env
124 | .venv
125 | env/
126 | venv/
127 | ENV/
128 | env.bak/
129 | venv.bak/
130 |
131 | # Spyder project settings
132 | .spyderproject
133 | .spyproject
134 |
135 | # Rope project settings
136 | .ropeproject
137 |
138 | # mkdocs documentation
139 | /site
140 |
141 | # mypy
142 | .mypy_cache/
143 | .dmypy.json
144 | dmypy.json
145 |
146 | # Pyre type checker
147 | .pyre/
148 |
149 | # pytype static type analyzer
150 | .pytype/
151 |
152 | # Cython debug symbols
153 | cython_debug/
154 |
155 | # PyCharm
156 | # JetBrains specific template is maintained in a separate JetBrains.gitignore that can
157 | # be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
158 | # and can be added to the global gitignore or merged into this file. For a more nuclear
159 | # option (not recommended) you can uncomment the following to ignore the entire idea folder.
160 | .idea/
161 | .vscode/
162 |
163 | .DS_Store
164 |
```
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
1 | # Atlan Agent Toolkit
2 |
3 | [Code of Conduct](CODE_OF_CONDUCT.md)
4 | [PyPI](https://pypi.org/project/atlan-mcp-server)
5 | [License](https://github.com/atlanhq/agent-toolkit/blob/main/LICENSE)
6 |
7 |
8 | This repository contains a collection of tools and protocols for interacting with Atlan services for AI agents. Each component is designed to provide specific functionality and can be used independently or together.
9 |
10 | ## Components
11 |
12 | ### Model Context Protocol (MCP)
13 |
14 | An MCP server that enables interaction with Atlan services through tool calling. It provides tools for asset search and retrieval using [pyatlan](https://developer.atlan.com/sdks/python/).
15 |
16 | You can find the documentation and setup instructions for the MCP server [here](modelcontextprotocol/README.md).
17 |
18 |
19 | ## 🔍 DeepWiki: Ask Questions About This Project
20 |
21 | [Ask DeepWiki](https://deepwiki.com/atlanhq/agent-toolkit)
22 |
23 |
24 | ## Contributing
25 |
26 | See [CONTRIBUTING.md](CONTRIBUTING.md) for details on how to contribute to the Atlan Agent Toolkit.
27 |
28 |
29 | ## License
30 |
31 | The project is licensed under the [MIT License](LICENSE). Please see the [LICENSE](LICENSE) file for details.
32 |
```
--------------------------------------------------------------------------------
/modelcontextprotocol/README.md:
--------------------------------------------------------------------------------
```markdown
1 | # Atlan MCP Server
2 |
3 | The Atlan [Model Context Protocol](https://modelcontextprotocol.io/introduction) server allows your AI agents to interact with Atlan services.
4 |
5 | ## Quick Start
6 |
7 | 1. Generate Atlan API key by following the [documentation](https://ask.atlan.com/hc/en-us/articles/8312649180049-API-authentication).
8 | 2. Select one of the following approaches based on your preference:
9 | - **[Install via Docker](#install-via-docker)** - Uses Docker containers (recommended)
10 | - **[Install via uv](#install-via-uv)** - Uses UV package manager
11 |
12 | > [!NOTE]
13 | > Make sure to replace `<YOUR_API_KEY>`, `<YOUR_INSTANCE>`, and `<YOUR_AGENT_ID>` in the configuration file with your actual Atlan API key, instance name, and agent ID (optional), respectively.
14 |
15 | ## Install via Docker
16 |
17 | **Prerequisites:**
18 | - Follow the official [Docker installation guide](https://docs.docker.com/get-docker/) for your operating system
19 | - Verify Docker is running:
20 | ```bash
21 | docker --version
22 | ```
23 |
24 | ### Add to Claude Desktop
25 |
26 | Go to `Claude > Settings > Developer > Edit Config > claude_desktop_config.json` and add:
27 |
28 | ```json
29 | {
30 | "mcpServers": {
31 | "atlan": {
32 | "command": "docker",
33 | "args": [
34 | "run",
35 | "-i",
36 | "--rm",
37 | "-e",
38 | "ATLAN_API_KEY=<YOUR_API_KEY>",
39 | "-e",
40 | "ATLAN_BASE_URL=https://<YOUR_INSTANCE>.atlan.com",
41 | "-e",
42 | "ATLAN_AGENT_ID=<YOUR_AGENT_ID>",
43 | "ghcr.io/atlanhq/atlan-mcp-server:latest"
44 | ]
45 | }
46 | }
47 | }
48 | ```
49 |
50 | ### Add to Cursor
51 |
52 | Open `Cursor > Settings > Tools & Integrations > New MCP Server` to include the following:
53 |
54 | ```json
55 | {
56 | "mcpServers": {
57 | "atlan": {
58 | "command": "docker",
59 | "args": [
60 | "run",
61 | "-i",
62 | "--rm",
63 | "-e",
64 | "ATLAN_API_KEY=<YOUR_API_KEY>",
65 | "-e",
66 | "ATLAN_BASE_URL=https://<YOUR_INSTANCE>.atlan.com",
67 | "-e",
68 | "ATLAN_AGENT_ID=<YOUR_AGENT_ID>",
69 | "ghcr.io/atlanhq/atlan-mcp-server:latest"
70 | ]
71 | }
72 | }
73 | }
74 | ```
75 |
76 | ## Install via uv
77 |
78 | **Prerequisites:**
79 | - Install uv:
80 | ```bash
81 | # macOS/Linux
82 | curl -LsSf https://astral.sh/uv/install.sh | sh
83 |
84 | # Windows (PowerShell)
85 | powershell -c "irm https://astral.sh/uv/install.ps1 | iex"
86 |
87 | # Alternative: if you already have Python/pip
88 | pip install uv
89 | ```
90 | - Verify installation:
91 | ```bash
92 | uv --version
93 | ```
94 |
95 | > [!NOTE]
96 | > With uv, `uvx` automatically fetches the latest version each time you run it. For more predictable behavior, consider using the Docker option.
97 |
98 | ### Add to Claude Desktop
99 |
100 | Go to `Claude > Settings > Developer > Edit Config > claude_desktop_config.json` to include the following:
101 |
102 | ```json
103 | {
104 | "mcpServers": {
105 | "atlan": {
106 | "command": "uvx",
107 | "args": ["atlan-mcp-server"],
108 | "env": {
109 | "ATLAN_API_KEY": "<YOUR_API_KEY>",
110 | "ATLAN_BASE_URL": "https://<YOUR_INSTANCE>.atlan.com",
111 | "ATLAN_AGENT_ID": "<YOUR_AGENT_ID>"
112 | }
113 | }
114 | }
115 | }
116 | ```
117 |
118 | ### Add to Cursor
119 |
120 | Open `Cursor > Settings > Tools & Integrations > New MCP Server` to include the following:
121 |
122 | ```json
123 | {
124 | "mcpServers": {
125 | "atlan": {
126 | "command": "uvx",
127 | "args": ["atlan-mcp-server"],
128 | "env": {
129 | "ATLAN_API_KEY": "<YOUR_API_KEY>",
130 | "ATLAN_BASE_URL": "https://<YOUR_INSTANCE>.atlan.com",
131 | "ATLAN_AGENT_ID": "<YOUR_AGENT_ID>"
132 | }
133 | }
134 | }
135 | }
136 | ```
137 |
138 | ## Available Tools
139 |
140 | | Tool | Description |
141 | | ------------------- | ----------------------------------------------------------------- |
142 | | `search_assets` | Search for assets based on conditions |
143 | | `get_assets_by_dsl` | Retrieve assets using a DSL query |
144 | | `traverse_lineage` | Retrieve lineage for an asset |
145 | | `update_assets` | Update asset attributes (user description and certificate status) |
146 | | `create_glossaries` | Create glossaries |
147 | | `create_glossary_categories` | Create glossary categories |
148 | | `create_glossary_terms` | Create glossary terms |
149 | | `create_dq_rules` | Create data quality rules on Table, View, MaterialisedView, or SnowflakeDynamicTable assets (column-level, table-level, custom SQL) |
150 | | `update_dq_rules` | Update existing data quality rules (threshold, priority, conditions, etc.) |
151 | | `schedule_dq_rules` | Schedule data quality rule execution for assets using cron expressions |
152 | | `delete_dq_rules` | Delete one or multiple data quality rules by GUID |
153 | | `query_asset` | Execute SQL queries on table/view assets |
154 |
155 | ## Tool Access Control
156 |
157 | The Atlan MCP Server includes a configurable tool restriction middleware that allows you to control which tools are available to users. This is useful for implementing role-based access control or restricting certain operations in specific environments.
158 |
159 | ### Restricting Tools
160 |
161 | You can restrict access to specific tools using the `RESTRICTED_TOOLS` environment variable. Provide a comma-separated list of tool names that should be blocked:
162 |
163 | #### Docker Configuration
164 |
165 | ```json
166 | {
167 | "mcpServers": {
168 | "atlan": {
169 | "command": "docker",
170 | "args": [
171 | "run",
172 | "-i",
173 | "--rm",
174 | "-e",
175 | "ATLAN_API_KEY=<YOUR_API_KEY>",
176 | "-e",
177 | "ATLAN_BASE_URL=https://<YOUR_INSTANCE>.atlan.com",
178 | "-e",
179 | "ATLAN_AGENT_ID=<YOUR_AGENT_ID>",
180 | "-e",
181 | "RESTRICTED_TOOLS=get_assets_by_dsl_tool,update_assets_tool",
182 | "ghcr.io/atlanhq/atlan-mcp-server:latest"
183 | ]
184 | }
185 | }
186 | }
187 | ```
188 |
189 | #### uv Configuration
190 |
191 | ```json
192 | {
193 | "mcpServers": {
194 | "atlan": {
195 | "command": "uvx",
196 | "args": ["atlan-mcp-server"],
197 | "env": {
198 | "ATLAN_API_KEY": "<YOUR_API_KEY>",
199 | "ATLAN_BASE_URL": "https://<YOUR_INSTANCE>.atlan.com",
200 | "ATLAN_AGENT_ID": "<YOUR_AGENT_ID>",
201 | "RESTRICTED_TOOLS": "get_assets_by_dsl_tool,update_assets_tool"
202 | }
203 | }
204 | }
205 | }
206 | ```
207 |
208 | ### Available Tool Names for Restriction
209 |
210 | You can restrict any of the following tools:
211 |
212 | - `search_assets_tool` - Asset search functionality
213 | - `get_assets_by_dsl_tool` - DSL query execution
214 | - `traverse_lineage_tool` - Lineage traversal
215 | - `update_assets_tool` - Asset updates (descriptions, certificates)
216 | - `create_glossaries` - Glossary creation
217 | - `create_glossary_categories` - Category creation
218 | - `create_glossary_terms` - Term creation
219 | - `create_dq_rules_tool` - Data quality rule creation
220 | - `update_dq_rules_tool` - Data quality rule updates
221 | - `schedule_dq_rules_tool` - Data quality rule scheduling
222 | - `delete_dq_rules_tool` - Data quality rule deletion
223 |
224 | ### Common Use Cases
225 |
226 | #### Read-Only Access
227 | Restrict all write operations:
228 | ```
229 | RESTRICTED_TOOLS=update_assets_tool,create_glossaries,create_glossary_categories,create_glossary_terms,create_dq_rules_tool,update_dq_rules_tool,schedule_dq_rules_tool,delete_dq_rules_tool
230 | ```
231 |
232 | #### Disable DSL Queries
233 | For security or performance reasons:
234 | ```
235 | RESTRICTED_TOOLS=get_assets_by_dsl_tool
236 | ```
237 |
238 | #### Minimal Access
239 | Allow only basic search:
240 | ```
241 | RESTRICTED_TOOLS=get_assets_by_dsl_tool,update_assets_tool,traverse_lineage_tool,create_glossaries,create_glossary_categories,create_glossary_terms,create_dq_rules_tool,update_dq_rules_tool,schedule_dq_rules_tool,delete_dq_rules_tool
242 | ```
243 |
244 | ### How It Works
245 |
246 | When tools are restricted:
247 | 1. **Hidden from listings**: Restricted tools won't appear when clients request available tools
248 | 2. **Execution blocked**: If someone tries to execute a restricted tool, they'll receive a clear error message
249 | 3. **Logged**: All access decisions are logged for monitoring and debugging
250 |
251 | ### No Restrictions (Default)
252 |
253 | If you don't set the `RESTRICTED_TOOLS` environment variable, all tools will be available by default.
254 |
255 | ## Transport Modes
256 |
257 | The Atlan MCP Server supports three transport modes, each optimized for different deployment scenarios. For more details about MCP transport modes, see the [official MCP documentation](https://modelcontextprotocol.io/specification/2025-06-18/basic/transports).
258 |
259 | | Transport Mode | Use Case | Benefits | When to Use |
260 | |---|---|---|---|
261 | | **stdio** (Default) | Local development, IDE integrations | Simple, direct communication | Claude Desktop, Cursor IDE |
262 | | **SSE** (Server-Sent Events) | Remote deployments, web browsers | Real-time streaming, web-compatible | Cloud deployments, web clients |
263 | | **streamable-http** | HTTP-based remote connections | Standard HTTP, load balancer friendly | Kubernetes, containerized deployments |
264 |
265 | For comprehensive deployment instructions, configuration examples, and production best practices, see our [Deployment Guide](./docs/DEPLOYMENT.md).
266 |
267 | ## Production Deployment
268 |
269 | - Host the Atlan MCP container image on the cloud/platform of your choice
270 | - Make sure you add all the required environment variables
271 | - Choose the appropriate transport mode for your deployment scenario. SSE Transport is recommended for production (`-e MCP_TRANSPORT=sse`)
272 | - For detailed deployment scenarios and configurations, refer to the [Deployment Guide](./docs/DEPLOYMENT.md)
273 |
274 | ### Remote MCP Configuration
275 |
276 | We currently do not have a generally available remote MCP server for Atlan.
277 |
278 | You can use the [mcp-remote](https://www.npmjs.com/package/mcp-remote) local proxy tool to connect to your own remotely hosted MCP server.
279 |
280 | This lets you test what interacting with your remote MCP server will be like with a real-world MCP client.
281 |
282 | ```json
283 | {
284 | "mcpServers": {
285 | "math": {
286 | "command": "npx",
287 | "args": ["mcp-remote", "https://hosted-domain"]
288 | }
289 | }
290 | }
291 | ```
292 |
293 | ## Develop Locally
294 |
295 | Want to develop locally? Check out our [Local Build](./docs/LOCAL_BUILD.md) Guide for a step-by-step walkthrough!
296 |
297 | ## Need Help?
298 |
299 | - Reach out to [email protected] for any questions or feedback
300 | - You can also directly create a [GitHub issue](https://github.com/atlanhq/agent-toolkit/issues) and we will answer it for you
301 |
302 | ## Frequently Asked Questions
303 |
304 | ### Do I need Python installed?
305 |
306 | **Short answer**: It depends on your installation method.
307 |
308 | - **Docker (Recommended)**: No Python installation required on your host machine. The container includes everything needed.
309 | - **uv**: A Python runtime is needed, but uv will automatically download and manage Python 3.11+ for you if it's not already available.
310 |
311 | **Technical details**: The Atlan MCP server is implemented as a Python application. The Model Context Protocol itself is language-agnostic, but our current implementation requires Python 3.11+ to run.
312 |
313 | ## Troubleshooting
314 |
315 | 1. If Claude Desktop shows an error similar to `spawn uv ENOENT {"context":"connection","stack":"Error: spawn uv ENOENT\n at ChildProcess._handle.onexit`, it is most likely [this](https://github.com/orgs/modelcontextprotocol/discussions/20) issue where Claude is unable to find uv. To fix it:
316 | - Make sure uv is installed and available in your PATH
317 | - Run `which uv` to verify the installation path
318 | - Update Claude's configuration to point to the exact uv path by running `whereis uv` and using that path
319 |
```
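The restriction behavior described in the README above is implemented in `middleware.py` (not included on this page). As a rough sketch only — the function names and structure below are hypothetical, not the actual implementation — the core logic amounts to parsing `RESTRICTED_TOOLS` and applying it at listing and execution time:

```python
import os


def get_restricted_tools() -> set[str]:
    """Parse RESTRICTED_TOOLS into a set of blocked tool names."""
    raw = os.environ.get("RESTRICTED_TOOLS", "")
    return {name.strip() for name in raw.split(",") if name.strip()}


def filter_tool_listing(all_tools: list[str]) -> list[str]:
    """Hide restricted tools from tool listings."""
    blocked = get_restricted_tools()
    return [tool for tool in all_tools if tool not in blocked]


def check_tool_execution(tool_name: str) -> None:
    """Block execution of a restricted tool with a clear error."""
    if tool_name in get_restricted_tools():
        raise PermissionError(f"Tool '{tool_name}' is restricted on this server")
```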
--------------------------------------------------------------------------------
/.github/SECURITY.md:
--------------------------------------------------------------------------------
```markdown
1 | # Vulnerability Disclosure
2 |
3 | If you think you have found a potential security vulnerability,
4 | please open a [draft Security Advisory](https://github.com/atlanhq/agent-toolkit/security/advisories/new)
5 | via GitHub. We will coordinate verification and next steps through
6 | that secure medium.
7 |
8 | If English is not your first language, please try to describe the
9 | problem and its impact to the best of your ability. For greater detail,
10 | please use your native language and we will try our best to translate it
11 | using online services.
12 |
13 | Please also include the code you used to find the problem and the
14 | shortest amount of code necessary to reproduce it.
15 |
16 | Please do not disclose this to anyone else. We will request a CVE
17 | identifier if necessary and give you full credit under whatever name or
18 | alias you provide. We will only request an identifier when we have a fix
19 | and can publish it in a release.
20 |
21 | We will respect your privacy and will only publicize your involvement if
22 | you grant us permission.
23 |
```
--------------------------------------------------------------------------------
/CONTRIBUTING.md:
--------------------------------------------------------------------------------
```markdown
1 | # Contributing
2 |
3 | We welcome contributions to the Atlan Agent Toolkit! Please follow these guidelines when submitting pull requests:
4 |
5 | 1. **Create a New Branch:**
6 | - Create a new branch for your changes.
7 | - Use a descriptive name for the branch (e.g., `feature/add-new-tool`).
8 |
9 | 2. **Make Your Changes:**
10 | - Make your changes in the new branch.
11 | - Ensure your tools are well-defined and follow the MCP specification.
12 |
13 | 3. **Submit a Pull Request:**
14 | - Push your changes to your branch.
15 | - Create a pull request against the `main` branch.
16 | - Provide a clear description of the changes and any related issues.
17 | - Ensure the PR passes all CI checks before requesting a review.
18 |
19 | 4. **Code Quality:**
20 | - We use pre-commit hooks to maintain code quality.
21 | - Install pre-commit in your local environment:
22 | ```bash
23 | uv pip install pre-commit
24 | pre-commit install
25 | ```
26 | - Pre-commit will automatically run checks before each commit, including:
27 | - Code formatting with Ruff
28 | - Trailing whitespace removal
29 | - End-of-file fixing
30 | - YAML and JSON validation
31 | - Other quality checks
32 |
33 | 5. **Environment Setup:**
34 | - This project uses [uv](https://docs.astral.sh/uv/) for dependency management.
35 | - Refer to the [Model Context Protocol README](modelcontextprotocol/README.md) for setup instructions.
36 | - Python 3.11 or higher is required.
37 |
38 | 6. **Documentation:**
39 | - Update documentation to reflect your changes.
40 | - Add comments to your code where necessary.
41 |
42 | Please open an issue or discussion for questions or suggestions before starting significant work!
43 |
```
--------------------------------------------------------------------------------
/CODE_OF_CONDUCT.md:
--------------------------------------------------------------------------------
```markdown
1 | # Contributor Covenant Code of Conduct
2 |
3 | ## Our Pledge
4 |
5 | In the interest of fostering an open and welcoming environment, we as
6 | contributors and maintainers pledge to make participation in our project and
7 | our community a harassment-free experience for everyone, regardless of age, body
8 | size, disability, ethnicity, sex characteristics, gender identity and expression,
9 | level of experience, education, socio-economic status, nationality, personal
10 | appearance, race, religion, or sexual identity and orientation.
11 |
12 | ## Our Standards
13 |
14 | Examples of behavior that contributes to creating a positive environment
15 | include:
16 |
17 | - Using welcoming and inclusive language
18 | - Being respectful of differing viewpoints and experiences
19 | - Gracefully accepting constructive criticism
20 | - Focusing on what is best for the community
21 | - Showing empathy towards other community members
22 |
23 | Examples of unacceptable behavior by participants include:
24 |
25 | - The use of sexualized language or imagery and unwelcome sexual attention or
26 | advances
27 | - Trolling, insulting/derogatory comments, and personal or political attacks
28 | - Public or private harassment
29 | - Publishing others' private information, such as a physical or electronic
30 | address, without explicit permission
31 | - Other conduct which could reasonably be considered inappropriate in a
32 | professional setting
33 |
34 | ## Our Responsibilities
35 |
36 | Project maintainers are responsible for clarifying the standards of acceptable
37 | behavior and are expected to take appropriate and fair corrective action in
38 | response to any instances of unacceptable behavior.
39 |
40 | Project maintainers have the right and responsibility to remove, edit, or
41 | reject comments, commits, code, wiki edits, issues, and other contributions
42 | that are not aligned to this Code of Conduct, or to ban temporarily or
43 | permanently any contributor for other behaviors that they deem inappropriate,
44 | threatening, offensive, or harmful.
45 |
46 | ## Scope
47 |
48 | This Code of Conduct applies within all project spaces, and it also applies when
49 | an individual is representing the project or its community in public spaces.
50 | Examples of representing a project or community include using an official
51 | project e-mail address, posting via an official social media account, or acting
52 | as an appointed representative at an online or offline event. Representation of
53 | a project may be further defined and clarified by project maintainers.
54 |
55 | ## Enforcement
56 |
57 | Instances of abusive, harassing, or otherwise unacceptable behavior may be
58 | reported by contacting the project team at [[email protected]](mailto:[email protected]). All
59 | complaints will be reviewed and investigated and will result in a response that
60 | is deemed necessary and appropriate to the circumstances. The project team is
61 | obligated to maintain confidentiality with regard to the reporter of an incident.
62 | Further details of specific enforcement policies may be posted separately.
63 |
64 | Project maintainers who do not follow or enforce the Code of Conduct in good
65 | faith may face temporary or permanent repercussions as determined by other
66 | members of the project's leadership.
67 |
68 | ## Attribution
69 |
70 | This Code of Conduct is adapted from the [Contributor Covenant][homepage], version 1.4,
71 | available at https://www.contributor-covenant.org/version/1/4/code-of-conduct.html
72 |
73 | [homepage]: https://www.contributor-covenant.org
74 |
75 | For answers to common questions about this code of conduct, see
76 | https://www.contributor-covenant.org/faq
77 |
```
--------------------------------------------------------------------------------
/modelcontextprotocol/version.py:
--------------------------------------------------------------------------------
```python
1 | """Version information."""
2 |
3 | __version__ = "0.3.0"
4 |
```
--------------------------------------------------------------------------------
/.github/dependabot.yml:
--------------------------------------------------------------------------------
```yaml
1 | version: 2
2 | updates:
3 | - package-ecosystem: pip
4 | directory: "/modelcontextprotocol"
5 | schedule:
6 | interval: daily
7 | open-pull-requests-limit: 100
8 | allow:
9 | - dependency-type: "all"
10 |
11 | - package-ecosystem: "github-actions"
12 | directory: "/modelcontextprotocol"
13 | schedule:
14 | interval: daily
15 |
```
--------------------------------------------------------------------------------
/modelcontextprotocol/utils/constants.py:
--------------------------------------------------------------------------------
```python
1 | VALID_RELATIONSHIPS = ["anchor"]
2 |
3 | DEFAULT_SEARCH_ATTRIBUTES = [
4 | "name",
5 | "display_name",
6 | "description",
7 | "qualified_name",
8 | "user_description",
9 | "certificate_status",
10 | "owner_users",
11 | "connector_name",
12 | "has_lineage",
13 | "source_created_at",
14 | "source_updated_at",
15 | "readme",
16 | "owner_groups",
17 | "asset_tags",
18 | ]
19 |
```
--------------------------------------------------------------------------------
/commitlint.config.js:
--------------------------------------------------------------------------------
```javascript
1 | module.exports = {
2 | extends: ['@commitlint/config-conventional'],
3 | rules: {
4 | 'type-enum': [
5 | 2,
6 | 'always',
7 | [
8 | 'feat',
9 | 'fix',
10 | 'docs',
11 | 'style',
12 | 'refactor',
13 | 'perf',
14 | 'test',
15 | 'build',
16 | 'ci',
17 | 'chore',
18 | 'revert'
19 | ]
20 | ],
21 | 'subject-case': [0], // Disabled to allow any case
22 | }
23 | };
24 |
```
--------------------------------------------------------------------------------
/modelcontextprotocol/utils/__init__.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Utilities for the Atlan MCP server.
3 |
4 | This package provides common utilities used across the server components.
5 | """
6 |
7 | from .assets import save_assets
8 | from .constants import DEFAULT_SEARCH_ATTRIBUTES
9 | from .search import SearchUtils
10 | from .parameters import (
11 | parse_json_parameter,
12 | parse_list_parameter,
13 | )
14 |
15 | __all__ = [
16 | "DEFAULT_SEARCH_ATTRIBUTES",
17 | "SearchUtils",
18 | "parse_json_parameter",
19 | "parse_list_parameter",
20 | "save_assets",
21 | ]
22 |
```
--------------------------------------------------------------------------------
/.github/ISSUE_TEMPLATE/custom.md:
--------------------------------------------------------------------------------
```markdown
1 | ---
2 | name: Documentation or Question
3 | about: Ask a question or request documentation improvements
4 | title: '[DOCS] '
5 | labels: 'documentation'
6 | assignees: ''
7 |
8 | ---
9 |
10 | **What is your question or documentation request?**
11 | A clear and concise description of what you need help with or what documentation you'd like to see improved.
12 |
13 | **Additional context**
14 | Add any other context about your question or documentation request here, such as:
15 | - Related documentation you've already reviewed
16 | - Specific sections that need clarification
17 | - Examples or use cases you'd like to see documented
18 |
```
--------------------------------------------------------------------------------
/.github/ISSUE_TEMPLATE/feature_request.md:
--------------------------------------------------------------------------------
```markdown
1 | ---
2 | name: Feature request
3 | about: Suggest a new feature or enhancement for the agent toolkit
4 | title: '[FEATURE] '
5 | labels: 'enhancement'
6 | assignees: ''
7 |
8 | ---
9 |
10 | **Is your feature request related to a problem? Please describe.**
11 | A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]
12 |
13 | **Describe the solution you'd like**
14 | A clear and concise description of what you want to happen.
15 |
16 | **Technical Details**
17 | - Proposed API changes (if any)
18 | - Impact on existing functionality
19 | - Required dependencies or new packages
20 |
21 | **Describe alternatives you've considered**
22 | A clear and concise description of any alternative solutions or features you've considered.
23 |
24 | **Additional context**
25 | Add any other context about the feature request here, such as:
26 | - Use cases or scenarios
27 | - Related issues or pull requests
28 | - Implementation considerations
29 |
```
--------------------------------------------------------------------------------
/.github/ISSUE_TEMPLATE/bug_report.md:
--------------------------------------------------------------------------------
```markdown
1 | ---
2 | name: Bug report
3 | about: Report a bug or unexpected behavior in the agent toolkit
4 | title: '[BUG] '
5 | labels: 'bug'
6 | assignees: ''
7 |
8 | ---
9 |
10 | **Describe the bug**
11 | A clear and concise description of what the bug is.
12 |
13 | **To Reproduce**
14 | Steps to reproduce the behavior:
15 | 1. Python version and environment details
16 | 2. Command or code that triggered the bug
17 | 3. Expected output vs actual output
18 |
19 | **Environment Information**
20 | - Python version: [e.g. 3.9.0]
21 | - OS: [e.g. macOS, Linux, Windows]
22 | - Package versions: [e.g. python-dotenv==1.0.0, pydantic==2.0.0]
23 |
24 | **Error Message**
25 | ```
26 | Paste any error messages or stack traces here
27 | ```
28 |
29 | **Expected behavior**
30 | A clear and concise description of what you expected to happen.
31 |
32 | **Additional context**
33 | Add any other context about the problem here, such as:
34 | - Related configuration files
35 | - Relevant environment variables
36 | - Any workarounds you've tried
37 |
```
--------------------------------------------------------------------------------
/modelcontextprotocol/Dockerfile:
--------------------------------------------------------------------------------
```dockerfile
1 | # Use a Python image with uv pre-installed
2 | FROM ghcr.io/astral-sh/uv:python3.12-bookworm-slim AS builder
3 |
4 | # Set environment variables for build
5 | ENV PYTHONDONTWRITEBYTECODE=1 \
6 | PYTHONUNBUFFERED=1 \
7 | PIP_NO_CACHE_DIR=1
8 |
9 | # Install the project into `/app`
10 | WORKDIR /app
11 |
12 | ADD . /app
13 |
14 | # Create a virtual environment and install dependencies
15 | RUN python -m venv /app/.venv
16 | ENV PATH="/app/.venv/bin:$PATH"
17 | RUN uv sync --no-cache-dir --no-dev --python /app/.venv/bin/python
18 |
19 | FROM python:3.12-slim-bookworm AS runtime
20 |
21 | RUN groupadd -r appuser && useradd -r -g appuser -m -d /home/appuser appuser
22 |
23 | WORKDIR /appuser
24 |
25 | COPY --from=builder --chown=appuser:appuser /app /appuser
26 |
27 | # Set the PATH to use the virtual environment
28 | ENV PATH="/appuser/.venv/bin:$PATH"
29 |
30 | ENV MCP_TRANSPORT="stdio"
31 | ENV MCP_HOST="0.0.0.0"
32 | ENV MCP_PORT="8000"
33 | ENV MCP_PATH="/"
34 |
35 | USER appuser
36 |
37 | ENTRYPOINT exec python server.py --transport "$MCP_TRANSPORT" --host "$MCP_HOST" --port "$MCP_PORT" --path "$MCP_PATH"
38 |
```
--------------------------------------------------------------------------------
/modelcontextprotocol/client.py:
--------------------------------------------------------------------------------
```python
1 | """Client factory for Atlan."""
2 |
3 | import logging
4 | from typing import Optional
5 |
6 | from pyatlan.client.atlan import AtlanClient
7 | from settings import get_settings
8 |
9 | logger = logging.getLogger(__name__)
10 |
11 | _client_instance: Optional[AtlanClient] = None
12 |
13 |
14 | def get_atlan_client() -> AtlanClient:
15 | """
16 | Get the singleton AtlanClient instance for connection reuse.
17 |
18 | Returns:
19 | AtlanClient: The singleton AtlanClient instance.
20 |
21 | Raises:
22 | Exception: If client creation fails.
23 | """
24 | global _client_instance
25 |
26 | if _client_instance is None:
27 | settings = get_settings()
28 | try:
29 | _client_instance = AtlanClient(
30 | base_url=settings.ATLAN_BASE_URL, api_key=settings.ATLAN_API_KEY
31 | )
32 | _client_instance.update_headers(settings.headers)
33 | logger.info("AtlanClient initialized successfully")
34 | except Exception:
35 | logger.error("Failed to create Atlan client", exc_info=True)
36 | raise
37 |
38 | return _client_instance
39 |
```
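A minimal usage sketch for this factory, assuming `ATLAN_BASE_URL` and `ATLAN_API_KEY` are set in the environment:

```python
from client import get_atlan_client

client = get_atlan_client()        # first call creates and caches the client
same_client = get_atlan_client()   # subsequent calls reuse the same instance
assert client is same_client
```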
--------------------------------------------------------------------------------
/.github/workflows/checks.yml:
--------------------------------------------------------------------------------
```yaml
1 | # Pre-commit checks. This workflow can be reused across all the applications.
2 |
3 | name: Pre-commit Checks
4 | on:
5 | workflow_call:
6 | pull_request:
7 | types: [ opened, synchronize, labeled, reopened ]
8 | branches: "main"
9 |
10 | jobs:
11 | pre-commit:
12 | concurrency:
13 | group: ${{ github.workflow }}-${{ github.ref }}
14 | cancel-in-progress: ${{ startsWith(github.ref, 'refs/pull/') }}
15 | runs-on: ubuntu-latest
16 | timeout-minutes: 10
17 | steps:
18 | - uses: actions/checkout@v4
19 | #----------------------------------------------
20 | # ----- install & configure Python + UV -----
21 | #----------------------------------------------
22 | - uses: actions/setup-python@v5
23 | with:
24 | python-version: '3.11'
25 | - name: Install UV
26 | uses: astral-sh/setup-uv@v5
27 | #----------------------------------------------
28 | # ----- install dependencies & run pre-commit -----
29 | #----------------------------------------------
30 | - name: Install dependencies and run pre-commit
31 | run: |
32 | uv pip install --system pre-commit
33 | # Run pre-commit directly
34 | pre-commit run --all-files
35 |
```
--------------------------------------------------------------------------------
/modelcontextprotocol/pyproject.toml:
--------------------------------------------------------------------------------
```toml
1 | [project]
2 | name = "atlan-mcp-server"
3 | dynamic = ["version"]
4 | description = "Atlan Model Context Protocol server for interacting with Atlan services"
5 | readme = "README.md"
6 | requires-python = ">=3.11"
7 | license = { text = "MIT" }
8 | authors = [
9 | {name = "AtlanHQ", email = "[email protected]"}
10 | ]
11 | classifiers = [
12 | "Programming Language :: Python :: 3",
13 | "Programming Language :: Python :: 3.11",
14 | "License :: OSI Approved :: MIT License",
15 | "Operating System :: OS Independent",
16 | ]
17 |
18 | dependencies = [
19 | "fastmcp==2.13.2",
20 | "pyatlan>=6.0.1",
21 | "uvicorn>=0.35.0"
22 | ]
23 |
24 | [project.scripts]
25 | atlan-mcp-server = "server:main"
26 |
27 | [project.urls]
28 | "Homepage" = "https://github.com/atlanhq/agent-toolkit"
29 | "Documentation" = "https://ask.atlan.com/hc/en-us/articles/12525731740175-How-to-implement-the-Atlan-MCP-server"
30 | "Bug Tracker" = "https://github.com/atlanhq/agent-toolkit/issues"
31 | "Source" = "https://github.com/atlanhq/agent-toolkit.git"
32 | "Changelog" = "https://github.com/atlanhq/agent-toolkit/blob/main/CHANGELOG.md"
33 |
34 | [tool.hatch.version]
35 | path = "version.py"
36 |
37 | [tool.hatch.build.targets.wheel]
38 | packages = ["."]
39 |
40 | [build-system]
41 | requires = ["hatchling"]
42 | build-backend = "hatchling.build"
43 |
```
--------------------------------------------------------------------------------
/modelcontextprotocol/settings.py:
--------------------------------------------------------------------------------
```python
1 | """Configuration settings for the application."""
2 |
3 | from typing import Optional
4 | from pydantic_settings import BaseSettings
5 | from version import __version__ as MCP_VERSION
6 |
7 |
8 | class Settings(BaseSettings):
9 | """Application settings loaded from environment variables or .env file."""
10 |
11 | ATLAN_BASE_URL: str
12 | ATLAN_API_KEY: str
13 | ATLAN_AGENT_ID: str = "NA"
14 | ATLAN_AGENT: str = "atlan-mcp"
15 | ATLAN_MCP_USER_AGENT: str = f"Atlan MCP Server {MCP_VERSION}"
16 | MCP_TRANSPORT: str = "stdio"
17 | MCP_HOST: str = "0.0.0.0"
18 | MCP_PORT: int = 8000
19 | MCP_PATH: str = "/"
20 |
21 | @property
22 | def headers(self) -> dict:
23 | """Get the headers for API requests."""
24 | return {
25 | "User-Agent": self.ATLAN_MCP_USER_AGENT,
26 | "X-Atlan-Agent": self.ATLAN_AGENT,
27 | "X-Atlan-Agent-Id": self.ATLAN_AGENT_ID,
28 | "X-Atlan-Client-Origin": self.ATLAN_AGENT,
29 | }
30 |
31 | class Config:
32 | env_file = ".env"
33 | env_file_encoding = "utf-8"
34 | extra = "allow"
35 | # Allow case-insensitive environment variables
36 | case_sensitive = False
37 |
38 |
39 | _settings: Optional[Settings] = None
40 |
41 |
42 | def get_settings() -> Settings:
43 | """
44 | Get the singleton Settings instance.
45 | Loads settings once from environment/file and reuses the instance.
46 |
47 | Returns:
48 | Settings: The singleton settings instance
49 | """
50 | global _settings
51 | if _settings is None:
52 | _settings = Settings()
53 | return _settings
54 |
```
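A minimal sketch of how these settings resolve at runtime, assuming the two required variables (`ATLAN_BASE_URL`, `ATLAN_API_KEY`) are provided via the environment or a `.env` file:

```python
from settings import get_settings

settings = get_settings()              # loaded once, then cached as a singleton
print(settings.MCP_TRANSPORT)          # "stdio" unless overridden
print(settings.headers["User-Agent"])  # "Atlan MCP Server 0.3.0" per version.py
```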
--------------------------------------------------------------------------------
/modelcontextprotocol/tools/__init__.py:
--------------------------------------------------------------------------------
```python
1 | from .search import search_assets
2 | from .dsl import get_assets_by_dsl
3 | from .lineage import traverse_lineage
4 | from .assets import update_assets
5 | from .query import query_asset
6 | from .dq_rules import (
7 | create_dq_rules,
8 | schedule_dq_rules,
9 | delete_dq_rules,
10 | update_dq_rules,
11 | )
12 | from .glossary import (
13 | create_glossary_category_assets,
14 | create_glossary_assets,
15 | create_glossary_term_assets,
16 | )
17 | from .domain import create_data_domain_assets, create_data_product_assets
18 | from .models import (
19 | CertificateStatus,
20 | UpdatableAttribute,
21 | UpdatableAsset,
22 | TermOperations,
23 | Glossary,
24 | GlossaryCategory,
25 | GlossaryTerm,
26 | DQRuleType,
27 | DQAssetType,
28 | DQRuleSpecification,
29 | DQRuleScheduleSpecification,
30 | DQRuleScheduleResponse,
31 | ScheduledAssetInfo,
32 | DQRuleInfo,
33 | DQRuleDeleteResponse,
34 | )
35 |
36 | __all__ = [
37 | "search_assets",
38 | "get_assets_by_dsl",
39 | "traverse_lineage",
40 | "update_assets",
41 | "query_asset",
42 | "create_glossary_category_assets",
43 | "create_glossary_assets",
44 | "create_glossary_term_assets",
45 | "create_data_domain_assets",
46 | "create_data_product_assets",
47 | "CertificateStatus",
48 | "UpdatableAttribute",
49 | "UpdatableAsset",
50 | "TermOperations",
51 | "Glossary",
52 | "GlossaryCategory",
53 | "GlossaryTerm",
54 | "create_dq_rules",
55 | "schedule_dq_rules",
56 | "delete_dq_rules",
57 | "update_dq_rules",
58 | "DQRuleType",
59 | "DQAssetType",
60 | "DQRuleSpecification",
61 | "DQRuleScheduleSpecification",
62 | "DQRuleScheduleResponse",
63 | "ScheduledAssetInfo",
64 | "DQRuleInfo",
65 | "DQRuleDeleteResponse",
66 | ]
67 |
```
--------------------------------------------------------------------------------
/modelcontextprotocol/utils/parameters.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Parameter parsing and validation utilities for MCP tools.
3 |
4 | This module provides reusable functions for parsing and validating
5 | parameters that are commonly used across different MCP tools.
6 | """
7 |
8 | import json
9 | import logging
10 | from typing import Any, List, Optional, Union
11 |
12 | logger = logging.getLogger(__name__)
13 |
14 |
15 | def parse_json_parameter(param: Any) -> Union[dict, list, None]:
16 | """
17 | Parse a parameter that might be a JSON string.
18 |
19 | Args:
20 | param: The parameter value to parse (could be string, dict, list, etc.)
21 |
22 | Returns:
23 | The parsed parameter value
24 |
25 | Raises:
26 | json.JSONDecodeError: If the JSON string is invalid
27 | """
28 | if param is None:
29 | return None
30 |
31 | if isinstance(param, str):
32 | try:
33 | return json.loads(param)
34 | except json.JSONDecodeError as e:
35 | logger.error(f"Invalid JSON parameter: {param}")
36 | raise e
37 |
38 | return param
39 |
40 |
41 | def parse_list_parameter(param: Any) -> Optional[List[Any]]:
42 | """
43 | Parse a parameter that might be a JSON string representing a list.
44 |
45 | Args:
46 | param: The parameter value to parse
47 |
48 | Returns:
49 | The parsed list, None if param is None, or original value converted to list if needed
50 |
51 | Raises:
52 | json.JSONDecodeError: If the JSON string is invalid
53 | """
54 | if param is None:
55 | return None
56 |
57 | if isinstance(param, str):
58 | try:
59 | parsed = json.loads(param)
60 | except json.JSONDecodeError as e:
61 | logger.error(f"Invalid JSON parameter: {param}")
62 | raise e
63 |
64 | if isinstance(parsed, list):
65 | return parsed
66 | return [parsed]
67 |
68 | if isinstance(param, list):
69 | return param
70 |
71 | return [param]
72 |
```
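For reference, a few illustrative calls showing how these parsers behave on the inputs they document:

```python
from utils.parameters import parse_json_parameter, parse_list_parameter

parse_json_parameter('{"certificate_status": "VERIFIED"}')  # -> dict
parse_json_parameter(None)                                  # -> None

parse_list_parameter('["name", "description"]')  # -> ["name", "description"]
parse_list_parameter('"name"')                   # JSON scalar -> ["name"]
parse_list_parameter(["name"])                   # already a list -> ["name"]
parse_list_parameter(42)                         # plain scalar -> [42]
```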
--------------------------------------------------------------------------------
/modelcontextprotocol/docs/LOCAL_BUILD.md:
--------------------------------------------------------------------------------
```markdown
1 | # Local Build
2 |
3 | 1. Clone the repository:
4 | ```bash
5 | git clone https://github.com/atlanhq/agent-toolkit.git
6 | cd agent-toolkit
7 | ```
8 |
9 | 2. Install UV package manager:
10 | For macOS:
11 | ```bash
12 | # Using Homebrew
13 | brew install uv
14 | ```
15 |
16 | For more installation options and detailed instructions, refer to the [official UV documentation](https://docs.astral.sh/uv/getting-started/installation/).
17 |
18 | 3. Install dependencies:
19 | > Python version should be >= 3.11
20 | ```bash
21 | cd modelcontextprotocol
22 | uv sync
23 | ```
24 |
25 | 4. Configure Atlan credentials:
26 |
27 | a. Using a .env file:
28 | Create a `.env` file in the root directory (or copy the `.env.template` file and rename it to `.env`) with the following content:
29 | ```
30 | ATLAN_BASE_URL=https://your-instance.atlan.com
31 | ATLAN_API_KEY=your_api_key
32 | ATLAN_AGENT_ID=your_agent_id
33 | ```
34 |
35 | **Note: `ATLAN_AGENT_ID` is optional but recommended. It is used to identify which agent is making requests in the Atlan UI.**
36 |
37 | To generate the API key, refer to the [Atlan documentation](https://ask.atlan.com/hc/en-us/articles/8312649180049-API-authentication).
38 |
39 | 5. Run the server:
40 | ```bash
41 | uv run .venv/bin/atlan-mcp-server
42 | ```
43 |
44 | 6. (For debugging) Run the server with MCP inspector:
45 | ```bash
46 | uv run mcp dev server.py
47 | ```
48 | 7. Integrate local MCP changes with Claude Desktop (for E2E testing):
49 | When Claude Desktop is integrated with Atlan MCP, it spawns its own instance of the MCP server.
50 | Update the Claude Desktop config as below to use your local code changes for end-to-end testing:
51 | ```json
52 | {
53 | "mcpServers": {
54 | "atlan-local": {
55 | "command": "uv",
56 | "args": [
57 | "run",
58 | "/path/to/agent-toolkit/modelcontextprotocol/.venv/bin/atlan-mcp-server"
59 | ],
60 | "cwd": "/path/to/agent-toolkit/modelcontextprotocol",
61 | "env": {
62 | "ATLAN_API_KEY": "your_api_key",
63 | "ATLAN_BASE_URL": "https://your-instance.atlan.com",
64 | "ATLAN_AGENT_ID": "your_agent_id"
65 | }
66 | }
67 | }
68 | }
69 | ```
70 |
```
--------------------------------------------------------------------------------
/modelcontextprotocol/utils/assets.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Asset utilities for the Atlan MCP server.
3 |
4 | This module provides reusable functions for asset operations
5 | that are commonly used across different MCP tools.
6 | """
7 |
8 | import logging
9 | from typing import Any, Dict, List
10 |
11 | from pyatlan.model.assets import Asset
12 |
13 | from client import get_atlan_client
14 |
15 | logger = logging.getLogger(__name__)
16 |
17 |
18 | def save_assets(assets: List[Asset]) -> List[Dict[str, Any]]:
19 | """
20 | Common bulk save and response processing for any asset type.
21 |
22 | Args:
23 | assets (List[Asset]): List of Asset objects to save.
24 |
25 | Returns:
26 | List[Dict[str, Any]]: List of dictionaries with details for each created
27 | or updated asset.
28 |
29 | Raises:
30 | Exception: If there's an error saving the assets.
31 | """
32 | logger.info("Starting bulk save operation")
33 | client = get_atlan_client()
34 | try:
35 | response = client.asset.save(assets)
36 | except Exception as e:
37 | logger.error(f"Error saving assets: {e}")
38 | raise
39 |
40 | created_assets = response.mutated_entities.CREATE or []
41 | updated_assets = response.mutated_entities.UPDATE or []
42 |
43 | logger.info(
44 | f"Save operation completed: {len(created_assets)} created, "
45 | f"{len(updated_assets)} updated"
46 | )
47 |
48 | results = []
49 |
50 | # Process created assets
51 | for asset in created_assets:
52 | results.append(
53 | {
54 | "guid": asset.guid,
55 | "name": asset.name,
56 | "qualified_name": asset.qualified_name,
57 | "operation": "CREATE",
58 | }
59 | )
60 |
61 | # Process updated assets
62 | for asset in updated_assets:
63 | results.append(
64 | {
65 | "guid": asset.guid,
66 | "name": asset.name,
67 | "qualified_name": asset.qualified_name,
68 | "operation": "UPDATE",
69 | }
70 | )
71 |
72 | logger.info(f"Bulk save completed successfully for {len(results)} assets")
73 | return results
74 |
```
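A hedged usage sketch: assuming valid Atlan credentials and pyatlan's `AtlasGlossary.creator()` helper, `save_assets` can persist a new glossary like this:

```python
from pyatlan.model.assets import AtlasGlossary

from utils.assets import save_assets

# Build an unsaved glossary asset, then bulk-save it through the shared client.
glossary = AtlasGlossary.creator(name="Example Glossary")
results = save_assets([glossary])
# e.g. [{"guid": "...", "name": "Example Glossary",
#        "qualified_name": "...", "operation": "CREATE"}]
```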
--------------------------------------------------------------------------------
/modelcontextprotocol/tools/dsl.py:
--------------------------------------------------------------------------------
```python
1 | import logging
2 | import json
3 | from typing import Dict, Any, Union
4 |
5 | from client import get_atlan_client
6 | from pyatlan.model.search import DSL, IndexSearchRequest
7 | from utils.search import SearchUtils
8 |
9 | # Configure logging
10 | logger = logging.getLogger(__name__)
11 |
12 |
13 | def get_assets_by_dsl(dsl_query: Union[str, Dict[str, Any]]) -> Dict[str, Any]:
14 | """
15 | Execute the search with the given query
16 | Args:
17 | dsl_query (Union[str, Dict[str, Any]]): The DSL query as either a string or dictionary
18 | Returns:
19 | Dict[str, Any]: A dictionary containing the results and aggregations
20 | """
21 | logger.info("Starting DSL-based asset search")
22 | try:
23 | # Parse string to dict if needed
24 | if isinstance(dsl_query, str):
25 | logger.debug("Converting DSL string to JSON")
26 | try:
27 | dsl_dict = json.loads(dsl_query)
28 | except json.JSONDecodeError as e:
29 | logger.error(f"Invalid JSON in DSL query: {e}")
30 | return {
31 | "results": [],
32 | "aggregations": {},
33 | "error": "Invalid JSON in DSL query",
34 | }
35 | else:
36 | logger.debug("Using provided DSL dictionary")
37 | dsl_dict = dsl_query
38 |
39 | logger.debug("Creating IndexSearchRequest")
40 | index_request = IndexSearchRequest(
41 | dsl=DSL(**dsl_dict),
42 | suppress_logs=True,
43 | show_search_score=True,
44 | exclude_meanings=False,
45 | exclude_atlan_tags=False,
46 | )
47 |
48 | logger.info("Executing DSL search request")
49 | client = get_atlan_client()
50 | search_response = client.asset.search(index_request)
51 | processed_results = SearchUtils.process_results(search_response)
52 | return processed_results
53 | except Exception as e:
54 | logger.error(f"Error in DSL search: {str(e)}")
55 | return {"results": [], "aggregations": {}, "error": str(e)}
56 |
```
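An illustrative call, assuming a standard Elasticsearch-style DSL body; the `__state` and `__typeName.keyword` fields below are examples, not a guaranteed schema:

```python
from tools.dsl import get_assets_by_dsl

# Find up to five active Table assets.
dsl = {
    "from": 0,
    "size": 5,
    "query": {
        "bool": {
            "must": [
                {"term": {"__state": "ACTIVE"}},
                {"term": {"__typeName.keyword": "Table"}},
            ]
        }
    },
}

response = get_assets_by_dsl(dsl)  # the same query as a JSON string also works
tables = response["results"]
```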
--------------------------------------------------------------------------------
/modelcontextprotocol/docs/DEPLOYMENT.md:
--------------------------------------------------------------------------------
```markdown
1 | # Atlan MCP Server Deployment Guide
2 |
3 | This guide covers transport modes and basic deployment options for the Atlan MCP Server.
4 |
5 | ## Transport Modes
6 |
7 | The Atlan MCP Server supports three transport modes. For more details about MCP transport modes, see the [official MCP documentation](https://modelcontextprotocol.io/specification/2025-06-18/basic/transports).
8 |
9 | | Transport Mode | Use Case | Benefits | When to Use |
10 | |---|---|---|---|
11 | | **stdio** (Default) | Local development, IDE integrations | Simple, direct communication | Claude Desktop, Cursor IDE |
12 | | **SSE** (Server-Sent Events) | Remote deployments, web browsers | Real-time streaming, web-compatible | Cloud deployments, web clients |
13 | | **streamable-http** | HTTP-based remote connections | Standard HTTP, load balancer friendly | Kubernetes, containerized deployments |
14 |
15 | ## Basic Deployment Examples
16 |
17 | ### Local Development (stdio)
18 | ```bash
19 | # Default stdio mode
20 | python server.py
21 |
22 | # Or explicitly specify stdio
23 | python server.py --transport stdio
24 | ```
25 |
26 | ### Cloud Deployment (SSE)
27 | ```bash
28 | # Docker with SSE
29 | docker run -d \
30 | -p 8000:8000 \
31 | -e ATLAN_API_KEY="<YOUR_API_KEY>" \
32 | -e ATLAN_BASE_URL="https://<YOUR_INSTANCE>.atlan.com" \
33 | -e MCP_TRANSPORT="sse" \
34 | ghcr.io/atlanhq/atlan-mcp-server:latest
35 |
36 | # Python with SSE
37 | python server.py --transport sse --host 0.0.0.0 --port 8000
38 | ```
39 |
40 | ### HTTP Deployment (streamable-http)
41 | ```bash
42 | # Docker with HTTP
43 | docker run -d \
44 | -p 8000:8000 \
45 | -e ATLAN_API_KEY="<YOUR_API_KEY>" \
46 | -e ATLAN_BASE_URL="https://<YOUR_INSTANCE>.atlan.com" \
47 | -e MCP_TRANSPORT="streamable-http" \
48 | ghcr.io/atlanhq/atlan-mcp-server:latest
49 |
50 | # Python with HTTP
51 | python server.py --transport streamable-http --host 0.0.0.0 --port 8000
52 | ```
53 |
54 | ## Environment Variables
55 |
56 | ### Required
57 | - `ATLAN_API_KEY`: Your Atlan API key
58 | - `ATLAN_BASE_URL`: Your Atlan instance URL
59 |
60 | ### Transport Configuration
61 | - `MCP_TRANSPORT`: Transport mode (stdio/sse/streamable-http)
62 | - `MCP_HOST`: Host address for network transports (default: 0.0.0.0)
63 | - `MCP_PORT`: Port number for network transports (default: 8000)
64 | - `MCP_PATH`: Path for streamable-http transport (default: /)
65 |
66 | ### Optional
67 | - `ATLAN_AGENT_ID`: Agent identifier
68 | - `RESTRICTED_TOOLS`: Comma-separated list of tools to restrict
69 |
70 | For additional support, refer to the main [README](../README.md) or contact [email protected].
71 |
```
--------------------------------------------------------------------------------
/modelcontextprotocol/tools/query.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Query tool for executing SQL queries on table/view assets.
3 |
4 | This module provides functionality to execute SQL queries on data sources
5 | using the Atlan client.
6 | """
7 |
8 | import logging
9 | from typing import Dict, Any, Optional
10 |
11 | from client import get_atlan_client
12 | from pyatlan.model.query import QueryRequest
13 |
14 | # Configure logging
15 | logger = logging.getLogger(__name__)
16 |
17 |
18 | def query_asset(
19 | sql: str,
20 | connection_qualified_name: str,
21 | default_schema: Optional[str] = None,
22 | ) -> Dict[str, Any]:
23 | """
24 | Execute a SQL query on a table/view asset.
25 |
26 | Note:
27 | Use read-only queries to retrieve data.
28 | Please add reasonable LIMIT clauses to your SQL queries to avoid
29 | overwhelming the client or causing timeouts. Large result sets can
30 | cause performance issues or crash the client application.
31 |
32 | Args:
33 | sql (str): The SQL query to execute (read-only queries)
34 | connection_qualified_name (str): Connection qualified name to use for the query
35 | (e.g., "default/snowflake/1705755637")
36 | default_schema (str, optional): Default schema name to use for unqualified
37 | objects in the SQL, in the form "DB.SCHEMA"
38 | (e.g., "RAW.WIDEWORLDIMPORTERS_WAREHOUSE")
39 |
40 | Returns:
41 | Dict[str, Any]: Dictionary containing:
42 | - success: Boolean indicating if the query was successful
43 | - data: Query result data (rows, columns) if successful
44 | - error: Error message if query failed
45 | - query_info: Additional query execution information
46 |
47 | Raises:
48 | Exception: If there's an error executing the query
49 | """
50 | logger.info(
51 | f"Starting SQL query execution on connection: {connection_qualified_name}"
52 | )
53 | logger.debug(f"SQL query: {sql}")
54 | logger.debug(f"Parameters - default_schema: {default_schema}")
55 |
56 | try:
57 | # Validate required parameters
58 | if not sql or not sql.strip():
59 | error_msg = "SQL query cannot be empty"
60 | logger.error(error_msg)
61 | return {
62 | "success": False,
63 | "data": None,
64 | "error": error_msg,
65 | "query_info": {},
66 | }
67 |
68 | if not connection_qualified_name or not connection_qualified_name.strip():
69 | error_msg = "Connection qualified name cannot be empty"
70 | logger.error(error_msg)
71 | return {
72 | "success": False,
73 | "data": None,
74 | "error": error_msg,
75 | "query_info": {},
76 | }
77 |
78 | # Get Atlan client
79 | logger.debug("Getting Atlan client")
80 | client = get_atlan_client()
81 |
82 | # Build query request
83 | logger.debug("Building QueryRequest object")
84 | query_request = QueryRequest(
85 | sql=sql,
86 | data_source_name=connection_qualified_name,
87 | default_schema=default_schema,
88 | )
89 |
90 | # Execute query
91 | logger.info("Executing SQL query")
92 | query_response = client.queries.stream(request=query_request)
93 |
94 | logger.info("Query executed successfully, returning response")
95 |
96 | return {
97 | "success": True,
98 | "data": query_response,
99 | "error": None,
100 | "query_info": {
101 | "data_source": connection_qualified_name,
102 | "default_schema": default_schema,
103 | "sql": sql,
104 | },
105 | }
106 |
107 | except Exception as e:
108 | error_msg = f"Error executing SQL query: {str(e)}"
109 | logger.error(error_msg)
110 | logger.exception("Exception details:")
111 |
112 | return {
113 | "success": False,
114 | "data": None,
115 | "error": error_msg,
116 | "query_info": {
117 | "data_source": connection_qualified_name,
118 | "default_schema": default_schema,
119 | "sql": sql,
120 | },
121 | }
122 |
```
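A hypothetical invocation of `query_asset`, with the connection and schema values borrowed from the docstring examples (substitute your own) and an explicit `LIMIT` as the docstring advises:

```python
from tools.query import query_asset

result = query_asset(
    sql="SELECT id, name FROM customers LIMIT 100",  # keep result sets small
    connection_qualified_name="default/snowflake/1705755637",  # placeholder
    default_schema="RAW.WIDEWORLDIMPORTERS_WAREHOUSE",  # placeholder
)

if result["success"]:
    print(result["data"])
else:
    print(f"Query failed: {result['error']}")
```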
--------------------------------------------------------------------------------
/modelcontextprotocol/tools/lineage.py:
--------------------------------------------------------------------------------
```python
1 | import logging
2 | from typing import Dict, Any, List, Optional, Union
3 |
4 | from client import get_atlan_client
5 | from pyatlan.model.enums import LineageDirection
6 | from pyatlan.model.lineage import FluentLineage
7 | from pyatlan.model.fields.atlan_fields import AtlanField
8 | from utils.search import SearchUtils
9 | from utils.constants import DEFAULT_SEARCH_ATTRIBUTES
10 |
11 | # Configure logging
12 | logger = logging.getLogger(__name__)
13 |
14 |
15 | def traverse_lineage(
16 | guid: str,
17 | direction: LineageDirection,
18 | depth: int = 1000000,
19 | size: int = 10,
20 | immediate_neighbors: bool = False,
21 | include_attributes: Optional[List[Union[str, AtlanField]]] = None,
22 | ) -> Dict[str, Any]:
23 | """
24 | Traverse asset lineage in specified direction.
25 |
26 | By default, essential attributes used in search operations are included.
27 | Additional attributes can be specified via include_attributes parameter.
28 |
29 | Args:
30 | guid (str): GUID of the starting asset
31 | direction (LineageDirection): Direction to traverse (UPSTREAM or DOWNSTREAM)
32 | depth (int, optional): Maximum depth to traverse. Defaults to 1000000.
33 | size (int, optional): Maximum number of results to return. Defaults to 10.
34 | immediate_neighbors (bool, optional): Only return immediate neighbors. Defaults to False.
35 | include_attributes (Optional[List[Union[str, AtlanField]]], optional): List of additional
36 | attributes to include in results. Can be string attribute names or AtlanField objects.
37 | These will be added to the default set. Defaults to None.
38 |
39 | Returns:
40 | Dict[str, Any]: Dictionary containing:
41 | - assets: List of assets in the lineage with processed attributes
42 | - error: None if no error occurred, otherwise the error message
43 |
44 |     Note:
45 |         Exceptions are caught and returned via the "error" key rather than raised.
46 |     """
47 | logger.info(
48 | f"Starting lineage traversal from {guid} in direction {direction}, "
49 | f"depth={depth}, size={size}, immediate_neighbors={immediate_neighbors}"
50 | )
51 | logger.debug(f"Include attributes parameter: {include_attributes}")
52 |
53 | try:
54 | # Initialize base request
55 | logger.debug("Initializing FluentLineage object")
56 | lineage_builder = (
57 | FluentLineage(starting_guid=guid)
58 | .direction(direction)
59 | .depth(depth)
60 | .size(size)
61 | .immediate_neighbors(immediate_neighbors)
62 | )
63 |
64 | # Prepare attributes to include: default attributes + additional user-specified attributes
65 | all_attributes = DEFAULT_SEARCH_ATTRIBUTES.copy()
66 |
67 | if include_attributes:
68 | logger.debug(f"Adding user-specified attributes: {include_attributes}")
69 | for attr in include_attributes:
70 | if isinstance(attr, str) and attr not in all_attributes:
71 | all_attributes.append(attr)
72 |
73 | logger.debug(f"Total attributes to include: {all_attributes}")
74 |
75 | # Include all string attributes in results
76 | for attr_name in all_attributes:
77 | attr_obj = SearchUtils._get_asset_attribute(attr_name)
78 | if attr_obj is None:
79 | logger.warning(
80 | f"Unknown attribute for inclusion: {attr_name}, skipping"
81 | )
82 | continue
83 | logger.debug(f"Including attribute: {attr_name}")
84 | lineage_builder = lineage_builder.include_on_results(attr_obj)
85 |
86 | # Execute request
87 | logger.debug("Converting FluentLineage to request object")
88 | request = lineage_builder.request
89 |
90 | logger.info("Executing lineage request")
91 | client = get_atlan_client()
92 | response = client.asset.get_lineage_list(request)
93 |
94 | # Process results using same pattern as search
95 | logger.info("Processing lineage results")
96 | if response is None:
97 | logger.info("No lineage results found")
98 | return {"assets": [], "error": None}
99 |
100 | # Convert results to list and process using Pydantic serialization
101 | results_list = [
102 | result.dict(by_alias=True, exclude_unset=True)
103 | for result in response
104 | if result is not None
105 | ]
106 |
107 | logger.info(
108 | f"Lineage traversal completed, returned {len(results_list)} results"
109 | )
110 | return {"assets": results_list, "error": None}
111 |
112 | except Exception as e:
113 | logger.error(f"Error traversing lineage: {str(e)}")
114 | return {"assets": [], "error": str(e)}
115 |
```
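A hypothetical call to `traverse_lineage`; the GUID is a placeholder, and the printed keys assume pyatlan's camelCase aliases, since results are serialized with `dict(by_alias=True)`:

```python
from pyatlan.model.enums import LineageDirection

from tools.lineage import traverse_lineage

result = traverse_lineage(
    guid="b4113341-0000-0000-0000-000000000000",  # placeholder asset GUID
    direction=LineageDirection.DOWNSTREAM,
    size=5,
    include_attributes=["source_url"],  # merged on top of the default attribute set
)

if result["error"] is None:
    for asset in result["assets"]:
        print(asset.get("guid"), asset.get("typeName"))
```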
--------------------------------------------------------------------------------
/modelcontextprotocol/middleware.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Tool restriction middleware for FastMCP to control tool access.
3 |
4 | This middleware restricts access to specified tools based on configuration.
5 | Tools can be restricted globally by providing a list during initialization.
6 | """
7 |
8 | from typing import List, Set, Optional
9 | from fastmcp.server.middleware import Middleware, MiddlewareContext
10 | from fastmcp.exceptions import ToolError
11 | import logging
12 |
13 | logger = logging.getLogger(__name__)
14 |
15 |
16 | class ToolRestrictionMiddleware(Middleware):
17 | """
18 | Middleware to restrict tool access based on configuration.
19 |
20 | Allows specifying which tools should be restricted during initialization.
21 | Restricted tools will be hidden from the tools list and blocked from execution.
22 | """
23 |
24 | def __init__(self, restricted_tools: Optional[List[str]] = None):
25 | """
26 | Initialize the Tool Restriction Middleware.
27 |
28 | Args:
29 | restricted_tools: List of tool names to restrict. If None, no tools are restricted.
30 | """
31 | self.restricted_tools: Set[str] = set(restricted_tools or [])
32 | self._log_initialization()
33 |
34 | def _log_initialization(self) -> None:
35 | """Log middleware initialization details."""
36 |         logger.info(
37 |             f"Tool Restriction Middleware initialized with {len(self.restricted_tools)} restricted tools",
38 |             extra={"restricted_tools": list(self.restricted_tools)},
39 |         )
40 |
41 | def _is_tool_restricted(self, tool_name: str) -> bool:
42 | """
43 | Check if a tool is restricted.
44 |
45 | Args:
46 | tool_name: Name of the tool being called.
47 |
48 | Returns:
49 | True if the tool is restricted, False otherwise.
50 | """
51 | is_restricted = tool_name in self.restricted_tools
52 |
53 | if is_restricted:
54 |             logger.info(f"Tool {tool_name} is restricted", extra={"tool": tool_name})
55 |
56 | return is_restricted
57 |
58 | def _get_error_message(self, tool_name: str) -> str:
59 | """
60 | Get appropriate error message for a restricted tool.
61 |
62 | Args:
63 | tool_name: Name of the restricted tool.
64 |
65 | Returns:
66 | Error message string.
67 | """
68 | return f"Tool '{tool_name}' is not available due to access restrictions"
69 |
70 | async def on_call_tool(self, context: MiddlewareContext, call_next):
71 | """
72 | Hook called when a tool is being executed.
73 |
74 | Checks if the tool is restricted and either allows execution or raises an error.
75 |
76 | Args:
77 | context: The middleware context containing request information.
78 | call_next: Function to call the next middleware/handler in the chain.
79 |
80 | Returns:
81 | The result from the next handler if allowed.
82 |
83 | Raises:
84 | ToolError: If the tool is restricted.
85 | """
86 | tool_name = context.message.name
87 |
88 | try:
89 | # Check if tool is restricted
90 | if self._is_tool_restricted(tool_name):
91 | error_message = self._get_error_message(tool_name)
92 |
93 |                 logger.warning(
94 |                     f"Tool access denied: {tool_name}",
95 |                     extra={"tool": tool_name,
96 |                            "reason": error_message},
97 |                 )
98 |
99 | raise ToolError(error_message)
100 |
101 | # Tool is allowed, proceed with execution
102 |             logger.debug(f"Tool access granted: {tool_name}", extra={"tool": tool_name})
103 |
104 | return await call_next(context)
105 |
106 | except ToolError:
107 | # Re-raise ToolError as-is
108 | raise
109 | except Exception as e:
110 | # Handle unexpected errors
111 |             logger.error(
112 |                 f"Error in tool restriction middleware: {str(e)}",
113 |                 extra={"tool": tool_name},
114 |                 exc_info=True,
115 |             )
116 | # Re-raise the original exception
117 | raise
118 |
119 | async def on_list_tools(self, context: MiddlewareContext, call_next):
120 | """
121 | Hook called when listing available tools.
122 |
123 | Filters the tool list to hide restricted tools.
124 |
125 | Args:
126 | context: The middleware context.
127 | call_next: Function to call the next handler.
128 |
129 | Returns:
130 | Filtered list of tools.
131 | """
132 | # Get the full list of tools
133 | all_tools = await call_next(context)
134 |
135 | try:
136 | # If no tools are restricted, return all tools
137 | if not self.restricted_tools:
138 | return all_tools
139 |
140 | # Filter out restricted tools
141 | filtered_tools = [
142 | tool for tool in all_tools if tool.name not in self.restricted_tools
143 | ]
144 |
145 |             logger.debug(
146 |                 "Filtered tool list",
147 |                 extra={"total_tools": len(all_tools),
148 |                        "filtered_tools": len(filtered_tools),
149 |                        "restricted_tools": list(self.restricted_tools)},
150 |             )
151 |
152 | return filtered_tools
153 |
154 | except Exception as e:
155 | logger.error(
156 | f"Error filtering tool list: {str(e)}",
157 | exc_info=True,
158 | )
159 | # On error, return the original list to avoid breaking functionality
160 | return all_tools
161 |
```
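A sketch of wiring the middleware into a server, assuming FastMCP's `add_middleware` hook and the comma-separated `RESTRICTED_TOOLS` variable described in the deployment docs (the tool names below are examples):

```python
import os

from fastmcp import FastMCP

from middleware import ToolRestrictionMiddleware

mcp = FastMCP("atlan-mcp-server")

# Parse e.g. RESTRICTED_TOOLS="update_assets_tool,query_tool" into a list
restricted = [
    t.strip() for t in os.environ.get("RESTRICTED_TOOLS", "").split(",") if t.strip()
]
mcp.add_middleware(ToolRestrictionMiddleware(restricted_tools=restricted))
```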
--------------------------------------------------------------------------------
/modelcontextprotocol/tools/domain.py:
--------------------------------------------------------------------------------
```python
1 | from __future__ import annotations
2 |
3 | import logging
4 | from typing import Any, Dict, List, Union
5 |
6 | from pyatlan.model.assets import Asset, DataDomain, DataProduct
7 | from pyatlan.model.fluent_search import CompoundQuery, FluentSearch
8 |
9 | from utils import save_assets
10 | from .models import DataDomainSpec, DataProductSpec
11 |
12 | logger = logging.getLogger(__name__)
13 |
14 |
15 | def create_data_domain_assets(
16 | domains: Union[Dict[str, Any], List[Dict[str, Any]]],
17 | ) -> List[Dict[str, Any]]:
18 | """
19 | Create one or multiple Data Domain or Sub Domain assets in Atlan.
20 |
21 | Args:
22 | domains (Union[Dict[str, Any], List[Dict[str, Any]]]): Either a single domain
23 | specification (dict) or a list of domain specifications. Each specification
24 | can be a dictionary containing:
25 | - name (str): Name of the domain (required)
26 | - parent_domain_qualified_name (str, optional): Qualified name of the parent
27 | domain. If provided, creates a Sub Domain under that parent.
28 | - user_description (str, optional): Detailed description of the domain
29 | - certificate_status (str, optional): Certification status
30 | ("VERIFIED", "DRAFT", or "DEPRECATED")
31 |
32 | Returns:
33 | List[Dict[str, Any]]: List of dictionaries, each with details for a created domain:
34 | - guid: The GUID of the created domain
35 | - name: The name of the domain
36 | - qualified_name: The qualified name of the created domain
37 |
38 | Raises:
39 | Exception: If there's an error creating the domain assets.
40 | """
41 | data = domains if isinstance(domains, list) else [domains]
42 | logger.info(f"Creating {len(data)} data domain asset(s)")
43 | logger.debug(f"Domain specifications: {data}")
44 |
45 | specs = [DataDomainSpec(**item) for item in data]
46 |
47 | assets: List[DataDomain] = []
48 | for spec in specs:
49 | logger.debug(
50 | f"Creating DataDomain: {spec.name}"
51 | + (
52 | f" under {spec.parent_domain_qualified_name}"
53 | if spec.parent_domain_qualified_name
54 | else ""
55 | )
56 | )
57 | domain = DataDomain.creator(
58 | name=spec.name,
59 | parent_domain_qualified_name=spec.parent_domain_qualified_name,
60 | )
61 | domain.user_description = spec.user_description
62 | domain.certificate_status = (
63 | spec.certificate_status.value if spec.certificate_status else None
64 | )
65 |
66 | if spec.certificate_status:
67 | logger.debug(
68 | f"Set certificate status for {spec.name}: {spec.certificate_status.value}"
69 | )
70 |
71 | assets.append(domain)
72 |
73 | return save_assets(assets)
74 |
75 |
76 | def create_data_product_assets(
77 | products: Union[Dict[str, Any], List[Dict[str, Any]]],
78 | ) -> List[Dict[str, Any]]:
79 | """
80 | Create one or multiple Data Product assets in Atlan.
81 |
82 | Args:
83 | products (Union[Dict[str, Any], List[Dict[str, Any]]]): Either a single product
84 | specification (dict) or a list of product specifications. Each specification
85 | can be a dictionary containing:
86 | - name (str): Name of the product (required)
87 | - domain_qualified_name (str): Qualified name of the domain this product
88 | belongs to (required)
89 | - asset_guids (List[str]): List of asset GUIDs to link to this product
90 | (required, at least one)
91 | - user_description (str, optional): Detailed description of the product
92 | - certificate_status (str, optional): Certification status
93 | ("VERIFIED", "DRAFT", or "DEPRECATED")
94 |
95 | Returns:
96 | List[Dict[str, Any]]: List of dictionaries, each with details for a created product:
97 | - guid: The GUID of the created product
98 | - name: The name of the product
99 | - qualified_name: The qualified name of the created product
100 |
101 | Raises:
102 | Exception: If there's an error creating the product assets.
103 | ValueError: If no asset_guids are provided (validated in DataProductSpec model).
104 | """
105 | data = products if isinstance(products, list) else [products]
106 | logger.info(f"Creating {len(data)} data product asset(s)")
107 | logger.debug(f"Product specifications: {data}")
108 |
109 | # Validation for asset_guids is now handled by DataProductSpec model
110 | specs = [DataProductSpec(**item) for item in data]
111 |
112 | assets: List[DataProduct] = []
113 | for spec in specs:
114 | logger.debug(
115 | f"Creating DataProduct: {spec.name} under {spec.domain_qualified_name}"
116 | )
117 | logger.debug(f"Linking {len(spec.asset_guids)} asset(s) to product")
118 |
119 | # Build FluentSearch to select assets by their GUIDs
120 | asset_selection = (
121 | FluentSearch()
122 | .where(CompoundQuery.active_assets())
123 | .where(Asset.GUID.within(spec.asset_guids))
124 | ).to_request()
125 |
126 | product = DataProduct.creator(
127 | name=spec.name,
128 | domain_qualified_name=spec.domain_qualified_name,
129 | asset_selection=asset_selection,
130 | )
131 | product.user_description = spec.user_description
132 | product.certificate_status = (
133 | spec.certificate_status.value if spec.certificate_status else None
134 | )
135 |
136 | if spec.certificate_status:
137 | logger.debug(
138 | f"Set certificate status for {spec.name}: {spec.certificate_status.value}"
139 | )
140 |
141 | assets.append(product)
142 |
143 | return save_assets(assets)
144 |
```
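A hypothetical end-to-end use of the two creators above; the asset GUID is a placeholder, and the product reuses the qualified name returned for the new domain:

```python
from tools.domain import create_data_domain_assets, create_data_product_assets

# A single dict works; a list of dicts creates several domains in one call
domains = create_data_domain_assets(
    {
        "name": "Finance",
        "user_description": "All finance-owned data assets",
        "certificate_status": "DRAFT",
    }
)

products = create_data_product_assets(
    {
        "name": "Revenue Reporting",
        "domain_qualified_name": domains[0]["qualified_name"],
        "asset_guids": ["9f7a3f0e-0000-0000-0000-000000000000"],  # placeholder
    }
)
```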
--------------------------------------------------------------------------------
/CHANGELOG.md:
--------------------------------------------------------------------------------
```markdown
1 | # Changelog
2 |
3 | All notable changes to this project will be documented in this file.
4 |
5 | The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
6 | and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
7 |
8 | ## [0.3.0] - 2025-12-03
9 |
10 | ### Added
11 | - New tools: `create_data_domain_assets`, `create_data_product_assets`,
12 |   and `create_dq_rules`
13 |
14 | ## [0.2.12] - 2025-11-14
15 |
16 | ### Changed
17 | - Added explicit uvicorn>=0.35.0 dependency to ensure compatibility with FastMCP 2.13.0.2
18 |
19 | ## [0.2.11] - 2025-11-07
20 |
21 | ### Changed
22 | - Upgraded FastMCP dependency from 2.11.0 to 2.13.0.2
23 |
24 | ## [0.2.10] - 2025-10-13
25 |
26 | ### Fixed
27 | - Fixed a bug in `update_assets_tool` where glossary terms and categories could not be updated because the glossary GUID parameter was not being sent - now properly includes the glossary GUID as a required parameter for these asset types
28 |
29 | ## [0.2.9] - 2025-09-22
30 |
31 | ### Fixed
32 | - Transport configuration not working when installed via PyPI and executed using `uvx atlan-mcp-server` - server would ignore environment variables and command-line arguments, always defaulting to stdio mode
33 |
34 | ## [0.2.8] - 2025-09-15
35 |
36 | ### Added
37 | - Term linking functionality for improved glossary term relationships (#138)
38 | - Enhanced search tool with popularity attributes context and examples (#132)
39 | - Comprehensive MCP transport mode documentation (#124)
40 |
41 | ### Changed
42 | - Implemented client singleton pattern for improved connection pool reuse and performance (#131)
43 | - Enhanced Docker configuration with improved .dockerignore settings (#127)
44 |
45 | ## [0.2.7] - 2025-09-02
46 |
47 | ### Added
48 | - Configurable tool access control via `RESTRICTED_TOOLS` environment variable
49 | - Organizations can now restrict any combination of tools for MCP clients
50 |
51 |
52 | ## [0.2.6] - 2025-08-19
53 |
54 | ### Added
55 | - Glossary management tools to streamline glossary creation and management:
56 | - `create_glossaries`: Create top-level glossaries with metadata (name, `user_description`, optional `certificate_status`)
57 | - `create_glossary_terms`: Add individual terms to an existing glossary; supports `user_description`, optional `certificate_status`, and `category_guids`
58 | - `create_glossary_categories`: Add categories (and nested subcategories) anchored to a glossary or parent category; supports `user_description` and optional `certificate_status`
59 | - Bulk creation support across glossaries, terms, and categories to enable scalable glossary builds
60 | - Foundation for automated, structured glossary generation from unstructured content
61 |
62 |
63 | ## [0.2.5] - 2025-08-05
64 |
65 | ### Changed
66 | - Enhanced `search_assets_tool` documentation and usage examples for `connection_qualified_name` parameter
67 | - Added `connection_qualified_name` parameter to example function calls for missing descriptions and multiple asset types searches
68 |
69 | ## [0.2.4] - 2025-07-24
70 |
71 | ### Added
72 | - Enhanced lineage traversal tool with configurable attribute inclusion support
73 | - `include_attributes` parameter in `traverse_lineage_tool` allowing users to specify additional attributes beyond defaults
74 | - Default attributes are now always included: name, display_name, description, qualified_name, user_description, certificate_status, owner_users, owner_groups, connector_name, has_lineage, source_created_at, source_updated_at, readme, asset_tags
75 |
76 | ### Changed
77 | - Improved lineage tool return format to standardized dictionary structure with `assets` and `error` keys
78 | - Enhanced lineage processing using Pydantic serialization with `dict(by_alias=True, exclude_unset=True)` for consistent API responses
79 | - Updated `immediate_neighbors` default value from `True` to `False` to align with underlying FluentLineage behavior
80 | - Better error handling and logging throughout lineage traversal operations
81 |
82 | ### Fixed
83 | - Lineage tool now returns richer attribute information instead of just default minimal attributes
84 | - Resolved issue where lineage results only contained basic metadata without requested additional attributes
85 |
86 | ## [0.2.3] - 2025-07-16
87 |
88 | ### Added
89 | - Expanded docstring attributes for LLM context in `server.py` for improved clarity and developer experience
90 |
91 | ### Changed
92 | - Major documentation and README refactoring for easier setup and integration with Claude Desktop and Cursor, including clearer configuration examples and troubleshooting guidance
93 |
94 | ### Fixed
95 | - Made `ATLAN_MCP_USER_AGENT` dynamic in `settings.py` to always reflect the current MCP server version in API requests
96 |
97 | ## [0.2.2] - 2025-06-23
98 |
99 | ### Added
100 | - Multi-architecture build support for Docker images (ARM64 and AMD64)
101 | - README support for asset updates - allows updating asset documentation/readme using markdown content
102 | - Enhanced parameter parsing utilities for better Claude Desktop integration
103 |
104 | ### Fixed
105 | - Search and Update Assets Tool compatibility issues with Claude Desktop
106 | - String input parsing from Claude Desktop for better tool interaction
107 | - Parameter validation and error handling improvements
108 |
109 | ### Changed
110 | - Upgraded FastMCP dependency version for improved performance and stability
111 | - Enhanced tool parameter processing with better error handling
112 | - Improved asset update functionality with support for README content management
113 |
114 | ## [0.2.1] - 2025-05-24
115 |
116 | ### Added
117 | - Advanced search operators support in `search_assets` including `contains`, `between`, and case-insensitive comparisons
118 | - Default attributes for search results via `DEFAULT_SEARCH_ATTRIBUTES` constant with dynamic user-specified attribute support
119 | - Enhanced "some conditions" handling with support for advanced operators and case-insensitive logic
120 | - New search examples demonstrating OR logic for multiple type names and glossary term searches by specific attributes
121 |
122 | ### Changed
123 | - Integrated `SearchUtils` for centralized and consistent search result processing
124 | - Improved search API flexibility and precision with advanced query capabilities
125 |
126 | ### Fixed
127 | - Release workflow changelog generation issues that previously caused empty release notes
128 | - Improved commit range calculation and error handling in GitHub Actions workflow
129 |
130 | ## [0.2.0] - 2025-05-17
131 |
132 | ### Added
133 | - Support for new transport modes: streamable HTTP and SSE
134 | - MCP server executable script (`atlan-mcp-server`)
135 | - Improved Docker image with non-root user and security enhancements
136 |
137 | ### Changed
138 | - Made MCP server an installable package
139 | - Updated dependencies and bumped versions
140 | - Improved build process for faster Docker builds
141 | - Restructured release workflow for better isolation and PR-based releases
142 |
143 | ### Fixed
144 | - Various minor bugs and stability issues
145 |
146 | ### Documentation
147 | - Updated setup and usage instructions
148 | - Added more comprehensive examples
149 |
150 |
151 | ## [0.1.0] - 2024-05-05
152 |
153 | ### Added
154 | - Initial release of Atlan MCP Server
155 | - Basic functionality for integrating with Atlan
156 |
```
--------------------------------------------------------------------------------
/modelcontextprotocol/utils/search.py:
--------------------------------------------------------------------------------
```python
1 | from typing import Dict, Any
2 | import logging
3 | from pyatlan.model.assets import Asset
4 |
5 | logger = logging.getLogger(__name__)
6 |
7 |
8 | class SearchUtils:
9 | @staticmethod
10 | def process_results(results: Any) -> Dict[str, Any]:
11 | """
12 | Process the results from the search index using Pydantic serialization.
13 |
14 | This method uses Pydantic's .dict(by_alias=True, exclude_unset=True) to:
15 | - Convert field names to their API-friendly camelCase format (by_alias=True)
16 | - Exclude any fields that weren't explicitly set (exclude_unset=True)
17 |
18 | Args:
19 | results: The search results from Atlan
20 |
21 | Returns:
22 | Dict[str, Any]: Dictionary containing:
23 | - results: List of processed results
24 | - aggregations: Search aggregations if available
25 | - error: None if no error occurred, otherwise the error message
26 | """
27 | current_page_results = (
28 | results.current_page()
29 | if hasattr(results, "current_page") and callable(results.current_page)
30 | else []
31 | )
32 |         aggregations = getattr(results, "aggregations", None)
33 |
34 | logger.info(f"Processing {len(current_page_results)} search results")
35 | results_list = [
36 | result.dict(by_alias=True, exclude_unset=True)
37 | for result in current_page_results
38 | if result is not None
39 | ]
40 |
41 | return {"results": results_list, "aggregations": aggregations, "error": None}
42 |
43 | @staticmethod
44 | def _get_asset_attribute(attr_name: str):
45 | """
46 | Get Asset attribute by name.
47 | """
48 | return getattr(Asset, attr_name.upper(), None)
49 |
50 | @staticmethod
51 | def _apply_operator_condition(
52 | attr, operator: str, value: Any, case_insensitive: bool = False
53 | ):
54 | """
55 | Apply an operator condition to an attribute.
56 |
57 | Args:
58 | attr: The Asset attribute object
59 | operator (str): The operator to apply
60 | value: The value for the condition
61 | case_insensitive (bool): Whether to apply case insensitive matching
62 |
63 | Returns:
64 | The condition object to be used with where/where_not/where_some
65 |
66 | Raises:
67 | ValueError: If the operator is unknown or value format is invalid
68 | """
69 | logger.debug(
70 | f"Applying operator '{operator}' with value '{value}' (case_insensitive={case_insensitive})"
71 | )
72 |
73 | if operator == "startswith":
74 | return attr.startswith(value, case_insensitive=case_insensitive)
75 | elif operator == "match":
76 | return attr.match(value)
77 | elif operator == "eq":
78 | return attr.eq(value, case_insensitive=case_insensitive)
79 | elif operator == "neq":
80 | return attr.neq(value, case_insensitive=case_insensitive)
81 | elif operator == "gte":
82 | return attr.gte(value)
83 | elif operator == "lte":
84 | return attr.lte(value)
85 | elif operator == "gt":
86 | return attr.gt(value)
87 | elif operator == "lt":
88 | return attr.lt(value)
89 | elif operator == "has_any_value":
90 | return attr.has_any_value()
91 | elif operator == "contains":
92 | return attr.contains(value, case_insensitive=case_insensitive)
93 | elif operator == "between":
94 | # Expecting value to be a list/tuple with [start, end]
95 | if isinstance(value, (list, tuple)) and len(value) == 2:
96 | return attr.between(value[0], value[1])
97 | else:
98 | raise ValueError(
99 | f"Invalid value format for 'between' operator: {value}, expected [start, end]"
100 | )
101 | else:
102 | # Try to get the operator method from the attribute
103 | op_method = getattr(attr, operator, None)
104 | if op_method is None:
105 | raise ValueError(f"Unknown operator: {operator}")
106 |
107 | # Try to pass case_insensitive if the method supports it
108 | try:
109 | return op_method(value, case_insensitive=case_insensitive)
110 | except TypeError:
111 | # Fallback if case_insensitive is not supported
112 | return op_method(value)
113 |
114 | @staticmethod
115 | def _process_condition(
116 | search, attr, condition, attr_name: str, search_method_name: str
117 | ):
118 | """
119 | Process a single condition and apply it to the search using the specified method.
120 |
121 | Args:
122 | search: The FluentSearch object
123 | attr: The Asset attribute object
124 | condition: The condition value (dict, list, or simple value)
125 | attr_name (str): The attribute name for logging
126 | search_method_name (str): The search method to use ('where', 'where_not', 'where_some')
127 |
128 | Returns:
129 | FluentSearch: The updated search object
130 | """
131 | search_method = getattr(search, search_method_name)
132 |
133 | if isinstance(condition, dict):
134 | operator = condition.get("operator", "eq")
135 | value = condition.get("value")
136 | case_insensitive = condition.get("case_insensitive", False)
137 |
138 | try:
139 | condition_obj = SearchUtils._apply_operator_condition(
140 | attr, operator, value, case_insensitive
141 | )
142 | search = search_method(condition_obj)
143 | return search
144 | except ValueError as e:
145 | logger.warning(f"Skipping condition for {attr_name}: {e}")
146 | return search
147 | elif isinstance(condition, list):
148 | if search_method_name == "where_some":
149 | # Handle multiple values for where_some
150 | logger.debug(
151 | f"Adding multiple '{search_method_name}' values for {attr_name}: {condition}"
152 | )
153 | for value in condition:
154 | search = search_method(attr.eq(value))
155 | return search
156 | else:
157 | # Handle list of values with OR logic using .within()
158 | logger.debug(f"Applying multiple values for {attr_name}: {condition}")
159 | search = search_method(attr.within(condition))
160 | return search
161 | elif condition == "has_any_value" and search_method_name == "where_not":
162 | # Special case for has_any_value in negative conditions
163 | logger.debug(f"Excluding assets where {attr_name} has any value")
164 | search = search_method(attr.has_any_value())
165 | return search
166 | else:
167 | # Default to equality operator
168 | logger.debug(
169 | f"Applying {search_method_name} equality condition {attr_name}={condition}"
170 | )
171 | search = search_method(attr.eq(condition))
172 | return search
173 |
```
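For reference, the condition values `_process_condition` accepts come in three shapes — an operator dict, a list (OR semantics via `.within()`), or a bare value (plain equality). Illustrative literals only:

```python
# Operator dict: explicit operator, value, and optional case-insensitivity
name_condition = {"operator": "startswith", "value": "customer", "case_insensitive": True}

# "between" expects a two-element [start, end] list as its value
update_time_condition = {"operator": "between", "value": [1700000000000, 1705000000000]}

# A list is matched with OR logic via attr.within([...])
type_condition = ["Table", "View", "MaterialisedView"]

# A bare value defaults to an equality match
connector_condition = "snowflake"
```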
--------------------------------------------------------------------------------
/modelcontextprotocol/tools/glossary.py:
--------------------------------------------------------------------------------
```python
1 | from __future__ import annotations
2 | import logging
3 | from typing import Dict, Any, List, Union
4 |
5 | from pyatlan.model.assets import (
6 | AtlasGlossary,
7 | AtlasGlossaryCategory,
8 | AtlasGlossaryTerm,
9 | )
10 |
11 | from utils import parse_list_parameter, save_assets
12 | from .models import (
13 | Glossary,
14 | GlossaryCategory,
15 | GlossaryTerm,
16 | )
17 |
18 | logger = logging.getLogger(__name__)
19 |
20 |
21 | def create_glossary_assets(
22 | glossaries: Union[Dict[str, Any], List[Dict[str, Any]]],
23 | ) -> List[Dict[str, Any]]:
24 | """
25 | Create one or multiple AtlasGlossary assets in Atlan.
26 |
27 | Args:
28 | glossaries (Union[Dict[str, Any], List[Dict[str, Any]]]): Either a single glossary
29 | specification (dict) or a list of glossary specifications. Each specification
30 | can be a dictionary containing:
31 | - name (str): Name of the glossary (required)
32 | - user_description (str, optional): Detailed description of the glossary
33 | proposed by the user
34 | - certificate_status (str, optional): Certification status
35 | ("VERIFIED", "DRAFT", or "DEPRECATED")
36 |
37 | Returns:
38 | List[Dict[str, Any]]: List of dictionaries, each with details for a created glossary:
39 | - guid: The GUID of the created glossary
40 | - name: The name of the glossary
41 | - qualified_name: The qualified name of the created glossary
42 |
43 | Raises:
44 | Exception: If there's an error creating the glossary assets.
45 | """
46 | data = glossaries if isinstance(glossaries, list) else [glossaries]
47 | logger.info(f"Creating {len(data)} glossary asset(s)")
48 | logger.debug(f"Glossary specifications: {data}")
49 |
50 | specs = [Glossary(**item) for item in data]
51 |
52 | assets: List[AtlasGlossary] = []
53 | for spec in specs:
54 | logger.debug(f"Creating AtlasGlossary for: {spec.name}")
55 | glossary = AtlasGlossary.creator(name=spec.name)
56 | glossary.user_description = spec.user_description
57 | if spec.certificate_status:
58 | glossary.certificate_status = spec.certificate_status.value
59 | logger.debug(
60 | f"Set certificate status for {spec.name}: {spec.certificate_status.value}"
61 | )
62 | assets.append(glossary)
63 |
64 | return save_assets(assets)
65 |
66 |
67 | def create_glossary_category_assets(
68 | categories: Union[Dict[str, Any], List[Dict[str, Any]]],
69 | ) -> List[Dict[str, Any]]:
70 | """
71 | Create one or multiple AtlasGlossaryCategory assets in Atlan.
72 |
73 | Args:
74 | categories (Union[Dict[str, Any], List[Dict[str, Any]]]): Either a single category
75 | specification (dict) or a list of category specifications. Each specification
76 | can be a dictionary containing:
77 | - name (str): Name of the category (required)
78 | - glossary_guid (str): GUID of the glossary this category belongs to (required)
79 | - user_description (str, optional): Detailed description of the category
80 | proposed by the user
81 | - certificate_status (str, optional): Certification status
82 | ("VERIFIED", "DRAFT", or "DEPRECATED")
83 | - parent_category_guid (str, optional): GUID of the parent category if this
84 | is a subcategory
85 |
86 | Returns:
87 | List[Dict[str, Any]]: List of dictionaries, each with details for a created category:
88 | - guid: The GUID of the created category
89 | - name: The name of the category
90 | - qualified_name: The qualified name of the created category
91 |
92 | Raises:
93 | Exception: If there's an error creating the glossary category assets.
94 | """
95 | data = categories if isinstance(categories, list) else [categories]
96 | logger.info(f"Creating {len(data)} glossary category asset(s)")
97 | logger.debug(f"Category specifications: {data}")
98 |
99 | specs = [GlossaryCategory(**item) for item in data]
100 |
101 | assets: List[AtlasGlossaryCategory] = []
102 | for spec in specs:
103 | logger.debug(f"Creating AtlasGlossaryCategory for: {spec.name}")
104 | anchor = AtlasGlossary.ref_by_guid(spec.glossary_guid)
105 | category = AtlasGlossaryCategory.creator(
106 | name=spec.name,
107 | anchor=anchor,
108 | parent_category=(
109 | AtlasGlossaryCategory.ref_by_guid(spec.parent_category_guid)
110 | if spec.parent_category_guid
111 | else None
112 | ),
113 | )
114 | category.user_description = spec.user_description
115 | if spec.certificate_status:
116 | category.certificate_status = spec.certificate_status.value
117 | logger.debug(
118 | f"Set certificate status for {spec.name}: {spec.certificate_status.value}"
119 | )
120 |
121 | assets.append(category)
122 |
123 | return save_assets(assets)
124 |
125 |
126 | def create_glossary_term_assets(
127 | terms: Union[Dict[str, Any], List[Dict[str, Any]]],
128 | ) -> List[Dict[str, Any]]:
129 | """
130 | Create one or multiple AtlasGlossaryTerm assets in Atlan.
131 |
132 | Args:
133 | terms (Union[Dict[str, Any], List[Dict[str, Any]]]): Either a single term
134 | specification (dict) or a list of term specifications. Each specification
135 | can be a dictionary containing:
136 | - name (str): Name of the term (required)
137 | - glossary_guid (str): GUID of the glossary this term belongs to (required)
138 | - user_description (str, optional): Detailed description of the term
139 | proposed by the user
140 | - certificate_status (str, optional): Certification status
141 | ("VERIFIED", "DRAFT", or "DEPRECATED")
142 | - category_guids (List[str], optional): List of category GUIDs this term
143 | belongs to
144 |
145 | Returns:
146 | List[Dict[str, Any]]: List of dictionaries, each with details for a created term:
147 | - guid: The GUID of the created term
148 | - name: The name of the term
149 | - qualified_name: The qualified name of the created term
150 |
151 | Raises:
152 | ValueError: If any provided category_guids are not found.
153 | Exception: If there's an error creating the glossary term assets.
154 | """
155 | data = terms if isinstance(terms, list) else [terms]
156 | logger.info(f"Creating {len(data)} glossary term asset(s)")
157 | logger.debug(f"Term specifications: {data}")
158 |
159 | specs = [GlossaryTerm(**item) for item in data]
160 | per_term_guids = [set(parse_list_parameter(s.category_guids) or []) for s in specs]
161 |
162 | assets: List[AtlasGlossaryTerm] = []
163 | for spec, guids in zip(specs, per_term_guids):
164 | term = AtlasGlossaryTerm.creator(
165 | name=spec.name,
166 | anchor=AtlasGlossary.ref_by_guid(spec.glossary_guid),
167 | categories=[AtlasGlossaryCategory.ref_by_guid(g) for g in guids] or None,
168 | )
169 | term.user_description = spec.user_description
170 | if spec.certificate_status:
171 | term.certificate_status = spec.certificate_status.value
172 | assets.append(term)
173 |
174 | return save_assets(assets)
175 |
```
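A hypothetical build-up of a small glossary using the three creators in order, threading each returned GUID into the next call:

```python
from tools.glossary import (
    create_glossary_assets,
    create_glossary_category_assets,
    create_glossary_term_assets,
)

glossaries = create_glossary_assets({"name": "Business Metrics"})
glossary_guid = glossaries[0]["guid"]

categories = create_glossary_category_assets(
    {"name": "Revenue", "glossary_guid": glossary_guid}
)

terms = create_glossary_term_assets(
    {
        "name": "ARR",
        "glossary_guid": glossary_guid,
        "user_description": "Annual recurring revenue",
        "category_guids": [categories[0]["guid"]],
        "certificate_status": "VERIFIED",
    }
)
```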
--------------------------------------------------------------------------------
/.github/workflows/mcp-server-release.yml:
--------------------------------------------------------------------------------
```yaml
1 | name: MCP-Release
2 |
3 | on:
4 | pull_request:
5 | types: [closed]
6 | branches:
7 | - main
8 |
9 | jobs:
10 | prepare-release:
11 | # Only run when a PR with the "release" label is merged
12 | if: github.event.pull_request.merged == true && contains(github.event.pull_request.labels.*.name, 'release')
13 | runs-on: ubuntu-latest
14 | permissions:
15 | contents: write
16 | outputs:
17 | version: ${{ steps.get_version.outputs.version }}
18 | should_release: ${{ steps.check_tag.outputs.exists == 'false' }}
19 | steps:
20 | - name: Checkout
21 | uses: actions/checkout@v4
22 | with:
23 | fetch-depth: 0
24 |
25 | - name: Get version
26 | id: get_version
27 | run: |
28 | VERSION=$(grep -m 1 "__version__" modelcontextprotocol/version.py | cut -d'"' -f2)
29 | echo "version=$VERSION" >> $GITHUB_OUTPUT
30 | echo "Found version: $VERSION"
31 |
32 | - name: Check if tag exists
33 | id: check_tag
34 | run: |
35 | TAG_NAME="v${{ steps.get_version.outputs.version }}"
36 | if git rev-parse "$TAG_NAME" >/dev/null 2>&1; then
37 | echo "Tag $TAG_NAME already exists, stopping workflow"
38 | echo "exists=true" >> $GITHUB_OUTPUT
39 | else
40 | echo "Tag $TAG_NAME does not exist, continuing workflow"
41 | echo "exists=false" >> $GITHUB_OUTPUT
42 | fi
43 |
44 | - name: Generate changelog entry
45 | id: changelog
46 | if: steps.check_tag.outputs.exists == 'false'
47 | run: |
48 | set +e
49 |
50 | VERSION="${{ steps.get_version.outputs.version }}"
51 | RELEASE_DATE=$(date +"%Y-%m-%d")
52 |
53 | echo "Generating changelog for version $VERSION ($RELEASE_DATE)"
54 |
55 | # Get the previous version tag
56 | PREV_TAG=$(git describe --tags --abbrev=0 HEAD~1 2>/dev/null || echo "")
57 |
58 | if [ -z "$PREV_TAG" ]; then
59 | # If no previous tag, get the first commit
60 | FIRST_COMMIT=$(git rev-list --max-parents=0 HEAD)
61 | RANGE="$FIRST_COMMIT..HEAD"
62 | echo "Using range from first commit to HEAD"
63 | else
64 | RANGE="$PREV_TAG..HEAD"
65 | echo "Using range from $PREV_TAG to HEAD"
66 | fi
67 |
68 | # Create temporary changelog entry for RELEASE_NOTES.md
69 | echo "## [$VERSION] - $RELEASE_DATE" > RELEASE_NOTES.md
70 | echo "" >> RELEASE_NOTES.md
71 |
72 | # Add features
73 | git log $RANGE --format="* %s (%h)" --grep="^feat" --perl-regexp --no-merges 2>/dev/null > features.txt || touch features.txt
74 |
75 | if [ -s features.txt ]; then
76 | echo "### Added" >> RELEASE_NOTES.md
77 | echo "" >> RELEASE_NOTES.md
78 | sed 's/^\* feat[[:space:]]*\([^:]*\):[[:space:]]*/* /' features.txt >> RELEASE_NOTES.md
79 | echo "" >> RELEASE_NOTES.md
80 | fi
81 |
82 | # Add fixes
83 | git log $RANGE --format="* %s (%h)" --grep="^fix" --perl-regexp --no-merges 2>/dev/null > fixes.txt || touch fixes.txt
84 |
85 | if [ -s fixes.txt ]; then
86 | echo "### Fixed" >> RELEASE_NOTES.md
87 | echo "" >> RELEASE_NOTES.md
88 | sed 's/^\* fix[[:space:]]*\([^:]*\):[[:space:]]*/* /' fixes.txt >> RELEASE_NOTES.md
89 | echo "" >> RELEASE_NOTES.md
90 | fi
91 |
92 | # Add other changes (excluding merge commits, chore, docs, style, refactor, test, ci)
93 | git log $RANGE --format="* %s (%h)" --no-merges 2>/dev/null | \
94 | grep -v -E "^\* (feat|fix|chore|docs|style|refactor|test|ci)(\(.*\))?:" > others.txt || touch others.txt
95 |
96 | if [ -s others.txt ]; then
97 | echo "### Changed" >> RELEASE_NOTES.md
98 | echo "" >> RELEASE_NOTES.md
99 | cat others.txt >> RELEASE_NOTES.md
100 | echo "" >> RELEASE_NOTES.md
101 | fi
102 |
103 | # If no specific changes found, add a simple entry
104 | if [ ! -s features.txt ] && [ ! -s fixes.txt ] && [ ! -s others.txt ]; then
105 | echo "### Changed" >> RELEASE_NOTES.md
106 | echo "" >> RELEASE_NOTES.md
107 | echo "* Release version $VERSION" >> RELEASE_NOTES.md
108 | echo "" >> RELEASE_NOTES.md
109 | fi
110 |
111 | # Clean up temporary files
112 | rm -f features.txt fixes.txt others.txt
113 |
114 | echo "Release notes generated successfully"
115 | echo "================================"
116 | cat RELEASE_NOTES.md
117 | echo "================================"
118 |
119 | - name: Create Tag
120 | if: steps.check_tag.outputs.exists == 'false'
121 | run: |
122 | git tag v${{ steps.get_version.outputs.version }}
123 | git push --tags
124 |
125 | - name: Create GitHub Release
126 | if: steps.check_tag.outputs.exists == 'false'
127 | uses: softprops/action-gh-release@v2
128 | with:
129 | tag_name: v${{ steps.get_version.outputs.version }}
130 | body_path: RELEASE_NOTES.md
131 | token: ${{ secrets.GITHUB_TOKEN }}
132 | draft: false
133 | prerelease: false
134 |
135 | # Upload release notes for other jobs to use
136 | - name: Upload release notes
137 | if: steps.check_tag.outputs.exists == 'false'
138 | uses: actions/upload-artifact@v4
139 | with:
140 | name: release-notes
141 | path: RELEASE_NOTES.md
142 | retention-days: 1
143 |
144 | publish-pypi:
145 | needs: prepare-release
146 | if: needs.prepare-release.outputs.should_release == 'true'
147 | runs-on: ubuntu-latest
148 | permissions:
149 | contents: read
150 | steps:
151 | - name: Checkout
152 | uses: actions/checkout@v4
153 | with:
154 | ref: v${{ needs.prepare-release.outputs.version }}
155 |
156 | - name: Set up Python
157 | uses: actions/setup-python@v5
158 | with:
159 | python-version: '3.11'
160 |
161 | - name: Install build dependencies
162 | run: |
163 | python -m pip install --upgrade pip
164 | pip install build wheel twine
165 |
166 | - name: Build package
167 | run: |
168 | cd modelcontextprotocol
169 | python -m build
170 |
171 | - name: Publish to PyPI
172 | env:
173 | TWINE_USERNAME: ${{ secrets.PYPI_USERNAME }}
174 | TWINE_PASSWORD: ${{ secrets.PYPI_PASSWORD }}
175 | run: |
176 | cd modelcontextprotocol
177 | twine upload dist/*
178 |
179 | publish-docker:
180 | needs: prepare-release
181 | if: needs.prepare-release.outputs.should_release == 'true'
182 | runs-on: ubuntu-latest
183 | permissions:
184 | contents: read
185 | packages: write
186 | steps:
187 | - name: Checkout
188 | uses: actions/checkout@v4
189 | with:
190 | ref: v${{ needs.prepare-release.outputs.version }}
191 |
192 | - name: Set up QEMU for Cross-Platform Builds
193 | uses: docker/setup-qemu-action@v3
194 |
195 | - name: Set up Docker Buildx
196 | uses: docker/setup-buildx-action@v3
197 |
198 | - name: Login to GitHub Container Registry
199 | uses: docker/login-action@v3
200 | with:
201 | registry: ghcr.io
202 | username: ${{ github.actor }}
203 | password: ${{ secrets.GITHUB_TOKEN }}
204 |
205 | - name: Build and push Docker image
206 | uses: docker/build-push-action@v5
207 | with:
208 | context: ./modelcontextprotocol/
209 | push: true
210 | tags: |
211 | ghcr.io/atlanhq/atlan-mcp-server:latest
212 | ghcr.io/atlanhq/atlan-mcp-server:${{ needs.prepare-release.outputs.version }}
213 | platforms: |
214 | linux/amd64
215 | linux/arm64
216 |
```
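The `Get version` step greps the first `__version__` line and cuts the second double-quote-delimited field, so it assumes `modelcontextprotocol/version.py` keeps roughly this shape (illustrative):

```python
# modelcontextprotocol/version.py — a single-line, double-quoted literal so that
# grep -m 1 "__version__" ... | cut -d'"' -f2 extracts the version cleanly
__version__ = "0.3.0"
```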
--------------------------------------------------------------------------------
/modelcontextprotocol/tools/assets.py:
--------------------------------------------------------------------------------
```python
1 | import logging
2 | from typing import List, Union, Dict, Any
3 | from client import get_atlan_client
4 | from .models import (
5 | UpdatableAsset,
6 | UpdatableAttribute,
7 | CertificateStatus,
8 | TermOperation,
9 | TermOperations,
10 | )
11 | from pyatlan.model.assets import Readme, AtlasGlossaryTerm, AtlasGlossaryCategory
12 | from pyatlan.model.fluent_search import CompoundQuery, FluentSearch
13 |
14 | # Initialize logging
15 | logger = logging.getLogger(__name__)
16 |
17 |
18 | def update_assets(
19 | updatable_assets: Union[UpdatableAsset, List[UpdatableAsset]],
20 | attribute_name: UpdatableAttribute,
21 | attribute_values: List[Union[str, CertificateStatus, TermOperations]],
22 | ) -> Dict[str, Any]:
23 | """
24 | Update one or multiple assets with different values for attributes or term operations.
25 |
26 | Args:
27 | updatable_assets (Union[UpdatableAsset, List[UpdatableAsset]]): Asset(s) to update.
28 | Can be a single UpdatableAsset or a list of UpdatableAssets.
29 |             For assets of type_name=AtlasGlossaryTerm or type_name=AtlasGlossaryCategory, each asset dictionary MUST include a "glossary_guid" key: the GUID of the glossary the term or category belongs to.
30 | attribute_name (UpdatableAttribute): Name of the attribute to update.
31 | Supports userDescription, certificateStatus, readme, and term.
32 | attribute_values (List[Union[str, CertificateStatus, TermOperations]]): List of values to set for the attribute.
33 | For certificateStatus, only VERIFIED, DRAFT, or DEPRECATED are allowed.
34 | For readme, the value must be a valid Markdown string.
35 | For term, the value must be a TermOperations object with operation and term_guids.
36 |
37 | Returns:
38 | Dict[str, Any]: Dictionary containing:
39 | - updated_count: Number of assets successfully updated
40 | - errors: List of any errors encountered
41 | - operation: The operation that was performed (for term operations)
42 | """
43 | try:
44 | # Convert single asset to list for consistent handling
45 | if not isinstance(updatable_assets, list):
46 | updatable_assets = [updatable_assets]
47 |
48 | logger.info(
49 | f"Updating {len(updatable_assets)} assets with attribute '{attribute_name}'"
50 | )
51 |
52 | # Validate attribute values
53 | if len(updatable_assets) != len(attribute_values):
54 | error_msg = "Number of asset GUIDs must match number of attribute values"
55 | logger.error(error_msg)
56 | return {"updated_count": 0, "errors": [error_msg]}
57 |
58 | # Initialize result tracking
59 | result = {"updated_count": 0, "errors": []}
60 |
61 | # Validate certificate status values if applicable
62 | if attribute_name == UpdatableAttribute.CERTIFICATE_STATUS:
63 | for value in attribute_values:
64 | if value not in CertificateStatus.__members__.values():
65 | error_msg = f"Invalid certificate status: {value}"
66 | logger.error(error_msg)
67 | result["errors"].append(error_msg)
68 |
69 | # Get Atlan client
70 | client = get_atlan_client()
71 |
72 | # Create assets with updated values
73 | assets = []
74 | # readme_update_parent_assets: Assets that were updated with readme.
75 | readme_update_parent_assets = []
76 | for index, updatable_asset in enumerate(updatable_assets):
77 | type_name = updatable_asset.type_name
78 | qualified_name = updatable_asset.qualified_name
79 | asset_cls = getattr(
80 | __import__("pyatlan.model.assets", fromlist=[type_name]), type_name
81 | )
82 |
83 | # Special handling for Glossary Term updates
84 | if (
85 | updatable_asset.type_name == AtlasGlossaryTerm.__name__
86 | or updatable_asset.type_name == AtlasGlossaryCategory.__name__
87 | ):
88 | asset = asset_cls.updater(
89 | qualified_name=updatable_asset.qualified_name,
90 | name=updatable_asset.name,
91 | glossary_guid=updatable_asset.glossary_guid,
92 | )
93 | else:
94 | asset = asset_cls.updater(
95 | qualified_name=updatable_asset.qualified_name,
96 | name=updatable_asset.name,
97 | )
98 |
99 | # Special handling for README updates
100 | if attribute_name == UpdatableAttribute.README:
101 | # Get the current readme content for the asset
102 | # The below query is used to get the asset based on the qualified name and include the readme content.
103 | asset_readme_response = (
104 | FluentSearch()
105 | .select()
106 | .where(CompoundQuery.asset_type(asset_cls))
107 | .where(asset_cls.QUALIFIED_NAME.eq(qualified_name))
108 | .include_on_results(asset_cls.README)
109 | .include_on_relations(Readme.DESCRIPTION)
110 | .execute(client=client)
111 | )
112 |
113 | if first := asset_readme_response.current_page():
114 | updated_content = attribute_values[index]
115 | # We replace the existing readme content with the new content.
116 | # If the existing readme content is not present, we create a new readme asset.
117 | updated_readme = Readme.creator(
118 | asset=first[0], content=updated_content
119 | )
120 | # Save the readme asset
121 | assets.append(updated_readme)
122 | # Add the parent/actual asset to the list of assets that were updated with readme.
123 | readme_update_parent_assets.append(asset)
124 | elif attribute_name == UpdatableAttribute.TERM:
125 | # Special handling for term operations
126 | term_value = attribute_values[index]
127 | if not isinstance(term_value, TermOperations):
128 | error_msg = f"Term value must be a TermOperations object for asset {updatable_asset.qualified_name}"
129 | logger.error(error_msg)
130 | result["errors"].append(error_msg)
131 | continue
132 |
133 | term_operation = TermOperation(term_value.operation.lower())
134 | term_guids = term_value.term_guids
135 |
136 | # Create term references
137 | term_refs = [
138 | AtlasGlossaryTerm.ref_by_guid(guid=guid) for guid in term_guids
139 | ]
140 |
141 | try:
142 | # Perform the appropriate term operation
143 | if term_operation == TermOperation.APPEND:
144 | client.asset.append_terms(
145 | asset_type=asset_cls,
146 | qualified_name=updatable_asset.qualified_name,
147 | terms=term_refs,
148 | )
149 | elif term_operation == TermOperation.REPLACE:
150 | client.asset.replace_terms(
151 | asset_type=asset_cls,
152 | qualified_name=updatable_asset.qualified_name,
153 | terms=term_refs,
154 | )
155 | elif term_operation == TermOperation.REMOVE:
156 | client.asset.remove_terms(
157 | asset_type=asset_cls,
158 | qualified_name=updatable_asset.qualified_name,
159 | terms=term_refs,
160 | )
161 |
162 | result["updated_count"] += 1
163 | logger.info(
164 | f"Successfully {term_operation.value}d terms on asset: {updatable_asset.qualified_name}"
165 | )
166 |
167 | except Exception as e:
168 | error_msg = f"Error updating terms on asset {updatable_asset.qualified_name}: {str(e)}"
169 | logger.error(error_msg)
170 | result["errors"].append(error_msg)
171 | else:
172 | # Regular attribute update flow
173 | setattr(asset, attribute_name.value, attribute_values[index])
174 | assets.append(asset)
175 |
176 | if len(readme_update_parent_assets) > 0:
177 | result["readme_updated"] = len(readme_update_parent_assets)
178 | # Collect qualified names or other identifiers for assets that were updated with readme
179 | result["updated_readme_assets"] = [
180 | asset.qualified_name
181 | for asset in readme_update_parent_assets
182 | if hasattr(asset, "qualified_name")
183 | ]
184 | logger.info(
185 | f"Successfully updated {result['readme_updated']} readme assets: {result['updated_readme_assets']}"
186 | )
187 |
188 |         # Process the save response
189 | if len(assets) > 0:
190 | response = client.asset.save(assets)
191 |             result["updated_count"] += len(response.guid_assignments)
192 | logger.info(f"Successfully updated {result['updated_count']} assets")
193 |
194 | return result
195 |
196 | except Exception as e:
197 | error_msg = f"Error updating assets: {str(e)}"
198 | logger.error(error_msg)
199 | return {"updated_count": 0, "errors": [error_msg]}
200 |
```
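A hypothetical single-asset update via `update_assets`; the GUID and qualified name are placeholders:

```python
from tools.assets import update_assets
from tools.models import CertificateStatus, UpdatableAsset, UpdatableAttribute

asset = UpdatableAsset(
    guid="1b9c5a7e-0000-0000-0000-000000000000",  # placeholder
    name="customers",
    qualified_name="default/snowflake/1705755637/RAW/SALES/customers",  # placeholder
    type_name="Table",
)

result = update_assets(
    updatable_assets=asset,
    attribute_name=UpdatableAttribute.CERTIFICATE_STATUS,
    attribute_values=[CertificateStatus.VERIFIED],
)
print(result["updated_count"], result["errors"])
```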
--------------------------------------------------------------------------------
/modelcontextprotocol/tools/models.py:
--------------------------------------------------------------------------------
```python
1 | import logging
2 | from enum import Enum
3 | from typing import Optional, List, Union, Dict, Any
4 |
5 | from pydantic import BaseModel, field_validator, model_validator
6 |
7 | logger = logging.getLogger(__name__)
8 |
9 |
10 | class CertificateStatus(str, Enum):
11 | """Enum for allowed certificate status values."""
12 |
13 | VERIFIED = "VERIFIED"
14 | DRAFT = "DRAFT"
15 | DEPRECATED = "DEPRECATED"
16 |
17 |
18 | class UpdatableAttribute(str, Enum):
19 | """Enum for attributes that can be updated."""
20 |
21 | USER_DESCRIPTION = "user_description"
22 | CERTIFICATE_STATUS = "certificate_status"
23 | README = "readme"
24 | TERM = "term"
25 |
26 |
27 | class TermOperation(str, Enum):
28 | """Enum for term operations on assets."""
29 |
30 | APPEND = "append"
31 | REPLACE = "replace"
32 | REMOVE = "remove"
33 |
34 |
35 | class TermOperations(BaseModel):
36 | """Model for term operations on assets."""
37 |
38 | operation: TermOperation
39 | term_guids: List[str]
40 |
41 |
42 | class UpdatableAsset(BaseModel):
43 | """Class representing an asset that can be updated."""
44 |
45 | guid: str
46 | name: str
47 | qualified_name: str
48 | type_name: str
49 | user_description: Optional[str] = None
50 | certificate_status: Optional[CertificateStatus] = None
51 | glossary_guid: Optional[str] = None
52 |
53 |
54 | class Glossary(BaseModel):
55 | """Payload model for creating a glossary asset."""
56 |
57 | name: str
58 | user_description: Optional[str] = None
59 | certificate_status: Optional[CertificateStatus] = None
60 |
61 |
62 | class GlossaryCategory(BaseModel):
63 | """Payload model for creating a glossary category asset."""
64 |
65 | name: str
66 | glossary_guid: str
67 | user_description: Optional[str] = None
68 | certificate_status: Optional[CertificateStatus] = None
69 | parent_category_guid: Optional[str] = None
70 |
71 |
72 | class GlossaryTerm(BaseModel):
73 | """Payload model for creating a glossary term asset."""
74 |
75 | name: str
76 | glossary_guid: str
77 | user_description: Optional[str] = None
78 | certificate_status: Optional[CertificateStatus] = None
79 | category_guids: Optional[List[str]] = None
80 |
81 |
82 | class DataDomainSpec(BaseModel):
83 | """Payload model for creating a Data Domain or Sub Domain asset."""
84 |
85 | name: str
86 |     # If provided, the domain is created as a sub domain under this parent
87 |     parent_domain_qualified_name: Optional[str] = None
88 |
89 | user_description: Optional[str] = None
90 | certificate_status: Optional[CertificateStatus] = None
91 |
92 |
93 | class DataProductSpec(BaseModel):
94 | """Payload model for creating a Data Product asset."""
95 |
96 | name: str
97 | domain_qualified_name: str
98 | asset_guids: List[str] # Required: at least one asset GUID for data products
99 | user_description: Optional[str] = None
100 | certificate_status: Optional[CertificateStatus] = None
101 |
102 | @field_validator("asset_guids")
103 | @classmethod
104 | def validate_asset_guids(cls, v: List[str]) -> List[str]:
105 | """Validate that asset_guids is not empty."""
106 | if not v:
107 | raise ValueError(
108 | "Data products require at least one asset GUID. "
109 | "Please provide asset_guids to link assets to this product."
110 | )
111 | return v
112 |
113 |
114 | class DQRuleCondition(BaseModel):
115 | """Model representing a single data quality rule condition."""
116 |
117 |     # Condition type (e.g., "STRING_LENGTH_BETWEEN", "REGEX_MATCH", "IN_LIST")
118 |     type: str
119 |
120 | value: Optional[Union[str, List[str]]] = None # Single value or list of values
121 | min_value: Optional[Union[int, float]] = None # Minimum value for range conditions
122 | max_value: Optional[Union[int, float]] = None # Maximum value for range conditions
123 |
124 |
125 | class DQAssetType(str, Enum):
126 | """Enum for supported asset types for data quality rules."""
127 |
128 | TABLE = "Table"
129 | VIEW = "View"
130 | MATERIALIZED_VIEW = "MaterialisedView"
131 | SNOWFLAKE_DYNAMIC_TABLE = "SnowflakeDynamicTable"
132 |
133 |
134 | class DQRuleType(str, Enum):
135 | """Enum for supported data quality rule types."""
136 |
137 | # Completeness checks
138 | NULL_COUNT = "Null Count"
139 | NULL_PERCENTAGE = "Null Percentage"
140 | BLANK_COUNT = "Blank Count"
141 | BLANK_PERCENTAGE = "Blank Percentage"
142 |
143 | # Statistical checks
144 | MIN_VALUE = "Min Value"
145 | MAX_VALUE = "Max Value"
146 | AVERAGE = "Average"
147 | STANDARD_DEVIATION = "Standard Deviation"
148 |
149 | # Uniqueness checks
150 | UNIQUE_COUNT = "Unique Count"
151 | DUPLICATE_COUNT = "Duplicate Count"
152 |
153 | # Validity checks
154 | REGEX = "Regex"
155 | STRING_LENGTH = "String Length"
156 | VALID_VALUES = "Valid Values"
157 |
158 | # Timeliness checks
159 | FRESHNESS = "Freshness"
160 |
161 | # Volume checks
162 | ROW_COUNT = "Row Count"
163 |
164 | # Custom checks
165 | CUSTOM_SQL = "Custom SQL"
166 |
167 | def get_rule_config(self) -> Dict[str, Any]:
168 | """
169 | Get complete configuration for this rule type.
170 |
171 | Returns:
172 | Dict containing:
173 | - creator_method: Name of the DataQualityRule creator method to use
174 | - requires_column: Whether this rule requires column_qualified_name
175 | - supports_conditions: Whether this rule supports conditional logic
176 | """
177 | # Custom SQL rules
178 | if self == DQRuleType.CUSTOM_SQL:
179 | return {
180 | "creator_method": "custom_sql_creator",
181 | "requires_column": False,
182 | "supports_conditions": False,
183 | }
184 |
185 | # Table-level rules
186 | if self == DQRuleType.ROW_COUNT:
187 | return {
188 | "creator_method": "table_level_rule_creator",
189 | "requires_column": False,
190 | "supports_conditions": False,
191 | }
192 |
193 | # Column-level rules with conditions
194 | if self in {
195 | DQRuleType.STRING_LENGTH,
196 | DQRuleType.REGEX,
197 | DQRuleType.VALID_VALUES,
198 | }:
199 | return {
200 | "creator_method": "column_level_rule_creator",
201 | "requires_column": True,
202 | "supports_conditions": True,
203 | }
204 |
205 | # Standard column-level rules
206 | return {
207 | "creator_method": "column_level_rule_creator",
208 | "requires_column": True,
209 | "supports_conditions": False,
210 | }
211 |
212 |
213 | class DQRuleSpecification(BaseModel):
214 | """
215 | Comprehensive model for creating any type of data quality rule.
216 |
217 | Different rule types require different fields:
218 | - Column-level rules: require column_qualified_name
219 | - Table-level rules: only require asset_qualified_name
220 | - Custom SQL rules: require custom_sql, rule_name, dimension
221 | - Rules with conditions: require rule_conditions (String Length, Regex, Valid Values)
222 | """
223 |
224 | # Core identification
225 | rule_type: DQRuleType
226 | asset_qualified_name: str
227 | asset_type: Optional[DQAssetType] = DQAssetType.TABLE # Default to Table
228 |
229 | # Column-level specific (required for most rule types except Row Count and Custom SQL)
230 | column_qualified_name: Optional[str] = None
231 |
232 | # Threshold configuration
233 | threshold_value: Optional[Union[int, float]] = None
234 | threshold_compare_operator: Optional[str] = None # "EQUAL", "GREATER_THAN", etc.
235 | threshold_unit: Optional[str] = None # "DAYS", "HOURS", "MINUTES"
236 |
237 | # Alert configuration
238 | alert_priority: Optional[str] = "NORMAL" # "LOW", "NORMAL", "URGENT"
239 |
240 | # Custom SQL specific
241 | custom_sql: Optional[str] = None
242 | rule_name: Optional[str] = None
243 | dimension: Optional[str] = None # "COMPLETENESS", "VALIDITY", etc.
244 |
245 | # Advanced configuration
246 | rule_conditions: Optional[List[DQRuleCondition]] = None
247 | row_scope_filtering_enabled: Optional[bool] = False
248 | description: Optional[str] = None
249 |
250 | @model_validator(mode="after")
251 | def validate_rule_requirements(self) -> "DQRuleSpecification":
252 | """
253 | Validate rule specification based on rule type requirements.
254 |
255 | Raises:
256 | ValueError: If required fields are missing for the rule type
257 | """
258 | errors = []
259 | config = self.rule_type.get_rule_config()
260 |
261 | # Check if column is required but missing
262 | if config["requires_column"] and not self.column_qualified_name:
263 | errors.append(f"{self.rule_type.value} requires column_qualified_name")
264 |
265 | # Custom SQL rules require specific fields
266 | if self.rule_type == DQRuleType.CUSTOM_SQL:
267 | if not self.custom_sql:
268 | errors.append("Custom SQL rules require custom_sql field")
269 | if not self.rule_name:
270 | errors.append("Custom SQL rules require rule_name field")
271 | if not self.dimension:
272 | errors.append("Custom SQL rules require dimension field")
273 |
274 | # Conditional rules should have conditions (warning only)
275 | if config["supports_conditions"] and not self.rule_conditions:
276 | logger.warning(f"{self.rule_type.value} rule created without conditions")
277 |
278 | # Freshness rules require threshold_unit
279 | if self.rule_type == DQRuleType.FRESHNESS and not self.threshold_unit:
280 | errors.append(
281 | "Freshness rules require threshold_unit (DAYS, HOURS, or MINUTES)"
282 | )
283 |
284 | # All rules require threshold_value
285 | if self.threshold_value is None:
286 | errors.append(f"{self.rule_type.value} requires threshold_value")
287 |
288 | if errors:
289 | raise ValueError("; ".join(errors))
290 |
291 | return self
292 |
293 |
294 | class CreatedRuleInfo(BaseModel):
295 | """Model representing information about a created data quality rule."""
296 |
297 | guid: str
298 | qualified_name: str
299 | rule_type: Optional[str] = None
300 |
301 |
302 | class DQRuleCreationResponse(BaseModel):
303 | """Response model for data quality rule creation operations."""
304 |
305 | created_count: int = 0
306 | created_rules: List[CreatedRuleInfo] = []
307 | errors: List[str] = []
308 |
309 |
310 | class DQRuleScheduleSpecification(BaseModel):
311 | """
312 | Specification model for scheduling data quality rules on an asset.
313 |
314 | This model defines the required parameters for scheduling DQ rule
315 |     execution on a table, view, or other supported asset type.
316 |
317 | """
318 |
319 | asset_type: DQAssetType
320 | asset_name: str
321 | asset_qualified_name: str
322 | schedule_crontab: str
323 | schedule_time_zone: str
324 |
325 | @field_validator("schedule_crontab")
326 | @classmethod
327 | def validate_crontab(cls, v: str) -> str:
328 | """
329 | Validate the crontab expression format.
330 |
331 | A valid cron expression should have 5 fields:
332 | minute, hour, day of month, month, day of week.
333 | """
334 | parts = v.strip().split()
335 | if len(parts) != 5:
336 | raise ValueError(
337 | f"Invalid cron expression '{v}'. Expected 5 fields "
338 | "(minute hour day-of-month month day-of-week), got {len(parts)}."
339 | )
340 | return v.strip()
341 |
342 | @field_validator("schedule_time_zone")
343 | @classmethod
344 | def validate_timezone(cls, v: str) -> str:
345 | """Validate that a non-empty timezone string is provided."""
346 | if not v or not v.strip():
347 | raise ValueError("schedule_time_zone cannot be empty")
348 | return v.strip()
349 |
350 |
351 | class ScheduledAssetInfo(BaseModel):
352 | """
353 | Model representing information about a successfully scheduled asset.
354 |
355 | This is returned as part of the response to indicate which assets
356 | had their DQ rule schedules configured successfully.
357 | """
358 |
359 | asset_name: str
360 | asset_qualified_name: str
361 | schedule_crontab: str
362 | schedule_time_zone: str
363 |
364 |
365 | class DQRuleScheduleResponse(BaseModel):
366 | """Response model for data quality rule scheduling operations."""
367 |
368 | scheduled_count: int = 0
369 | scheduled_assets: List[ScheduledAssetInfo] = []
370 | errors: List[str] = []
371 |
372 |
373 | class DQRuleInfo(BaseModel):
374 | """Model representing a data quality rule identifier.
375 |
376 | Used for both delete operations (input) and deleted rule tracking (output).
377 | """
378 |
379 | rule_guid: str
380 |
381 |
382 | class DQRuleDeleteResponse(BaseModel):
383 | """Response model for data quality rule deletion operations."""
384 |
385 | deleted_count: int = 0
386 | deleted_rules: List[DQRuleInfo] = []
387 |
388 |
389 | class DQRuleUpdateSpecification(BaseModel):
390 | """
391 | Model for updating an existing data quality rule.
392 |
393 | Only necessary and updatable fields are included. All fields except
394 | qualified_name, rule_type, and asset_qualified_name are optional.
395 | """
396 |
397 | # Required fields for identification and validation
398 | qualified_name: str # The qualified name of the rule to update
399 | rule_type: DQRuleType # Type of rule (required for validation)
400 | asset_qualified_name: (
401 | str # Qualified name of the table/view (required for validation)
402 | )
403 |
404 | # Optional updatable fields
405 | threshold_value: Optional[Union[int, float]] = None
406 | threshold_compare_operator: Optional[str] = None # "EQUAL", "GREATER_THAN", etc.
407 | threshold_unit: Optional[str] = (
408 | None # "DAYS", "HOURS", "MINUTES" (for Freshness rules)
409 | )
410 | alert_priority: Optional[str] = None # "LOW", "NORMAL", "URGENT"
411 |
412 | # Custom SQL specific fields
413 | custom_sql: Optional[str] = None
414 | rule_name: Optional[str] = None
415 | dimension: Optional[str] = None # "COMPLETENESS", "VALIDITY", etc.
416 |
417 | # Advanced configuration
418 | rule_conditions: Optional[List[DQRuleCondition]] = None
419 | row_scope_filtering_enabled: Optional[bool] = None
420 | description: Optional[str] = None
421 |
422 |
423 | class UpdatedRuleInfo(BaseModel):
424 | """Model representing information about an updated data quality rule."""
425 |
426 | guid: str
427 | qualified_name: str
428 | rule_type: Optional[str] = None
429 |
430 |
431 | class DQRuleUpdateResponse(BaseModel):
432 | """Response model for data quality rule update operations."""
433 |
434 | updated_count: int = 0
435 | updated_rules: List[UpdatedRuleInfo] = []
436 | errors: List[str] = []
437 |
```
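
A minimal usage sketch of the payload models above follows. It is illustrative only: the qualified names are placeholders, and the import path assumes the server's package layout (`tools.models`).

```python
from tools.models import DQRuleSpecification, DQRuleType

# A valid column-level rule: Null Count requires a column and a threshold.
spec = DQRuleSpecification(
    rule_type=DQRuleType.NULL_COUNT,
    asset_qualified_name="default/snowflake/123/DB/SCHEMA/ORDERS",  # placeholder
    column_qualified_name="default/snowflake/123/DB/SCHEMA/ORDERS/ORDER_ID",
    threshold_value=0,
)

# Missing requirements fail fast in the model validator. pydantic wraps the
# ValueError raised there in a ValidationError (itself a ValueError subclass).
try:
    DQRuleSpecification(
        rule_type=DQRuleType.FRESHNESS,
        asset_qualified_name="default/snowflake/123/DB/SCHEMA/ORDERS",
        column_qualified_name="default/snowflake/123/DB/SCHEMA/ORDERS/UPDATED_AT",
        threshold_value=1,  # threshold_unit deliberately omitted
    )
except ValueError as exc:
    # Message includes: "Freshness rules require threshold_unit (DAYS, HOURS, or MINUTES)"
    print(exc)
```

Collecting all errors before raising (rather than failing on the first) means a caller sees every missing field for a rule in one round trip.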
--------------------------------------------------------------------------------
/modelcontextprotocol/tools/search.py:
--------------------------------------------------------------------------------
```python
1 | import logging
2 | from typing import Type, List, Optional, Union, Dict, Any
3 |
4 | from client import get_atlan_client
5 | from pyatlan.model.assets import Asset, AtlasGlossaryTerm
6 | from pyatlan.model.fluent_search import CompoundQuery, FluentSearch
7 | from pyatlan.model.fields.atlan_fields import AtlanField
8 | from utils.search import SearchUtils
9 | from utils.constants import DEFAULT_SEARCH_ATTRIBUTES, VALID_RELATIONSHIPS
10 |
11 | # Configure logging
12 | logger = logging.getLogger(__name__)
13 |
14 |
15 | def search_assets(
16 | conditions: Optional[Union[Dict[str, Any], str]] = None,
17 | negative_conditions: Optional[Dict[str, Any]] = None,
18 | some_conditions: Optional[Dict[str, Any]] = None,
19 | min_somes: int = 1,
20 | include_attributes: Optional[List[Union[str, AtlanField]]] = None,
21 | asset_type: Optional[Union[Type[Asset], str]] = None,
22 | include_archived: bool = False,
23 | limit: int = 10,
24 | offset: int = 0,
25 | sort_by: Optional[str] = None,
26 | sort_order: str = "ASC",
27 | connection_qualified_name: Optional[str] = None,
28 | tags: Optional[List[str]] = None,
29 | directly_tagged: bool = True,
30 | domain_guids: Optional[List[str]] = None,
31 | date_range: Optional[Dict[str, Dict[str, Any]]] = None,
32 | guids: Optional[List[str]] = None,
33 | ) -> Dict[str, Any]:
34 | """
35 | Advanced asset search using FluentSearch with flexible conditions.
36 |
37 | By default, only essential attributes used in result processing are included.
38 | Additional attributes can be specified via include_attributes parameter.
39 |
40 | Args:
41 | conditions (Dict[str, Any], optional): Dictionary of attribute conditions to match.
42 | Format: {"attribute_name": value} or {"attribute_name": {"operator": operator, "value": value}}
43 | negative_conditions (Dict[str, Any], optional): Dictionary of attribute conditions to exclude.
44 | Format: {"attribute_name": value} or {"attribute_name": {"operator": operator, "value": value}}
45 | some_conditions (Dict[str, Any], optional): Conditions for where_some() queries that require min_somes of them to match.
46 | Format: {"attribute_name": value} or {"attribute_name": {"operator": operator, "value": value}}
47 | min_somes (int): Minimum number of some_conditions that must match. Defaults to 1.
48 | include_attributes (List[Union[str, AtlanField]], optional): List of additional attributes to include in results.
49 | Can be string attribute names or AtlanField objects. These will be added to the default set.
50 | asset_type (Union[Type[Asset], str], optional): Type of asset to search for.
51 | Either a class (e.g., Table, Column) or a string type name (e.g., "Table", "Column")
52 | include_archived (bool): Whether to include archived assets. Defaults to False.
53 | limit (int, optional): Maximum number of results to return. Defaults to 10.
54 | offset (int, optional): Offset for pagination. Defaults to 0.
55 | sort_by (str, optional): Attribute to sort by. Defaults to None.
56 | sort_order (str, optional): Sort order, "ASC" or "DESC". Defaults to "ASC".
57 | connection_qualified_name (str, optional): Connection qualified name to filter by.
58 | tags (List[str], optional): List of tags to filter by.
59 | directly_tagged (bool): Whether to filter for directly tagged assets only. Defaults to True.
60 | domain_guids (List[str], optional): List of domain GUIDs to filter by.
61 | date_range (Dict[str, Dict[str, Any]], optional): Date range filters.
62 | Format: {"attribute_name": {"gte": start_timestamp, "lte": end_timestamp}}
63 | guids (List[str], optional): List of GUIDs to filter by.
64 |
65 |
66 | Returns:
67 | Dict[str, Any]: Dictionary containing:
68 | - results: List of assets matching the search criteria
69 | - aggregations: Search aggregations if available
70 | - error: None if no error occurred, otherwise the error message
71 | """
72 | logger.info(
73 | f"Starting asset search with parameters: asset_type={asset_type}, "
74 | f"limit={limit}, include_archived={include_archived}"
75 | )
76 | logger.debug(
77 | f"Full search parameters: conditions={conditions}, "
78 | f"negative_conditions={negative_conditions}, some_conditions={some_conditions}, "
79 | f"include_attributes={include_attributes}, "
80 | f"connection_qualified_name={connection_qualified_name}, "
81 | f"tags={tags}, domain_guids={domain_guids}"
82 | )
83 |
84 | try:
85 | # Initialize FluentSearch
86 | logger.debug("Initializing FluentSearch object")
87 | search = FluentSearch()
88 |
89 | # Apply asset type filter if provided
90 | if asset_type:
91 | if isinstance(asset_type, str):
92 | # Handle string type name
93 | logger.debug(f"Filtering by asset type name: {asset_type}")
94 | search = search.where(Asset.TYPE_NAME.eq(asset_type))
95 | else:
96 | # Handle class type
97 | logger.debug(f"Filtering by asset class: {asset_type.__name__}")
98 | search = search.where(CompoundQuery.asset_type(asset_type))
99 |
100 | # Filter for active assets unless archived are explicitly included
101 | if not include_archived:
102 | logger.debug("Filtering for active assets only")
103 | search = search.where(CompoundQuery.active_assets())
104 |
105 | # Apply connection qualified name filter if provided
106 | if connection_qualified_name:
107 | logger.debug(
108 | f"Filtering by connection qualified name: {connection_qualified_name}"
109 | )
110 | search = search.where(
111 | Asset.QUALIFIED_NAME.startswith(connection_qualified_name)
112 | )
113 |
114 | # Apply tags filter if provided
115 | if tags and len(tags) > 0:
116 | logger.debug(
117 | f"Filtering by tags: {tags}, directly_tagged={directly_tagged}"
118 | )
119 | search = search.where(
120 | CompoundQuery.tagged(with_one_of=tags, directly=directly_tagged)
121 | )
122 |
123 | # Apply domain GUIDs filter if provided
124 | if domain_guids and len(domain_guids) > 0:
125 | logger.debug(f"Filtering by domain GUIDs: {domain_guids}")
126 | for guid in domain_guids:
127 | search = search.where(Asset.DOMAIN_GUIDS.eq(guid))
128 |
129 | # Apply positive conditions
130 | if conditions:
131 | if not isinstance(conditions, dict):
132 | error_msg = f"Conditions parameter must be a dictionary, got {type(conditions).__name__}"
133 | logger.error(error_msg)
134 |                 return {"results": [], "aggregations": {}, "error": error_msg}
135 |
136 | logger.debug(f"Applying positive conditions: {conditions}")
137 | for attr_name, condition in conditions.items():
138 | attr = SearchUtils._get_asset_attribute(attr_name)
139 | if attr is None:
140 | logger.warning(
141 | f"Unknown attribute: {attr_name}, skipping condition"
142 | )
143 | continue
144 |
145 | logger.debug(f"Processing condition for attribute: {attr_name}")
146 |
147 | search = SearchUtils._process_condition(
148 | search, attr, condition, attr_name, "where"
149 | )
150 |
151 | # Apply negative conditions
152 | if negative_conditions:
153 | logger.debug(f"Applying negative conditions: {negative_conditions}")
154 | for attr_name, condition in negative_conditions.items():
155 | attr = SearchUtils._get_asset_attribute(attr_name)
156 | if attr is None:
157 | logger.warning(
158 | f"Unknown attribute for negative condition: {attr_name}, skipping"
159 | )
160 | continue
161 |
162 | logger.debug(
163 | f"Processing negative condition for attribute: {attr_name}"
164 | )
165 |
166 | search = SearchUtils._process_condition(
167 | search, attr, condition, attr_name, "where_not"
168 | )
169 |
170 | # Apply where_some conditions with min_somes
171 | if some_conditions:
172 | logger.debug(
173 | f"Applying 'some' conditions: {some_conditions} with min_somes={min_somes}"
174 | )
175 | for attr_name, condition in some_conditions.items():
176 | attr = SearchUtils._get_asset_attribute(attr_name)
177 | if attr is None:
178 | logger.warning(
179 | f"Unknown attribute for 'some' condition: {attr_name}, skipping"
180 | )
181 | continue
182 |
183 | logger.debug(f"Processing 'some' condition for attribute: {attr_name}")
184 |
185 | search = SearchUtils._process_condition(
186 | search, attr, condition, attr_name, "where_some"
187 | )
188 | search = search.min_somes(min_somes)
189 |
190 | # Apply date range filters
191 | if date_range:
192 | logger.debug(f"Applying date range filters: {date_range}")
193 | date_range_count = 0
194 | for attr_name, range_cond in date_range.items():
195 | attr = SearchUtils._get_asset_attribute(attr_name)
196 | if attr is None:
197 | logger.warning(
198 | f"Unknown attribute for date range: {attr_name}, skipping"
199 | )
200 | continue
201 |
202 | logger.debug(f"Processing date range for attribute: {attr_name}")
203 |
204 | if "gte" in range_cond:
205 | logger.debug(f"Adding {attr_name} >= {range_cond['gte']}")
206 | search = search.where(attr.gte(range_cond["gte"]))
207 | date_range_count += 1
208 | if "lte" in range_cond:
209 | logger.debug(f"Adding {attr_name} <= {range_cond['lte']}")
210 | search = search.where(attr.lte(range_cond["lte"]))
211 | date_range_count += 1
212 | if "gt" in range_cond:
213 | logger.debug(f"Adding {attr_name} > {range_cond['gt']}")
214 | search = search.where(attr.gt(range_cond["gt"]))
215 | date_range_count += 1
216 | if "lt" in range_cond:
217 | logger.debug(f"Adding {attr_name} < {range_cond['lt']}")
218 | search = search.where(attr.lt(range_cond["lt"]))
219 | date_range_count += 1
220 |
221 | logger.debug(f"Applied {date_range_count} date range conditions")
222 |
223 | if guids and len(guids) > 0:
224 | logger.debug(f"Applying GUID filter: {guids}")
225 | search = search.where(Asset.GUID.within(guids))
226 |
227 | # Prepare attributes to include: default attributes + additional user-specified attributes
228 | all_attributes = DEFAULT_SEARCH_ATTRIBUTES.copy()
229 |
230 | if include_attributes:
231 | logger.debug(f"Adding user-specified attributes: {include_attributes}")
232 | for attr in include_attributes:
233 | if isinstance(attr, str):
234 | if attr not in all_attributes:
235 | all_attributes.append(attr)
236 | else:
237 | # For AtlanField objects, we'll add them directly to the search
238 | # They can't be easily compared for duplicates
239 | pass
240 |
241 | logger.debug(f"Total attributes to include: {all_attributes}")
242 |
243 | # Include all attributes in results
244 | for attr_name in all_attributes:
245 | attr_obj = SearchUtils._get_asset_attribute(attr_name)
246 | if attr_obj is None:
247 | logger.warning(
248 | f"Unknown attribute for inclusion: {attr_name}, skipping"
249 | )
250 | continue
251 | logger.debug(f"Including attribute: {attr_name}")
252 | search = search.include_on_results(attr_obj)
253 |
254 | # Include additional AtlanField objects specified by user
255 | if include_attributes:
256 | for attr in include_attributes:
257 | if not isinstance(attr, str):
258 | # Assume it's already an AtlanField object
259 | logger.debug(f"Including attribute object: {attr}")
260 | search = search.include_on_results(attr)
261 | elif attr in VALID_RELATIONSHIPS:
262 | search = search.include_on_results(attr)
263 | try:
264 | search = search.include_on_results(Asset.ASSIGNED_TERMS)
265 | search = search.include_on_relations(AtlasGlossaryTerm.NAME)
266 | except Exception as e:
267 | logger.warning(f"Error including assigned terms: {e}")
268 |
269 | # Set pagination
270 | logger.debug(f"Setting pagination: limit={limit}, offset={offset}")
271 | search = search.page_size(limit)
272 | if offset > 0:
273 | search = search.from_offset(offset)
274 |
275 | # Set sorting
276 | if sort_by:
277 | sort_attr = SearchUtils._get_asset_attribute(sort_by)
278 | if sort_attr is not None:
279 | if sort_order.upper() == "DESC":
280 | logger.debug(f"Setting sort order: {sort_by} DESC")
281 | search = search.sort_by_desc(sort_attr)
282 | else:
283 | logger.debug(f"Setting sort order: {sort_by} ASC")
284 | search = search.sort_by_asc(sort_attr)
285 | else:
286 | logger.warning(
287 | f"Unknown attribute for sorting: {sort_by}, skipping sort"
288 | )
289 |
290 | # Execute search
291 | logger.debug("Converting FluentSearch to request object")
292 | request = search.to_request()
293 |
294 | logger.info("Executing search request")
295 | client = get_atlan_client()
296 | search_response = client.asset.search(request)
297 | processed_results = SearchUtils.process_results(search_response)
298 | logger.info(
299 | f"Search completed, returned {len(processed_results['results'])} results"
300 | )
301 | return processed_results
302 |
303 | except Exception as e:
304 | logger.error(f"Error searching assets: {str(e)}")
305 | return [{"results": [], "aggregations": {}, "error": str(e)}]
306 |
```
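
A usage sketch for the search entry point follows. It is illustrative only: the connection qualified name is a placeholder, and the operator names are assumptions about what `SearchUtils._process_condition` accepts, since that helper lives in `utils/search.py` rather than here.

```python
from tools.search import search_assets

# Find verified tables under one connection whose names start with "orders".
response = search_assets(
    asset_type="Table",  # string type name; an Asset subclass also works
    conditions={
        "name": {"operator": "startswith", "value": "orders"},  # assumed operator
        "certificate_status": "VERIFIED",
    },
    connection_qualified_name="default/snowflake/123",  # placeholder
    limit=5,
    sort_by="name",
    sort_order="DESC",
)

# The function always returns a dict with results, aggregations, and error keys.
if response.get("error"):
    print(f"Search failed: {response['error']}")
else:
    for asset in response["results"]:
        print(asset)
```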
--------------------------------------------------------------------------------
/modelcontextprotocol/tools/dq_rules.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Data Quality Rules creation and update tools for Atlan MCP server.
3 |
4 | This module provides functionality to create and update data quality rules in Atlan,
5 | supporting column-level, table-level, and custom SQL rules.
6 | """
7 |
8 | from __future__ import annotations
9 | import logging
10 | from typing import Dict, Any, List, Union
11 |
12 | from pyatlan.model.assets import (
13 | DataQualityRule,
14 | Table,
15 | Column,
16 | View,
17 | MaterialisedView,
18 | SnowflakeDynamicTable,
19 | )
20 | from pyatlan.model.enums import (
21 | DataQualityRuleAlertPriority,
22 | DataQualityRuleThresholdCompareOperator,
23 | DataQualityDimension,
24 | DataQualityRuleThresholdUnit,
25 | DataQualityRuleTemplateConfigRuleConditions,
26 | )
27 | from pyatlan.model.dq_rule_conditions import DQRuleConditionsBuilder
28 |
29 | from client import get_atlan_client
30 | from .models import (
31 | DQRuleSpecification,
32 | DQRuleType,
33 | DQRuleCreationResponse,
34 | CreatedRuleInfo,
35 | DQRuleCondition,
36 | DQAssetType,
37 | DQRuleScheduleSpecification,
38 | DQRuleScheduleResponse,
39 | ScheduledAssetInfo,
40 | DQRuleInfo,
41 | DQRuleDeleteResponse,
42 | DQRuleUpdateSpecification,
43 | DQRuleUpdateResponse,
44 | UpdatedRuleInfo,
45 | )
46 |
47 | logger = logging.getLogger(__name__)
48 |
49 |
50 | # Asset type class mapping for DQ rule operations
51 | _ASSET_TYPE_MAP = {
52 | DQAssetType.TABLE: Table,
53 | DQAssetType.VIEW: View,
54 | DQAssetType.MATERIALIZED_VIEW: MaterialisedView,
55 | DQAssetType.SNOWFLAKE_DYNAMIC_TABLE: SnowflakeDynamicTable,
56 | }
57 |
58 |
59 | def create_dq_rules(
60 | rules: Union[Dict[str, Any], List[Dict[str, Any]]],
61 | ) -> DQRuleCreationResponse:
62 | """
63 | Create one or multiple data quality rules in Atlan.
64 |
65 | Args:
66 | rules (Union[Dict[str, Any], List[Dict[str, Any]]]): Either a single rule
67 | specification or a list of rule specifications.
68 |
69 | Returns:
70 | DQRuleCreationResponse: Response containing:
71 | - created_count: Number of rules successfully created
72 | - created_rules: List of created rule details (guid, qualified_name, rule_type)
73 | - errors: List of any errors encountered
74 |
75 | Raises:
76 | Exception: If there's an error creating the rules.
77 | """
78 | # Convert single rule to list for consistent handling
79 | data = rules if isinstance(rules, list) else [rules]
80 | logger.info(f"Creating {len(data)} data quality rule(s)")
81 |
82 | result = DQRuleCreationResponse()
83 |
84 | try:
85 | # Validate and parse specifications
86 | specs = []
87 | for idx, item in enumerate(data):
88 | try:
89 | # Pydantic model validation happens automatically
90 | spec = DQRuleSpecification(**item)
91 | specs.append(spec)
92 | except ValueError as e:
93 | # Pydantic validation errors
94 | result.errors.append(f"Rule {idx + 1} validation error: {str(e)}")
95 | logger.error(f"Error validating rule specification {idx + 1}: {e}")
96 | except Exception as e:
97 | result.errors.append(f"Rule {idx + 1} error: {str(e)}")
98 | logger.error(f"Error parsing rule specification {idx + 1}: {e}")
99 |
100 | if not specs:
101 | logger.warning("No valid rule specifications to create")
102 | return result
103 |
104 | # Get Atlan client
105 | client = get_atlan_client()
106 |
107 | # Create rules
108 | created_assets = []
109 | for spec in specs:
110 | try:
111 | rule = _create_dq_rule(spec, client)
112 | created_assets.append(rule)
113 |
114 | except Exception as e:
115 | error_msg = f"Error creating {spec.rule_type.value} rule: {str(e)}"
116 | result.errors.append(error_msg)
117 | logger.error(error_msg)
118 |
119 | if not created_assets:
120 | return result
121 |
122 | # Bulk save all created rules
123 | logger.info(f"Saving {len(created_assets)} data quality rules")
124 | response = client.asset.save(created_assets)
125 |
126 | # Process response
127 | for created_rule in response.mutated_entities.CREATE:
128 | result.created_rules.append(
129 | CreatedRuleInfo(
130 | guid=created_rule.guid,
131 | qualified_name=created_rule.qualified_name,
132 | rule_type=created_rule.dq_rule_type
133 | if hasattr(created_rule, "dq_rule_type")
134 | else None,
135 | )
136 | )
137 |
138 | result.created_count = len(result.created_rules)
139 | logger.info(f"Successfully created {result.created_count} data quality rules")
140 |
141 | return result
142 |
143 | except Exception as e:
144 | error_msg = f"Error in bulk rule creation: {str(e)}"
145 | logger.error(error_msg)
146 | result.errors.append(error_msg)
147 | return result
148 |
149 |
150 | def _create_dq_rule(spec: DQRuleSpecification, client) -> DataQualityRule:
151 | """
152 | Create a data quality rule based on specification.
153 |
154 | This unified method handles all rule types by using the rule's configuration
155 | to determine the appropriate creator method and required parameters.
156 |
157 | Args:
158 | spec (DQRuleSpecification): Rule specification
159 | client: Atlan client instance
160 |
161 | Returns:
162 | DataQualityRule: Created rule asset
163 | """
164 | # Get rule configuration
165 | config = spec.rule_type.get_rule_config()
166 |
167 | # Determine asset class based on asset type
168 | asset_class = _ASSET_TYPE_MAP.get(spec.asset_type, Table)
169 |
170 | # Base parameters common to all rule types
171 | params = {
172 | "client": client,
173 | "asset": asset_class.ref_by_qualified_name(
174 | qualified_name=spec.asset_qualified_name
175 | ),
176 | "threshold_value": spec.threshold_value,
177 | "alert_priority": DataQualityRuleAlertPriority[spec.alert_priority],
178 | }
179 |
180 | # Add rule-type specific parameters based on config
181 | if spec.rule_type == DQRuleType.CUSTOM_SQL:
182 | params.update(
183 | {
184 | "rule_name": spec.rule_name,
185 | "custom_sql": spec.custom_sql,
186 | "dimension": DataQualityDimension[spec.dimension],
187 | }
188 | )
189 | else:
190 | params["rule_type"] = spec.rule_type.value
191 |
192 | # Add column reference if required
193 | if config["requires_column"]:
194 | params["column"] = Column.ref_by_qualified_name(
195 | qualified_name=spec.column_qualified_name
196 | )
197 |
198 | # Add optional parameters
199 | if spec.threshold_compare_operator:
200 | params["threshold_compare_operator"] = DataQualityRuleThresholdCompareOperator[
201 | spec.threshold_compare_operator
202 | ]
203 |
204 | if spec.threshold_unit:
205 | params["threshold_unit"] = DataQualityRuleThresholdUnit[spec.threshold_unit]
206 |
207 | if spec.row_scope_filtering_enabled:
208 | params["row_scope_filtering_enabled"] = spec.row_scope_filtering_enabled
209 |
210 | # Add rule conditions if supported and provided
211 | if config["supports_conditions"] and spec.rule_conditions:
212 | params["rule_conditions"] = _build_rule_conditions(spec.rule_conditions)
213 |
214 | # Create rule based on type using explicit creator methods
215 | if spec.rule_type == DQRuleType.CUSTOM_SQL:
216 | dq_rule = DataQualityRule.custom_sql_creator(**params)
217 | elif spec.rule_type == DQRuleType.ROW_COUNT:
218 | dq_rule = DataQualityRule.table_level_rule_creator(**params)
219 | elif spec.rule_type in {
220 | DQRuleType.NULL_COUNT,
221 | DQRuleType.NULL_PERCENTAGE,
222 | DQRuleType.BLANK_COUNT,
223 | DQRuleType.BLANK_PERCENTAGE,
224 | DQRuleType.MIN_VALUE,
225 | DQRuleType.MAX_VALUE,
226 | DQRuleType.AVERAGE,
227 | DQRuleType.STANDARD_DEVIATION,
228 | DQRuleType.UNIQUE_COUNT,
229 | DQRuleType.DUPLICATE_COUNT,
230 | DQRuleType.REGEX,
231 | DQRuleType.STRING_LENGTH,
232 | DQRuleType.VALID_VALUES,
233 | DQRuleType.FRESHNESS,
234 | }:
235 | dq_rule = DataQualityRule.column_level_rule_creator(**params)
236 | else:
237 | raise ValueError(f"Unsupported rule type: {spec.rule_type}")
238 |
239 | # Add description if provided
240 | if spec.description:
241 | dq_rule.description = spec.description
242 |
243 | return dq_rule
244 |
245 |
246 | def _build_rule_conditions(conditions: List[DQRuleCondition]) -> Any:
247 | """
248 | Build DQRuleConditionsBuilder from condition specifications.
249 |
250 | Args:
251 | conditions (List[DQRuleCondition]): List of rule condition models
252 |
253 | Returns:
254 | Built rule conditions object
255 | """
256 | builder = DQRuleConditionsBuilder()
257 |
258 | for condition in conditions:
259 | condition_type = DataQualityRuleTemplateConfigRuleConditions[condition.type]
260 |
261 | # Build condition parameters dynamically
262 | condition_params = {"type": condition_type}
263 |
264 | for key in ["value", "min_value", "max_value"]:
265 | value = getattr(condition, key)
266 | if value is not None:
267 | condition_params[key] = value
268 |
269 | builder.add_condition(**condition_params)
270 |
271 | return builder.build()
272 |
273 |
274 | def schedule_dq_rules(
275 | schedules: Union[Dict[str, Any], List[Dict[str, Any]]],
276 | ) -> DQRuleScheduleResponse:
277 | """
278 | Schedule data quality rule execution for one or multiple assets.
279 |
280 | Args:
281 | schedules: Either a single schedule specification or a list of specifications.
282 |
283 | Returns:
284 | DQRuleScheduleResponse: Response containing scheduled_count, scheduled_assets, and errors.
285 | """
286 | # Convert single schedule to list for consistent handling
287 | data = schedules if isinstance(schedules, list) else [schedules]
288 |
289 | result = DQRuleScheduleResponse()
290 |
291 | # Validate and parse specifications
292 | specs = []
293 | for idx, item in enumerate(data):
294 | try:
295 | spec = DQRuleScheduleSpecification(**item)
296 | specs.append(spec)
297 | except Exception as e:
298 | result.errors.append(f"Schedule {idx + 1} error: {str(e)}")
299 | logger.error(f"Error parsing schedule specification {idx + 1}: {e}")
300 |
301 | if not specs:
302 | logger.warning("No valid schedule specifications to create")
303 | return result
304 |
305 | # Get Atlan client
306 | client = get_atlan_client()
307 |
308 | # Schedule rules for each asset
309 | for spec in specs:
310 | try:
311 | asset_cls = _ASSET_TYPE_MAP.get(spec.asset_type)
312 | if not asset_cls:
313 | raise ValueError(f"Unsupported asset type: {spec.asset_type.value}")
314 |
315 | client.asset.add_dq_rule_schedule(
316 | asset_type=asset_cls,
317 | asset_name=spec.asset_name,
318 | asset_qualified_name=spec.asset_qualified_name,
319 | schedule_crontab=spec.schedule_crontab,
320 | schedule_time_zone=spec.schedule_time_zone,
321 | )
322 |
323 | result.scheduled_assets.append(
324 | ScheduledAssetInfo(
325 | asset_name=spec.asset_name,
326 | asset_qualified_name=spec.asset_qualified_name,
327 | schedule_crontab=spec.schedule_crontab,
328 | schedule_time_zone=spec.schedule_time_zone,
329 | )
330 | )
331 | result.scheduled_count += 1
332 |
333 | except Exception as e:
334 | error_msg = f"Error scheduling {spec.asset_name}: {str(e)}"
335 | result.errors.append(error_msg)
336 | logger.error(error_msg)
337 |
338 | return result
339 |
340 |
341 | def delete_dq_rules(
342 | rule_guids: Union[str, List[str]],
343 | ) -> DQRuleDeleteResponse:
344 | """
345 | Delete one or multiple data quality rules in Atlan.
346 |
347 | Args:
348 | rule_guids: Single rule GUID or list of rule GUIDs to delete.
349 |
350 | Returns:
351 | DQRuleDeleteResponse with deletion results and any errors.
352 |
353 | Example:
354 | # Delete single rule
355 | result = delete_dq_rules("rule-guid-123")
356 |
357 | # Delete multiple rules
358 | result = delete_dq_rules(["rule-guid-1", "rule-guid-2"])
359 | """
360 | # Convert single GUID to list for consistent handling
361 | data = rule_guids if isinstance(rule_guids, list) else [rule_guids]
362 |
363 | result = DQRuleDeleteResponse()
364 |
365 | # Validate and parse specifications
366 | specs = []
367 | for idx, item in enumerate(data):
368 | try:
369 | if isinstance(item, str):
370 | spec = DQRuleInfo(rule_guid=item)
371 | else:
372 | spec = DQRuleInfo(**item)
373 | specs.append(spec)
374 | except Exception as e:
375 | result.errors.append(f"Rule {idx + 1} error: {str(e)}")
376 | logger.error(f"Error parsing rule specification {idx + 1}: {e}")
377 |
378 | if not specs:
379 | logger.warning("No valid rule specifications to delete")
380 | return result
381 |
382 | # Get Atlan client
383 | client = get_atlan_client()
384 |
385 | # Delete each rule
386 | for spec in specs:
387 | try:
388 | response = client.asset.delete_by_guid(guid=spec.rule_guid)
389 | deleted_assets = response.assets_deleted(asset_type=DataQualityRule)
390 |
391 | if deleted_assets:
392 | result.deleted_rules.append(DQRuleInfo(rule_guid=spec.rule_guid))
393 | result.deleted_count += 1
394 | logger.info(f"Successfully deleted rule: {spec.rule_guid}")
395 | else:
396 | error_msg = f"No rule found with GUID: {spec.rule_guid}"
397 | result.errors.append(error_msg)
398 | logger.warning(error_msg)
399 |
400 | except Exception as e:
401 | error_msg = f"Error deleting rule {spec.rule_guid}: {str(e)}"
402 | result.errors.append(error_msg)
403 | logger.error(error_msg)
404 |
405 | return result
406 |
407 |
408 | def update_dq_rules(
409 | rules: Union[Dict[str, Any], List[Dict[str, Any]]],
410 | ) -> DQRuleUpdateResponse:
411 | """
412 | Update one or multiple existing data quality rules in Atlan.
413 |
414 | To update a rule, you only need to provide the qualified name, rule_type, and
415 | asset_qualified_name. All other parameters are optional and will only be updated
416 | if provided.
417 |
418 | Args:
419 |         rules (Union[Dict[str, Any], List[Dict[str, Any]]]): Either a single rule
420 | specification or a list of rule specifications. Each specification must include:
421 | - qualified_name (str): The qualified name of the rule to update (required)
422 | - rule_type (str): Type of rule (required for validation)
423 | - asset_qualified_name (str): Qualified name of the table/view (required)
424 | - Additional optional fields to update (see examples)
425 |
426 | Returns:
427 | DQRuleUpdateResponse: Response containing:
428 | - updated_count: Number of rules successfully updated
429 | - updated_rules: List of updated rule details (guid, qualified_name, rule_type)
430 | - errors: List of any errors encountered
431 |
432 | Raises:
433 | Exception: If there's an error updating the rules.
434 | """
435 | # Convert single rule to list for consistent handling
436 | data = rules if isinstance(rules, list) else [rules]
437 | logger.info(f"Updating {len(data)} data quality rule(s)")
438 |
439 | result = DQRuleUpdateResponse()
440 |
441 | try:
442 | # Validate and parse specifications
443 | specs = []
444 | for idx, item in enumerate(data):
445 | try:
446 | # Pydantic model validation happens automatically
447 | spec = DQRuleUpdateSpecification(**item)
448 | specs.append(spec)
449 | except ValueError as e:
450 | # Pydantic validation errors
451 | result.errors.append(f"Rule {idx + 1} validation error: {str(e)}")
452 | logger.error(
453 | f"Error validating rule update specification {idx + 1}: {e}"
454 | )
455 | except Exception as e:
456 | result.errors.append(f"Rule {idx + 1} error: {str(e)}")
457 | logger.error(f"Error parsing rule update specification {idx + 1}: {e}")
458 |
459 | if not specs:
460 | logger.warning("No valid rule update specifications to process")
461 | return result
462 |
463 | # Get Atlan client
464 | client = get_atlan_client()
465 |
466 | # Update rules
467 | updated_assets = []
468 | for spec in specs:
469 | try:
470 | logger.debug(
471 | f"Updating {spec.rule_type.value} rule: {spec.qualified_name}"
472 | )
473 | rule = _update_dq_rule(spec, client)
474 | updated_assets.append(rule)
475 |
476 | except Exception as e:
477 | error_msg = f"Error updating rule {spec.qualified_name}: {str(e)}"
478 | result.errors.append(error_msg)
479 | logger.error(error_msg)
480 |
481 | if not updated_assets:
482 | return result
483 |
484 | # Bulk save all updated rules
485 | logger.info(f"Saving {len(updated_assets)} updated data quality rules")
486 | response = client.asset.save(updated_assets)
487 |
488 | # Process response
489 | for updated_rule in response.mutated_entities.UPDATE:
490 | result.updated_rules.append(
491 | UpdatedRuleInfo(
492 | guid=updated_rule.guid,
493 | qualified_name=updated_rule.qualified_name,
494 | rule_type=updated_rule.dq_rule_type
495 | if hasattr(updated_rule, "dq_rule_type")
496 | else None,
497 | )
498 | )
499 |
500 | result.updated_count = len(result.updated_rules)
501 | logger.info(f"Successfully updated {result.updated_count} data quality rules")
502 |
503 | return result
504 |
505 | except Exception as e:
506 | error_msg = f"Error in bulk rule update: {str(e)}"
507 | logger.error(error_msg)
508 | result.errors.append(error_msg)
509 | return result
510 |
511 |
512 | def _update_dq_rule(spec: DQRuleUpdateSpecification, client) -> DataQualityRule:
513 | """
514 | Update a data quality rule based on specification.
515 |
516 | Args:
517 | spec (DQRuleUpdateSpecification): Rule update specification
518 | client: Atlan client instance
519 |
520 | Returns:
521 | DataQualityRule: Updated rule asset
522 | """
523 | logger.debug(f"Updating {spec.rule_type.value} rule: {spec.qualified_name}")
524 |
525 | # Base parameters - only qualified_name and client are required
526 | params = {
527 | "client": client,
528 | "qualified_name": spec.qualified_name,
529 | }
530 |
531 | # Add optional threshold parameters if provided
532 | if spec.threshold_value is not None:
533 | params["threshold_value"] = spec.threshold_value
534 |
535 | if spec.threshold_compare_operator:
536 | params["threshold_compare_operator"] = DataQualityRuleThresholdCompareOperator[
537 | spec.threshold_compare_operator
538 | ]
539 |
540 | if spec.threshold_unit:
541 | params["threshold_unit"] = DataQualityRuleThresholdUnit[spec.threshold_unit]
542 |
543 | if spec.alert_priority:
544 | params["alert_priority"] = DataQualityRuleAlertPriority[spec.alert_priority]
545 |
546 | # Add Custom SQL specific parameters if provided
547 | if spec.custom_sql:
548 | params["custom_sql"] = spec.custom_sql
549 |
550 | if spec.rule_name:
551 | params["rule_name"] = spec.rule_name
552 |
553 | if spec.dimension:
554 | params["dimension"] = DataQualityDimension[spec.dimension]
555 |
556 | # Add rule conditions if provided
557 | if spec.rule_conditions:
558 | params["rule_conditions"] = _build_rule_conditions(spec.rule_conditions)
559 |
560 | if spec.row_scope_filtering_enabled is not None:
561 | params["row_scope_filtering_enabled"] = spec.row_scope_filtering_enabled
562 |
563 | # Use the updater method from DataQualityRule
564 | updated_rule = DataQualityRule.updater(**params)
565 |
566 | # Add description if provided
567 | if spec.description:
568 | updated_rule.description = spec.description
569 |
570 | return updated_rule
571 |
```
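
A sketch of the full rule lifecycle follows. Everything here is illustrative: qualified names, the cron expression, and field values are placeholders, and plain strings are coerced into the corresponding enums by the pydantic models.

```python
from tools.dq_rules import (
    create_dq_rules,
    schedule_dq_rules,
    update_dq_rules,
    delete_dq_rules,
)

TABLE_QN = "default/snowflake/123/DB/SCHEMA/ORDERS"  # placeholder

# Create a single column-level rule (a list of dicts is handled the same way).
created = create_dq_rules({
    "rule_type": "Null Count",
    "asset_qualified_name": TABLE_QN,
    "column_qualified_name": f"{TABLE_QN}/ORDER_ID",
    "threshold_value": 0,
    "alert_priority": "URGENT",
})
print(created.created_count, created.errors)

# Schedule DQ rule execution on the parent table, daily at 06:00 UTC.
schedule_dq_rules({
    "asset_type": "Table",
    "asset_name": "ORDERS",
    "asset_qualified_name": TABLE_QN,
    "schedule_crontab": "0 6 * * *",  # 5-field cron, as the validator requires
    "schedule_time_zone": "UTC",
})

# Loosen the threshold on the rule we just created, then clean it up by GUID.
if created.created_rules:
    rule = created.created_rules[0]
    update_dq_rules({
        "qualified_name": rule.qualified_name,
        "rule_type": "Null Count",
        "asset_qualified_name": TABLE_QN,
        "threshold_value": 10,
    })
    delete_dq_rules(rule.guid)
```

Each function accepts a single spec or a list and reports per-item failures in `errors` rather than raising, so one bad spec does not abort a batch.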