#
tokens: 16522/50000 1/42 files (page 2/2)
lines: on (toggle) GitHub
raw markdown copy reset
This is page 2 of 2. Use http://codebase.md/atlanhq/agent-toolkit?lines=true&page={x} to view the full context.

# Directory Structure

```
├── .github
│   ├── CODEOWNERS
│   ├── dependabot.yml
│   ├── ISSUE_TEMPLATE
│   │   ├── bug_report.md
│   │   ├── custom.md
│   │   └── feature_request.md
│   ├── SECURITY.md
│   └── workflows
│       ├── checks.yml
│       └── mcp-server-release.yml
├── .gitignore
├── .pre-commit-config.yaml
├── CHANGELOG.md
├── CODE_OF_CONDUCT.md
├── commitlint.config.js
├── CONTRIBUTING.md
├── LICENSE
├── modelcontextprotocol
│   ├── .cursor
│   │   └── rules
│   │       ├── mcp-guidelines.mdc
│   │       ├── project-structure.mdc
│   │       ├── python.mdc
│   │       └── tool-development-guide.mdc
│   ├── .dockerignore
│   ├── .env.template
│   ├── .python-version
│   ├── client.py
│   ├── Dockerfile
│   ├── docs
│   │   ├── DEPLOYMENT.md
│   │   └── LOCAL_BUILD.md
│   ├── middleware.py
│   ├── pyproject.toml
│   ├── README.md
│   ├── server.py
│   ├── settings.py
│   ├── tools
│   │   ├── __init__.py
│   │   ├── assets.py
│   │   ├── domain.py
│   │   ├── dq_rules.py
│   │   ├── dsl.py
│   │   ├── glossary.py
│   │   ├── lineage.py
│   │   ├── models.py
│   │   ├── query.py
│   │   └── search.py
│   ├── utils
│   │   ├── __init__.py
│   │   ├── assets.py
│   │   ├── constants.py
│   │   ├── parameters.py
│   │   └── search.py
│   ├── uv.lock
│   └── version.py
└── README.md
```

# Files

--------------------------------------------------------------------------------
/modelcontextprotocol/server.py:
--------------------------------------------------------------------------------

```python
   1 | import argparse
   2 | import json
   3 | import os
   4 | from typing import Any, Dict, List
   5 | from fastmcp import FastMCP
   6 | from tools import (
   7 |     search_assets,
   8 |     get_assets_by_dsl,
   9 |     traverse_lineage,
  10 |     update_assets,
  11 |     query_asset,
  12 |     create_glossary_category_assets,
  13 |     create_glossary_assets,
  14 |     create_glossary_term_assets,
  15 |     create_data_domain_assets,
  16 |     create_data_product_assets,
  17 |     create_dq_rules,
  18 |     schedule_dq_rules,
  19 |     delete_dq_rules,
  20 |     update_dq_rules,
  21 |     UpdatableAttribute,
  22 |     CertificateStatus,
  23 |     UpdatableAsset,
  24 |     TermOperations,
  25 | )
  26 | from pyatlan.model.lineage import LineageDirection
  27 | from utils.parameters import (
  28 |     parse_json_parameter,
  29 |     parse_list_parameter,
  30 | )
  31 | from middleware import ToolRestrictionMiddleware
  32 | from settings import get_settings
  33 | 
  34 | 
  35 | mcp = FastMCP("Atlan MCP Server", dependencies=["pyatlan", "fastmcp"])
  36 | 
  37 | # Get restricted tools from environment variable or use default
  38 | restricted_tools_env = os.getenv("RESTRICTED_TOOLS", "")
  39 | if restricted_tools_env:
  40 |     restricted_tools = [
  41 |         tool.strip() for tool in restricted_tools_env.split(",") if tool.strip()
  42 |     ]
  43 | else:
  44 |     # Default configuration - modify this list to restrict specific tools
  45 |     restricted_tools = []
  46 | 
  47 | tool_restriction = ToolRestrictionMiddleware(restricted_tools=restricted_tools)
  48 | mcp.add_middleware(tool_restriction)
  49 | 
  50 | 
  51 | @mcp.tool()
  52 | def search_assets_tool(
  53 |     conditions=None,
  54 |     negative_conditions=None,
  55 |     some_conditions=None,
  56 |     min_somes=1,
  57 |     include_attributes=None,
  58 |     asset_type=None,
  59 |     include_archived=False,
  60 |     limit=10,
  61 |     offset=0,
  62 |     sort_by=None,
  63 |     sort_order="ASC",
  64 |     connection_qualified_name=None,
  65 |     tags=None,
  66 |     directly_tagged=True,
  67 |     domain_guids=None,
  68 |     date_range=None,
  69 |     guids=None,
  70 | ):
  71 |     """
  72 |     Advanced asset search using FluentSearch with flexible conditions.
  73 | 
  74 |     Args:
  75 |         conditions (Dict[str, Any], optional): Dictionary of attribute conditions to match.
  76 |             Format: {"attribute_name": value} or {"attribute_name": {"operator": operator, "value": value}}
  77 |         negative_conditions (Dict[str, Any], optional): Dictionary of attribute conditions to exclude.
  78 |             Format: {"attribute_name": value} or {"attribute_name": {"operator": operator, "value": value}}
  79 |         some_conditions (Dict[str, Any], optional): Conditions for where_some() queries that require min_somes of them to match.
  80 |             Format: {"attribute_name": value} or {"attribute_name": {"operator": operator, "value": value}}
  81 |         min_somes (int): Minimum number of some_conditions that must match. Defaults to 1.
  82 |         include_attributes (List[Union[str, AtlanField]], optional): List of specific attributes to include in results.
  83 |             Can be string attribute names or AtlanField objects.
  84 |         asset_type (Union[Type[Asset], str], optional): Type of asset to search for.
  85 |             Either a class (e.g., Table, Column) or a string type name (e.g., "Table", "Column")
  86 |         include_archived (bool): Whether to include archived assets. Defaults to False.
  87 |         limit (int, optional): Maximum number of results to return. Defaults to 10.
  88 |         offset (int, optional): Offset for pagination. Defaults to 0.
  89 |         sort_by (str, optional): Attribute to sort by. Defaults to None.
  90 |         sort_order (str, optional): Sort order, "ASC" or "DESC". Defaults to "ASC".
  91 |         connection_qualified_name (str, optional): Connection qualified name to filter by. ex: default/snowflake/123456/abc
  92 |         tags (List[str], optional): List of tags to filter by.
  93 |         directly_tagged (bool): Whether to filter for directly tagged assets only. Defaults to True.
  94 |         domain_guids (List[str], optional): List of domain GUIDs to filter by.
  95 |         date_range (Dict[str, Dict[str, Any]], optional): Date range filters.
  96 |             Format: {"attribute_name": {"gte": start_timestamp, "lte": end_timestamp}}
  97 |         guids (List[str], optional): List of asset GUIDs to filter by.
  98 | 
  99 |     Returns:
 100 |         List[Asset]: List of assets matching the search criteria
 101 | 
 102 |     Raises:
 103 |         Exception: If there's an error executing the search
 104 | 
 105 |     Examples:
 106 |         # Search for verified tables
 107 |         tables = search_assets(
 108 |             asset_type="Table",
 109 |             conditions={"certificate_status": CertificateStatus.VERIFIED.value}
 110 |         )
 111 | 
 112 |         # Search for assets missing descriptions from the database/connection default/snowflake/123456/abc
 113 |         missing_desc = search_assets(
 114 |             connection_qualified_name="default/snowflake/123456/abc",
 115 |             negative_conditions={
 116 |                 "description": "has_any_value",
 117 |                 "user_description": "has_any_value"
 118 |             },
 119 |             include_attributes=["owner_users", "owner_groups"]
 120 |         )
 121 | 
 122 |         # Search for columns with specific certificate status
 123 |         columns = search_assets(
 124 |             asset_type="Column",
 125 |             some_conditions={
 126 |                 "certificate_status": [CertificateStatus.DRAFT.value, CertificateStatus.VERIFIED.value]
 127 |             },
 128 |             tags=["PRD"],
 129 |             conditions={"created_by": "username"},
 130 |             date_range={"create_time": {"gte": 1641034800000, "lte": 1672570800000}}
 131 |         )
 132 |         # Search for assets with a specific search text
 133 |         assets = search_assets(
 134 |             conditions = {
 135 |                 "name": {
 136 |                     "operator": "match",
 137 |                     "value": "search_text"
 138 |                 },
 139 |                 "description": {
 140 |                     "operator": "match",
 141 |                     "value": "search_text"
 142 |                 }
 143 |             }
 144 |         )
 145 | 
 146 | 
 147 |         # Search for assets using advanced operators
 148 |         assets = search_assets(
 149 |             conditions={
 150 |                 "name": {
 151 |                     "operator": "startswith",
 152 |                     "value": "prefix_",
 153 |                     "case_insensitive": True
 154 |                 },
 155 |                 "description": {
 156 |                     "operator": "contains",
 157 |                     "value": "important data",
 158 |                     "case_insensitive": True
 159 |                 },
 160 |                 "create_time": {
 161 |                     "operator": "between",
 162 |                     "value": [1640995200000, 1643673600000]
 163 |                 }
 164 |             }
 165 |         )
 166 | 
 167 |         # For multiple asset types queries. ex: Search for Table, Column, or View assets from the database/connection default/snowflake/123456/abc
 168 |         assets = search_assets(
 169 |             connection_qualified_name="default/snowflake/123456/abc",
 170 |             conditions={
 171 |                 "type_name": ["Table", "Column", "View"],
 172 |             }
 173 |         )
 174 | 
 175 |         # Search for assets with compliant business policy
 176 |         assets = search_assets(
 177 |             conditions={
 178 |                 "asset_policy_guids": "business_policy_guid"
 179 |             },
 180 |             include_attributes=["asset_policy_guids"]
 181 |         )
 182 | 
 183 |         # Search for assets with non compliant business policy
 184 |         assets = search_assets(
 185 |             conditions={
 186 |                 "non_compliant_asset_policy_guids": "business_policy_guid"
 187 |             },
 188 |             include_attributes=["non_compliant_asset_policy_guids"]
 189 |         )
 190 | 
 191 |         # get non compliant business policies for an asset
 192 |          assets = search_assets(
 193 |             conditions={
 194 |                 "name": "has_any_value",
 195 |                 "displayName": "has_any_value",
 196 |                 "guid": "has_any_value"
 197 |             },
 198 |             include_attributes=["non_compliant_asset_policy_guids"]
 199 |         )
 200 | 
 201 |         # get compliant business policies for an asset
 202 |          assets = search_assets(
 203 |             conditions={
 204 |                 "name": "has_any_value",
 205 |                 "displayName": "has_any_value",
 206 |                 "guid": "has_any_value"
 207 |             },
 208 |             include_attributes=["asset_policy_guids"]
 209 |         )
 210 | 
 211 |         # get incident for a business policy
 212 |          assets = search_assets(
 213 |             conditions={
 214 |                 "asset_type": "BusinessPolicyIncident",
 215 |                 "business_policy_incident_related_policy_guids": "business_policy_guid"
 216 |             },
 217 |             some_conditions={
 218 |                 "certificate_status": [CertificateStatus.DRAFT.value, CertificateStatus.VERIFIED.value]
 219 |             }
 220 |         )
 221 | 
 222 |         # Search for glossary terms by name and status
 223 |         glossary_terms = search_assets(
 224 |             asset_type="AtlasGlossaryTerm",
 225 |             conditions={
 226 |                 "certificate_status": CertificateStatus.VERIFIED.value,
 227 |                 "name": {
 228 |                     "operator": "contains",
 229 |                     "value": "customer",
 230 |                     "case_insensitive": True
 231 |                 }
 232 |             },
 233 |             include_attributes=["categories"]
 234 |         )
 235 | 
 236 |         # Find popular but expensive assets (cost optimization)
 237 |         search_assets(
 238 |             conditions={
 239 |                 "popularityScore": {"operator": "gte", "value": 0.8},
 240 |                 "sourceReadQueryCost": {"operator": "gte", "value": 1000}
 241 |             },
 242 |             include_attributes=["sourceReadExpensiveQueryRecordList", "sourceCostUnit"]
 243 |         )
 244 | 
 245 |         # Find unused assets accessed before 2024
 246 |         search_assets(
 247 |             conditions={"sourceLastReadAt": {"operator": "lt", "value": 1704067200000}}, # Unix epoch in milliseconds
 248 |             include_attributes=["sourceReadCount", "sourceLastReadAt"]
 249 |         )
 250 | 
 251 |         # Get top users for a specific table
 252 |         # Note: Can't directly filter by user, but can retrieve the list
 253 |         search_assets(
 254 |             conditions={"name": "customer_transactions"},
 255 |             include_attributes=["sourceReadTopUserList", "sourceReadUserCount"]
 256 |         )
 257 | 
 258 |         # Find frequently accessed uncertified assets (governance gap)
 259 |         search_assets(
 260 |             conditions={
 261 |                 "sourceReadUserCount": {"operator": "gte", "value": 10},
 262 |                 "certificate_status": {"operator": "ne", "value": "VERIFIED"}
 263 |             }
 264 |         )
 265 | 
 266 |         # Query assets in specific connection with cost filters
 267 |         search_assets(
 268 |             connection_qualified_name="default/snowflake/123456",
 269 |             conditions={"sourceTotalCost": {"operator": "gte", "value": 500}},
 270 |             sort_by="sourceTotalCost",
 271 |             sort_order="DESC",
 272 |             include_attributes=[
 273 |                 "sourceReadQueryComputeCostRecordList",  # Shows breakdown by warehouse
 274 |                 "sourceQueryComputeCostList",  # List of warehouses used
 275 |                 "sourceCostUnit"
 276 |             ]
 277 |         )
 278 | 
 279 |     The search supports various analytics attributes following similar patterns:
 280 |     - Usage Metrics:
 281 |         - `sourceReadCount`, `sourceReadUserCount` - Filter by read frequency or user diversity
 282 |         - `sourceLastReadAt`, `lastRowChangedAt` - Time-based filtering (Unix timestamp in ms)
 283 |         - `popularityScore` - Float value 0-1 indicating asset popularity
 284 | 
 285 |     - Cost Metrics:
 286 |         - `sourceReadQueryCost`, `sourceTotalCost` - Filter by cost thresholds
 287 |         - Include `sourceCostUnit` in attributes to get cost units
 288 |         - Include `sourceReadExpensiveQueryRecordList` for detailed breakdowns
 289 | 
 290 |     - User Analytics:
 291 |         - `sourceReadTopUserList`, `sourceReadRecentUserList` - Get user lists
 292 |         - `sourceReadTopUserRecordList`, `sourceReadRecentUserRecordList` - Get detailed records
 293 | 
 294 |     - Query Analytics:
 295 |         - `sourceReadPopularQueryRecordList` - Popular queries for the asset
 296 |         - `lastRowChangedQuery` - Query that last modified the asset
 297 | 
 298 |     Additional attributes you can include in the conditions to extract more metadata from an asset:
 299 |         - columns
 300 |         - column_count
 301 |         - row_count
 302 |         - readme
 303 |         - owner_users
 304 |     """
 305 |     try:
 306 |         # Parse JSON string parameters if needed
 307 |         conditions = parse_json_parameter(conditions)
 308 |         negative_conditions = parse_json_parameter(negative_conditions)
 309 |         some_conditions = parse_json_parameter(some_conditions)
 310 |         date_range = parse_json_parameter(date_range)
 311 |         include_attributes = parse_list_parameter(include_attributes)
 312 |         tags = parse_list_parameter(tags)
 313 |         domain_guids = parse_list_parameter(domain_guids)
 314 |         guids = parse_list_parameter(guids)
 315 | 
 316 |         return search_assets(
 317 |             conditions,
 318 |             negative_conditions,
 319 |             some_conditions,
 320 |             min_somes,
 321 |             include_attributes,
 322 |             asset_type,
 323 |             include_archived,
 324 |             limit,
 325 |             offset,
 326 |             sort_by,
 327 |             sort_order,
 328 |             connection_qualified_name,
 329 |             tags,
 330 |             directly_tagged,
 331 |             domain_guids,
 332 |             date_range,
 333 |             guids,
 334 |         )
 335 |     except (json.JSONDecodeError, ValueError) as e:
 336 |         return {"error": f"Parameter parsing error: {str(e)}"}
 337 | 
 338 | 
 339 | @mcp.tool()
 340 | def get_assets_by_dsl_tool(dsl_query):
 341 |     """
 342 |     Execute the search with the given query
 343 |     dsl_query : Union[str, Dict[str, Any]] (required):
 344 |         The DSL query used to search the index.
 345 | 
 346 |     Example:
 347 |     dsl_query = '''{
 348 |     "query": {
 349 |         "function_score": {
 350 |             "boost_mode": "sum",
 351 |             "functions": [
 352 |                 {"filter": {"match": {"starredBy": "john.doe"}}, "weight": 10},
 353 |                 {"filter": {"match": {"certificateStatus": "VERIFIED"}}, "weight": 15},
 354 |                 {"filter": {"match": {"certificateStatus": "DRAFT"}}, "weight": 10},
 355 |                 {"filter": {"bool": {"must_not": [{"exists": {"field": "certificateStatus"}}]}}, "weight": 8},
 356 |                 {"filter": {"bool": {"must_not": [{"terms": {"__typeName.keyword": ["Process", "DbtProcess"]}}]}}, "weight": 20}
 357 |             ],
 358 |             "query": {
 359 |                 "bool": {
 360 |                     "filter": [
 361 |                         {
 362 |                             "bool": {
 363 |                                 "minimum_should_match": 1,
 364 |                                 "must": [
 365 |                                     {"bool": {"should": [{"terms": {"certificateStatus": ["VERIFIED"]}}]}},
 366 |                                     {"term": {"__state": "ACTIVE"}}
 367 |                                 ],
 368 |                                 "must_not": [
 369 |                                     {"term": {"isPartial": "true"}},
 370 |                                     {"terms": {"__typeName.keyword": ["Procedure", "DbtColumnProcess", "BIProcess", "MatillionComponent", "SnowflakeTag", "DbtTag", "BigqueryTag", "AIApplication", "AIModel"]}},
 371 |                                     {"terms": {"__typeName.keyword": ["MCIncident", "AnomaloCheck"]}}
 372 |                                 ],
 373 |                                 "should": [
 374 |                                     {"terms": {"__typeName.keyword": ["Query", "Collection", "AtlasGlossary", "AtlasGlossaryCategory", "AtlasGlossaryTerm", "Connection", "File"]}},
 375 |                                 ]
 376 |                             }
 377 |                         }
 378 |                     ]
 379 |                 },
 380 |                 "score_mode": "sum"
 381 |             },
 382 |             "score_mode": "sum"
 383 |         }
 384 |     },
 385 |     "post_filter": {
 386 |         "bool": {
 387 |             "filter": [
 388 |                 {
 389 |                     "bool": {
 390 |                         "must": [{"terms": {"__typeName.keyword": ["Table", "Column"]}}],
 391 |                         "must_not": [{"exists": {"field": "termType"}}]
 392 |                     }
 393 |                 }
 394 |             ]
 395 |         },
 396 |         "sort": [
 397 |             {"_score": {"order": "desc"}},
 398 |             {"popularityScore": {"order": "desc"}},
 399 |             {"starredCount": {"order": "desc"}},
 400 |             {"name.keyword": {"order": "asc"}}
 401 |         ],
 402 |         "track_total_hits": true,
 403 |         "size": 10,
 404 |         "include_meta": false
 405 |     }'''
 406 |     response = get_assets_by_dsl(dsl_query)
 407 |     """
 408 |     return get_assets_by_dsl(dsl_query)
 409 | 
 410 | 
 411 | @mcp.tool()
 412 | def traverse_lineage_tool(
 413 |     guid,
 414 |     direction,
 415 |     depth=1000000,
 416 |     size=10,
 417 |     immediate_neighbors=True,
 418 |     include_attributes=None,
 419 | ):
 420 |     """
 421 |     Traverse asset lineage in specified direction.
 422 | 
 423 |     By default, essential attributes are included in results. Additional attributes can be
 424 |     specified via include_attributes parameter for richer lineage information.
 425 | 
 426 |     Args:
 427 |         guid (str): GUID of the starting asset
 428 |         direction (str): Direction to traverse ("UPSTREAM" or "DOWNSTREAM")
 429 |         depth (int, optional): Maximum depth to traverse. Defaults to 1000000.
 430 |         size (int, optional): Maximum number of results to return. Defaults to 10.
 431 |         immediate_neighbors (bool, optional): Only return immediate neighbors. Defaults to True.
 432 |         include_attributes (List[str], optional): List of additional attribute names to include in results.
 433 |             These will be added to the default set.
 434 | 
 435 |     Default Attributes (always included):
 436 |         - name, display_name, description, qualified_name, user_description
 437 |         - certificate_status, owner_users, owner_groups
 438 |         - connector_name, has_lineage, source_created_at, source_updated_at
 439 |         - readme, asset_tags
 440 | 
 441 |     Returns:
 442 |         Dict[str, Any]: Dictionary containing:
 443 |             - assets: List of assets in the lineage with processed attributes
 444 |             - error: None if no error occurred, otherwise the error message
 445 | 
 446 |     Examples:
 447 |         # Get lineage with default attributes
 448 |         lineage = traverse_lineage_tool(
 449 |             guid="asset-guid-here",
 450 |             direction="DOWNSTREAM",
 451 |             depth=1000,
 452 |             size=10
 453 |         )
 454 |     """
 455 |     try:
 456 |         direction_enum = LineageDirection[direction.upper()]
 457 |     except KeyError:
 458 |         raise ValueError(
 459 |             f"Invalid direction: {direction}. Must be either 'UPSTREAM' or 'DOWNSTREAM'"
 460 |         )
 461 | 
 462 |     # Parse include_attributes parameter if provided
 463 |     parsed_include_attributes = parse_list_parameter(include_attributes)
 464 | 
 465 |     return traverse_lineage(
 466 |         guid=guid,
 467 |         direction=direction_enum,
 468 |         depth=int(depth),
 469 |         size=int(size),
 470 |         immediate_neighbors=bool(immediate_neighbors),
 471 |         include_attributes=parsed_include_attributes,
 472 |     )
 473 | 
 474 | 
 475 | @mcp.tool()
 476 | def update_assets_tool(
 477 |     assets,
 478 |     attribute_name,
 479 |     attribute_values,
 480 | ):
 481 |     """
 482 |     Update one or multiple assets with different values for attributes or term operations.
 483 | 
 484 |     Args:
 485 |         assets (Union[Dict[str, Any], List[Dict[str, Any]]]): Asset(s) to update.
 486 |             Can be a single UpdatableAsset or a list of UpdatableAsset objects.
 487 |             For asset of type_name=AtlasGlossaryTerm or type_name=AtlasGlossaryCategory, each asset dictionary MUST include a "glossary_guid" key which is the GUID of the glossary that the term belongs to.
 488 |         attribute_name (str): Name of the attribute to update.
 489 |             Supports "user_description", "certificate_status", "readme", and "term".
 490 |         attribute_values (List[Union[str, Dict[str, Any]]]): List of values to set for the attribute.
 491 |             For certificateStatus, only "VERIFIED", "DRAFT", or "DEPRECATED" are allowed.
 492 |             For readme, the value must be a valid Markdown string.
 493 |             For term, the value must be a dict with "operation" and "term_guids" keys.
 494 | 
 495 |     Returns:
 496 |         Dict[str, Any]: Dictionary containing:
 497 |             - updated_count: Number of assets successfully updated
 498 |             - errors: List of any errors encountered
 499 |             - operation: The operation that was performed (for term operations)
 500 | 
 501 |     Examples:
 502 |         # Update certificate status for a single asset
 503 |         result = update_assets_tool(
 504 |             assets={
 505 |                 "guid": "asset-guid-here",
 506 |                 "name": "Asset Name",
 507 |                 "type_name": "Asset Type Name",
 508 |                 "qualified_name": "Asset Qualified Name"
 509 |             },
 510 |             attribute_name="certificate_status",
 511 |             attribute_values=["VERIFIED"]
 512 |         )
 513 | 
 514 |         # Update user description for multiple assets
 515 |         result = update_assets_tool(
 516 |             assets=[
 517 |                 {
 518 |                     "guid": "asset-guid-1",
 519 |                     "name": "Asset Name 1",
 520 |                     "type_name": "Asset Type Name 1",
 521 |                     "qualified_name": "Asset Qualified Name 1"
 522 |                 },
 523 |                 {
 524 |                     "guid": "asset-guid-2",
 525 |                     "name": "Asset Name 2",
 526 |                     "type_name": "Asset Type Name 2",
 527 |                     "qualified_name": "Asset Qualified Name 2"
 528 |                 }
 529 |             ],
 530 |             attribute_name="user_description",
 531 |             attribute_values=[
 532 |                 "New description for asset 1", "New description for asset 2"
 533 |             ]
 534 |         )
 535 | 
 536 |         # Update readme for a single asset with Markdown
 537 |         result = update_assets_tool(
 538 |             assets={
 539 |                 "guid": "asset-guid-here",
 540 |                 "name": "Asset Name",
 541 |                 "type_name": "Asset Type Name",
 542 |                 "qualified_name": "Asset Qualified Name"
 543 |             },
 544 |             attribute_name="readme",
 545 |             attribute_values=['''# Customer Data Table
 546 |             Contains customer transaction records for analytics.
 547 |             **Key Info:**
 548 |             - Updated daily at 2 AM
 549 |             - Contains PII data
 550 |             - [Documentation](https://docs.example.com)''']
 551 |         )
 552 | 
 553 |         # Append terms to a single asset
 554 |         result = update_assets_tool(
 555 |             assets={
 556 |                 "guid": "asset-guid-here",
 557 |                 "name": "Customer Name Column",
 558 |                 "type_name": "Column",
 559 |                 "qualified_name": "default/snowflake/123456/abc/CUSTOMER_NAME"
 560 |             },
 561 |             attribute_name="term",
 562 |             attribute_values=[{
 563 |                 "operation": "append",
 564 |                 "term_guids": ["term-guid-1", "term-guid-2"]
 565 |             }]
 566 |         )
 567 | 
 568 |         # Replace all terms on multiple assets
 569 |         result = update_assets_tool(
 570 |             assets=[
 571 |                 {
 572 |                     "guid": "asset-guid-1",
 573 |                     "name": "Table 1",
 574 |                     "type_name": "Table",
 575 |                     "qualified_name": "default/snowflake/123456/abc/TABLE_1"
 576 |                 },
 577 |                 {
 578 |                     "guid": "asset-guid-2",
 579 |                     "name": "Table 2",
 580 |                     "type_name": "Table",
 581 |                     "qualified_name": "default/snowflake/123456/abc/TABLE_2"
 582 |                 }
 583 |             ],
 584 |             attribute_name="term",
 585 |             attribute_values=[
 586 |                 {
 587 |                     "operation": "replace",
 588 |                     "term_guids": ["new-term-for-table-1-guid-1", "new-term-for-table-1-guid-2"]
 589 |                 },
 590 |                 {
 591 |                     "operation": "replace",
 592 |                     "term_guids": ["new-term-for-table-2-guid-1", "new-term-for-table-2-guid-2"]
 593 |                 }
 594 |             ]
 595 |         )
 596 | 
 597 |         # Remove specific terms from an asset
 598 |         result = update_assets_tool(
 599 |             assets={
 600 |                 "guid": "asset-guid-here",
 601 |                 "name": "Customer Data Table",
 602 |                 "type_name": "Table",
 603 |                 "qualified_name": "default/snowflake/123456/abc/CUSTOMER_DATA"
 604 |             },
 605 |             attribute_name="term",
 606 |             attribute_values=[{
 607 |                 "operation": "remove",
 608 |                 "term_guids": ["term-guid-to-remove"]
 609 |             }]
 610 |         )
 611 |     """
 612 |     try:
 613 |         # Parse JSON parameters
 614 |         parsed_assets = parse_json_parameter(assets)
 615 |         parsed_attribute_values = parse_list_parameter(attribute_values)
 616 | 
 617 |         # Convert string attribute name to enum
 618 |         attr_enum = UpdatableAttribute(attribute_name)
 619 | 
 620 |         # Handle term operations - convert dict to TermOperations object
 621 |         if attr_enum == UpdatableAttribute.TERM:
 622 |             term_operations = []
 623 |             for value in parsed_attribute_values:
 624 |                 if isinstance(value, dict):
 625 |                     term_operations.append(TermOperations(**value))
 626 |                 else:
 627 |                     return {
 628 |                         "error": "Term attribute values must be dictionaries with 'operation' and 'term_guids' keys",
 629 |                         "updated_count": 0,
 630 |                     }
 631 |             parsed_attribute_values = term_operations
 632 |         # For certificate status, convert values to enum
 633 |         elif attr_enum == UpdatableAttribute.CERTIFICATE_STATUS:
 634 |             parsed_attribute_values = [
 635 |                 CertificateStatus(val) for val in parsed_attribute_values
 636 |             ]
 637 | 
 638 |         # Convert assets to UpdatableAsset objects
 639 |         if isinstance(parsed_assets, dict):
 640 |             updatable_assets = [UpdatableAsset(**parsed_assets)]
 641 |         else:
 642 |             updatable_assets = [UpdatableAsset(**asset) for asset in parsed_assets]
 643 | 
 644 |         return update_assets(
 645 |             updatable_assets=updatable_assets,
 646 |             attribute_name=attr_enum,
 647 |             attribute_values=parsed_attribute_values,
 648 |         )
 649 |     except (json.JSONDecodeError, ValueError, TypeError) as e:
 650 |         return {
 651 |             "error": f"Parameter parsing/conversion error: {str(e)}",
 652 |             "updated_count": 0,
 653 |         }
 654 | 
 655 | 
 656 | @mcp.tool()
 657 | def query_asset_tool(
 658 |     sql: str, connection_qualified_name: str, default_schema: str | None = None
 659 | ):
 660 |     """
 661 |     Execute a SQL query on a table/view asset.
 662 | 
 663 |     This tool enables querying table/view assets on the source similar to
 664 |     what's available in the insights table. It uses the Atlan query capabilities
 665 |     to execute SQL against connected data sources.
 666 | 
 667 |     CRITICAL: Use READ-ONLY queries to retrieve data. Write and modify queries are not supported by this tool.
 668 | 
 669 | 
 670 |     Args:
 671 |         sql (str): The SQL query to execute (read-only queries allowed)
 672 |         connection_qualified_name (str): Connection qualified name to use for the query.
 673 |             This is the same parameter used in search_assets_tool.
 674 |             You can find this value by searching for Table/View assets using search_assets_tool
 675 |             and extracting the first part of the 'qualifiedName' attribute.
 676 |             Example: from "default/snowflake/1657275059/LANDING/FRONTEND_PROD/PAGES"
 677 |             use "default/snowflake/1657275059"
 678 |         default_schema (str, optional): Default schema name to use for unqualified
 679 |             objects in the SQL, in the form "DB.SCHEMA"
 680 |             (e.g., "RAW.WIDEWORLDIMPORTERS_WAREHOUSE")
 681 | 
 682 |     Examples:
 683 |         # Use case: How to query the PAGES table and retrieve the first 10 rows
 684 |         # Find tables to query using search_assets_tool
 685 |         tables = search_assets_tool(
 686 |             asset_type="Table",
 687 |             conditions={"name": "PAGES"},
 688 |             limit=5
 689 |         )
 690 |         # Extract connection info from the table's qualifiedName
 691 |         # Example qualifiedName: "default/snowflake/1657275059/LANDING/FRONTEND_PROD/PAGES"
 692 |         # connection_qualified_name: "default/snowflake/1657275059"
 693 |         # database.schema: "LANDING.FRONTEND_PROD"
 694 | 
 695 |         # Query the table using extracted connection info
 696 |         result = query_asset_tool(
 697 |             sql='SELECT * FROM PAGES LIMIT 10',
 698 |             connection_qualified_name="default/snowflake/1657275059",
 699 |             default_schema="LANDING.FRONTEND_PROD"
 700 |         )
 701 | 
 702 |         # Query without specifying default schema (fully qualified table names)
 703 |         result = query_asset_tool(
 704 |             sql='SELECT COUNT(*) FROM "LANDING"."FRONTEND_PROD"."PAGES"',
 705 |             connection_qualified_name="default/snowflake/1657275059"
 706 |         )
 707 | 
 708 |         # Complex analytical query on PAGES table
 709 |         result = query_asset_tool(
 710 |             sql='''
 711 |             SELECT
 712 |                 page_type,
 713 |                 COUNT(*) AS page_count,
 714 |                 AVG(load_time) AS avg_load_time,
 715 |                 MAX(views) AS max_views
 716 |             FROM PAGES
 717 |             WHERE created_date >= '2024-01-01'
 718 |             GROUP BY page_type
 719 |             ORDER BY page_count DESC
 720 |             ''',
 721 |             connection_qualified_name="default/snowflake/1657275059",
 722 |             default_schema="LANDING.FRONTEND_PROD"
 723 |         )
 724 |     """
 725 |     return query_asset(sql, connection_qualified_name, default_schema)
 726 | 
 727 | 
 728 | @mcp.tool()
 729 | def create_glossaries(glossaries) -> List[Dict[str, Any]]:
 730 |     """
 731 |     Create one or multiple AtlasGlossary assets in Atlan.
 732 | 
 733 |     IMPORTANT BUSINESS RULES & CONSTRAINTS:
 734 |     - Check for duplicate names within the same request and ask user to choose different names
 735 |     - Do NOT use search tool before creating glossaries - Atlan will handle existence validation
 736 |     - If user gives ambiguous instructions, ask clarifying questions
 737 | 
 738 |     Args:
 739 |         glossaries (Union[Dict[str, Any], List[Dict[str, Any]]]): Either a single glossary
 740 |             specification (dict) or a list of glossary specifications. Each specification
 741 |             can be a dictionary containing:
 742 |             - name (str): Name of the glossary (required)
 743 |             - user_description (str, optional): Detailed description of the glossary
 744 |               proposed by the user
 745 |             - certificate_status (str, optional): Certification status
 746 |               ("VERIFIED", "DRAFT", or "DEPRECATED")
 747 | 
 748 |     Returns:
 749 |         List[Dict[str, Any]]: List of dictionaries, each with details for a created glossary:
 750 |             - guid: The GUID of the created glossary
 751 |             - name: The name of the glossary
 752 |             - qualified_name: The qualified name of the created glossary
 753 | 
 754 | 
 755 |     Examples:
 756 |         Multiple glossaries creation:
 757 |         [
 758 |             {
 759 |                 "name": "Business Terms",
 760 |                 "user_description": "Common business terminology",
 761 |                 "certificate_status": "VERIFIED"
 762 |             },
 763 |             {
 764 |                 "name": "Technical Dictionary",
 765 |                 "user_description": "Technical terminology and definitions",
 766 |                 "certificate_status": "DRAFT"
 767 |             }
 768 |         ]
 769 |     """
 770 | 
 771 |     # Parse parameters to handle JSON strings using shared utility
 772 |     try:
 773 |         glossaries = parse_json_parameter(glossaries)
 774 |     except json.JSONDecodeError as e:
 775 |         return {"error": f"Invalid JSON format for glossaries parameter: {str(e)}"}
 776 | 
 777 |     return create_glossary_assets(glossaries)
 778 | 
 779 | 
 780 | @mcp.tool()
 781 | def create_glossary_terms(terms) -> List[Dict[str, Any]]:
 782 |     """
 783 |     Create one or multiple AtlasGlossaryTerm assets in Atlan.
 784 | 
 785 |     IMPORTANT BUSINESS RULES & CONSTRAINTS:
 786 |     - Within a glossary, a term (single GUID) can be associated with many categories
 787 |     - Two terms with the same name CANNOT exist within the same glossary (regardless of categories)
 788 |     - A term is always anchored to a glossary and may also be associated with one or more categories inside the same glossary
 789 |     - Before creating a term, perform a single search to check if the glossary, categories, or term with the same name already exist. Search for all relevant glossaries, categories, and terms in one call. Skip this step if you already have the required GUIDs.
 790 |     - Example call for searching glossary categories and terms before term creation(Query - create a term fighterz under category Characters and Locations under Marvel Cinematic Universe (MCU) glossary):
 791 |         {
 792 |             "limit": 10,
 793 |             "conditions": {
 794 |                 "type_name": ["AtlasGlossary", "AtlasGlossaryCategory","AtlasGlossaryTerm"],
 795 |                 "name": ["Marvel Cinematic Universe (MCU)", "Characters", "Locations","fighterz"]
 796 |             }
 797 |         }
 798 | 
 799 |     Args:
 800 |         terms (Union[Dict[str, Any], List[Dict[str, Any]]]): Either a single term
 801 |             specification (dict) or a list of term specifications. Each specification
 802 |             can be a dictionary containing:
 803 |             - name (str): Name of the term (required)
 804 |             - glossary_guid (str): GUID of the glossary this term belongs to (required)
 805 |             - user_description (str, optional): Detailed description of the term
 806 |               proposed by the user
 807 |             - certificate_status (str, optional): Certification status
 808 |               ("VERIFIED", "DRAFT", or "DEPRECATED")
 809 |             - category_guids (List[str], optional): List of category GUIDs this term
 810 |               belongs to.
 811 | 
 812 |     Returns:
 813 |         List[Dict[str, Any]]: List of dictionaries, each with details for a created term:
 814 |             - guid: The GUID of the created term
 815 |             - name: The name of the term
 816 |             - qualified_name: The qualified name of the created term
 817 | 
 818 |     Examples:
 819 |         Multiple terms creation:
 820 |         [
 821 |             {
 822 |                 "name": "Customer",
 823 |                 "glossary_guid": "glossary-guid-here",
 824 |                 "user_description": "An individual or organization that purchases goods or services",
 825 |                 "certificate_status": "VERIFIED"
 826 |             },
 827 |             {
 828 |                 "name": "Annual Recurring Revenue",
 829 |                 "glossary_guid": "glossary-guid-here",
 830 |                 "user_description": "The yearly value of recurring revenue from customers",
 831 |                 "certificate_status": "DRAFT",
 832 |                 "category_guids": ["category-guid-1"]
 833 |             }
 834 |         ]
 835 |     """
 836 |     # Parse parameters to handle JSON strings using shared utility
 837 |     try:
 838 |         terms = parse_json_parameter(terms)
 839 |     except json.JSONDecodeError as e:
 840 |         return {"error": f"Invalid JSON format for terms parameter: {str(e)}"}
 841 | 
 842 |     return create_glossary_term_assets(terms)
 843 | 
 844 | 
 845 | @mcp.tool()
 846 | def create_glossary_categories(categories) -> List[Dict[str, Any]]:
 847 |     """
 848 |     Create one or multiple AtlasGlossaryCategory assets in Atlan.
 849 | 
 850 |     IMPORTANT BUSINESS RULES & CONSTRAINTS:
 851 |     - There cannot be two categories with the same name under the same glossary (at the same level)
 852 |     - Under a parent category, there cannot be subcategories with the same name (at the same level)
 853 |     - Categories with the same name can exist under different glossaries (this is allowed)
 854 |     - Cross-level naming is allowed: category "a" can have subcategory "b", and category "b" can have subcategory "a"
 855 |     - Example allowed structure: Glossary "bui" → category "a" → subcategory "b" AND category "b" → subcategory "a"
 856 |     - Always check for duplicate names at the same level and ask user to choose different names
 857 |     - Before creating a category, perform a single search to check if the glossary or categories with the same name already exist. Skip this step if you already have the required GUIDs.
 858 |     - Example call for searching glossary and categories before category creation(Query - create categories Locations and Characters under Marvel Cinematic Universe (MCU) glossary):
 859 |         {
 860 |             "limit": 10,
 861 |             "conditions": {
 862 |                 "type_name": ["AtlasGlossary", "AtlasGlossaryCategory"],
 863 |                 "name": ["Marvel Cinematic Universe (MCU)", "Characters", "Locations"]
 864 |             }
 865 |         }
 866 |     - If user gives ambiguous instructions, ask clarifying questions
 867 | 
 868 |     Args:
 869 |         categories (Union[Dict[str, Any], List[Dict[str, Any]]]): Either a single category
 870 |             specification (dict) or a list of category specifications. Each specification
 871 |             can be a dictionary containing:
 872 |             - name (str): Name of the category (required)
 873 |             - glossary_guid (str): GUID of the glossary this category belongs to (required)
 874 |             - user_description (str, optional): Detailed description of the category
 875 |               proposed by the user
 876 |             - certificate_status (str, optional): Certification status
 877 |               ("VERIFIED", "DRAFT", or "DEPRECATED")
 878 |             - parent_category_guid (str, optional): GUID of the parent category if this
 879 |               is a subcategory
 880 | 
 881 |     Returns:
 882 |         List[Dict[str, Any]]: List of dictionaries, each with details for a created category:
 883 |             - guid: The GUID of the created category
 884 |             - name: The name of the category
 885 |             - qualified_name: The qualified name of the created category
 886 | 
 887 |     Examples:
 888 |         Multiple categories creation:
 889 |         [
 890 |             {
 891 |                 "name": "Customer Data",
 892 |                 "glossary_guid": "glossary-guid-here",
 893 |                 "user_description": "Terms related to customer information and attributes",
 894 |                 "certificate_status": "VERIFIED"
 895 |             },
 896 |             {
 897 |                 "name": "PII",
 898 |                 "glossary_guid": "glossary-guid-here",
 899 |                 "parent_category_guid": "parent-category-guid-here",
 900 |                 "user_description": "Subcategory for PII terms",
 901 |                 "certificate_status": "DRAFT"
 902 |             }
 903 |         ]
 904 |     """
 905 |     # Parse parameters to handle JSON strings using shared utility
 906 |     try:
 907 |         categories = parse_json_parameter(categories)
 908 |     except json.JSONDecodeError as e:
 909 |         return {"error": f"Invalid JSON format for categories parameter: {str(e)}"}
 910 | 
 911 |     return create_glossary_category_assets(categories)
 912 | 
 913 | 
 914 | @mcp.tool()
 915 | def create_domains(domains) -> List[Dict[str, Any]]:
 916 |     """
 917 |     Create Data Domains or Sub Domains in Atlan.
 918 | 
 919 |     IMPORTANT BUSINESS RULES & CONSTRAINTS:
 920 |     - Before creating a domain/subdomain, you may want to search for existing
 921 |       domains to avoid duplicates or to get the qualified_name for parent relationships
 922 |     - Domain names must be unique at the top level
 923 |     - Subdomain names must be unique within the same parent domain
 924 | 
 925 |     Args:
 926 |         domains (Union[Dict[str, Any], List[Dict[str, Any]]]): Either a single domain
 927 |             specification (dict) or a list of domain specifications.
 928 | 
 929 |     For Data Domain:
 930 |         - name (str): Name of the domain (required)
 931 |         - user_description (str, optional): Detailed description
 932 |         - certificate_status (str, optional): "VERIFIED", "DRAFT", or "DEPRECATED"
 933 | 
 934 |     For Sub Domain:
 935 |         - name (str): Name of the subdomain (required)
 936 |         - parent_domain_qualified_name (str): Qualified name of parent domain (required)
 937 |         - user_description (str, optional): Detailed description
 938 |         - certificate_status (str, optional): "VERIFIED", "DRAFT", or "DEPRECATED"
 939 | 
 940 |     Returns:
 941 |         List[Dict[str, Any]]: List of dictionaries, each with details for a created asset:
 942 |             - guid: The GUID of the created asset
 943 |             - name: The name of the asset
 944 |             - qualified_name: The qualified name of the created asset
 945 | 
 946 |     Examples:
 947 |         # Create a single Data Domain
 948 |         create_domains({
 949 |             "name": "Marketing",
 950 |             "user_description": "Marketing data domain",
 951 |             "certificate_status": "VERIFIED"
 952 |         })
 953 | 
 954 |         # Create a Sub Domain under an existing domain
 955 |         create_domains({
 956 |             "name": "Social Marketing",
 957 |             "parent_domain_qualified_name": "default/domain/marketing",
 958 |             "user_description": "Social media marketing subdomain",
 959 |             "certificate_status": "DRAFT"
 960 |         })
 961 | 
 962 |         # Create multiple domains in one call
 963 |         create_domains([
 964 |             {
 965 |                 "name": "Sales",
 966 |                 "user_description": "Sales data domain"
 967 |             },
 968 |             {
 969 |                 "name": "E-commerce Sales",
 970 |                 "parent_domain_qualified_name": "default/domain/sales",
 971 |                 "user_description": "E-commerce sales subdomain"
 972 |             }
 973 |         ])
 974 |     """
 975 |     # Parse parameters to handle JSON strings using shared utility
 976 |     try:
 977 |         domains = parse_json_parameter(domains)
 978 |     except json.JSONDecodeError as e:
 979 |         return {"error": f"Invalid JSON format for domains parameter: {str(e)}"}
 980 | 
 981 |     return create_data_domain_assets(domains)
 982 | 
 983 | 
 984 | @mcp.tool()
 985 | def create_data_products(products) -> List[Dict[str, Any]]:
 986 |     """
 987 |     Create Data Products in Atlan.
 988 | 
 989 |     IMPORTANT BUSINESS RULES & CONSTRAINTS:
 990 |     - Before creating a product, you may want to search for existing domains
 991 |       to get the qualified_name for the domain relationship
 992 |     - Product names must be unique within the same domain
 993 |     - At least one asset GUID must be provided for each product
 994 | 
 995 |     Args:
 996 |         products (Union[Dict[str, Any], List[Dict[str, Any]]]): Either a single product
 997 |             specification (dict) or a list of product specifications.
 998 | 
 999 |     For Data Product:
1000 |         - name (str): Name of the product (required)
1001 |         - domain_qualified_name (str): Qualified name of the domain (required)
1002 |         - asset_guids (List[str]): List of asset GUIDs to link to this product (required).
1003 |           At least one asset GUID must be provided. Use search_assets_tool to find asset GUIDs.
1004 |         - user_description (str, optional): Detailed description
1005 |         - certificate_status (str, optional): "VERIFIED", "DRAFT", or "DEPRECATED"
1006 | 
1007 |     Returns:
1008 |         List[Dict[str, Any]]: List of dictionaries, each with details for a created asset:
1009 |             - guid: The GUID of the created asset
1010 |             - name: The name of the asset
1011 |             - qualified_name: The qualified name of the created asset
1012 | 
1013 |     Examples:
1014 |         # Create a Data Product with linked assets (asset_guids required)
1015 |         # First, search for assets to get their GUIDs using search_assets_tool
1016 |         create_data_products({
1017 |             "name": "Marketing Influence",
1018 |             "domain_qualified_name": "default/domain/marketing",
1019 |             "user_description": "Product for marketing influence analysis",
1020 |             "asset_guids": ["asset-guid-1", "asset-guid-2"]  # GUIDs from search_assets_tool
1021 |         })
1022 | 
1023 |         # Create multiple products in one call
1024 |         create_data_products([
1025 |             {
1026 |                 "name": "Sales Analytics",
1027 |                 "domain_qualified_name": "default/domain/sales",
1028 |                 "user_description": "Sales analytics product",
1029 |                 "asset_guids": ["table-guid-1", "table-guid-2"]
1030 |             },
1031 |             {
1032 |                 "name": "Customer Insights",
1033 |                 "domain_qualified_name": "default/domain/marketing",
1034 |                 "user_description": "Customer insights product",
1035 |                 "asset_guids": ["view-guid-1"]
1036 |             }
1037 |         ])
1038 |     """
1039 |     # Parse parameters to handle JSON strings using shared utility
1040 |     try:
1041 |         products = parse_json_parameter(products)
1042 |     except json.JSONDecodeError as e:
1043 |         return {"error": f"Invalid JSON format for products parameter: {str(e)}"}
1044 | 
1045 |     return create_data_product_assets(products)
1046 | 
1047 | 
1048 | @mcp.tool()
1049 | def create_dq_rules_tool(rules):
1050 |     """
1051 |     Create one or multiple data quality rules in Atlan.
1052 | 
1053 |     Supports all rule types: column-level, table-level, and custom SQL rules.
1054 |     Rules can be created individually or in bulk for efficient setup.
1055 | 
1056 |     Args:
1057 |         rules (Union[Dict[str, Any], List[Dict[str, Any]]]): Either a single rule
1058 |             specification or a list of rule specifications. Each specification
1059 |             must include:
1060 |             - rule_type (str): Type of rule (see Supported Rule Types) [REQUIRED]
1061 |             - asset_qualified_name (str): Qualified name of the asset (Table, View, MaterialisedView, or SnowflakeDynamicTable) [REQUIRED]
1062 |             - asset_type (str): Type of asset - "Table" | "View" | "MaterialisedView" | "SnowflakeDynamicTable" [OPTIONAL, default: "Table"]
1063 |             - threshold_value (int/float): Threshold value for comparison [REQUIRED]
1064 |             - column_qualified_name (str): Column qualified name [REQUIRED for column-level rules, NOT for Row Count/Custom SQL]
1065 |             - threshold_compare_operator (str): Comparison operator (EQUAL, GREATER_THAN, etc.) [OPTIONAL, default varies by rule]
1066 |             - threshold_unit (str): Time unit for Freshness rules (DAYS, HOURS, MINUTES) [REQUIRED for Freshness, N/A for others]
1067 |             - alert_priority (str): Alert priority level (LOW, NORMAL, URGENT) [OPTIONAL, default: NORMAL]
1068 |             - row_scope_filtering_enabled (bool): Enable row-level filtering [OPTIONAL]
1069 |             - rule_conditions (List[Dict]): Conditions for String Length/Regex/Valid Values [REQUIRED for conditional rules]
1070 |             - custom_sql (str): SQL query [REQUIRED for Custom SQL rules]
1071 |             - rule_name (str): Name for the rule [REQUIRED for Custom SQL rules]
1072 |             - dimension (str): DQ dimension [REQUIRED for Custom SQL rules]
1073 |             - description (str): Rule description [OPTIONAL]
1074 | 
1075 |     Returns:
1076 |         Dict[str, Any]: Dictionary containing:
1077 |             - created_count: Number of rules successfully created
1078 |             - created_rules: List of created rules with guid, qualified_name, rule_type
1079 |             - errors: List of any errors encountered
1080 | 
1081 |     Examples:
1082 |         # Column-level rules (Null Count, Min/Max Value, Unique/Duplicate Count, etc.)
1083 |         rule = create_dq_rules_tool({
1084 |             "rule_type": "Null Count",  # or "Min Value", "Max Value", "Unique Count", etc.
1085 |             "asset_qualified_name": "default/snowflake/123/DB/SCHEMA/TABLE",
1086 |             "column_qualified_name": "default/snowflake/123/DB/SCHEMA/TABLE/EMAIL",
1087 |             "threshold_compare_operator": "LESS_THAN_EQUAL",  # EQUAL, GREATER_THAN, etc.
1088 |             "threshold_value": 5,
1089 |             "alert_priority": "URGENT",  # LOW, NORMAL, URGENT
1090 |             "row_scope_filtering_enabled": True,
1091 |             "description": "Email column should have minimal nulls"
1092 |         })
1093 | 
1094 |         # Conditional rules (String Length, Regex, Valid Values)
1095 |         rule = create_dq_rules_tool({
1096 |             "rule_type": "String Length",  # or "Regex", "Valid Values"
1097 |             "asset_qualified_name": "default/snowflake/123/DB/SCHEMA/TABLE",
1098 |             "column_qualified_name": "default/snowflake/123/DB/SCHEMA/TABLE/PHONE",
1099 |             "threshold_value": 10,
1100 |             "alert_priority": "URGENT",
1101 |             "rule_conditions": [{
1102 |                 "type": "STRING_LENGTH_BETWEEN",  # See Rule Condition Types below
1103 |                 "min_value": 10,
1104 |                 "max_value": 15
1105 |             }],
1106 |             # For Regex: {"type": "REGEX_NOT_MATCH", "value": "pattern"}
1107 |             # For Valid Values: {"type": "IN_LIST", "value": ["ACTIVE", "INACTIVE"]}
1108 |             "row_scope_filtering_enabled": True
1109 |         })
1110 | 
1111 |         # Table-level (Row Count) and Time-based (Freshness)
1112 |         rule = create_dq_rules_tool({
1113 |             "rule_type": "Row Count",  # No column_qualified_name needed
1114 |             "asset_qualified_name": "default/snowflake/123/DB/SCHEMA/TABLE",
1115 |             "asset_type": "Table",  # Optional: "Table" (default), "View", "MaterialisedView", "SnowflakeDynamicTable"
1116 |             "threshold_compare_operator": "GREATER_THAN_EQUAL",
1117 |             "threshold_value": 1000,
1118 |             "alert_priority": "URGENT"
1119 |         })
1120 |         # For Freshness: Add "column_qualified_name" + "threshold_unit": "DAYS"/"HOURS"/"MINUTES"
1121 | 
1122 |         # Custom SQL rule
1123 |         rule = create_dq_rules_tool({
1124 |             "rule_type": "Custom SQL",
1125 |             "asset_qualified_name": "default/snowflake/123/DB/SCHEMA/TABLE",
1126 |             "rule_name": "Revenue Consistency Check",
1127 |             "custom_sql": "SELECT COUNT(*) FROM TABLE WHERE revenue < 0 OR revenue > 1000000",
1128 |             "threshold_compare_operator": "EQUAL",
1129 |             "threshold_value": 0,
1130 |             "alert_priority": "URGENT",
1131 |             "dimension": "CONSISTENCY",  # See Data Quality Dimensions below
1132 |             "description": "Ensure revenue values are within expected range"
1133 |         })
1134 | 
1135 |         # Bulk creation - Pass array instead of single dict
1136 |         rules = create_dq_rules_tool([
1137 |             {"rule_type": "Null Count", "column_qualified_name": "...EMAIL", ...},
1138 |             {"rule_type": "Duplicate Count", "column_qualified_name": "...USER_ID", ...},
1139 |             {"rule_type": "Row Count", "asset_qualified_name": "...", ...}
1140 |         ])
1141 | 
1142 |     Supported Rule Types:
1143 |         Completeness: "Null Count", "Null Percentage", "Blank Count", "Blank Percentage"
1144 |         Statistical: "Min Value", "Max Value", "Average", "Standard Deviation"
1145 |         Uniqueness: "Unique Count", "Duplicate Count"
1146 |         Validity: "Regex", "String Length", "Valid Values"
1147 |         Timeliness: "Freshness"
1148 |         Volume: "Row Count"
1149 |         Custom: "Custom SQL"
1150 | 
1151 |     Supported Asset Types:
1152 |         "Table", "View", "MaterialisedView", "SnowflakeDynamicTable"
1153 | 
1154 |     Valid Alert Priority Levels:
1155 |         "LOW", "NORMAL" (default), "URGENT"
1156 | 
1157 |     Threshold Operators:
1158 |         "EQUAL", "GREATER_THAN", "GREATER_THAN_EQUAL", "LESS_THAN", "LESS_THAN_EQUAL", "BETWEEN"
1159 | 
1160 |     Threshold Units (Freshness only):
1161 |         "DAYS", "HOURS", "MINUTES"
1162 | 
1163 |     Data Quality Dimensions (Custom SQL only):
1164 |         "COMPLETENESS", "VALIDITY", "UNIQUENESS", "TIMELINESS", "VOLUME", "ACCURACY", "CONSISTENCY"
1165 | 
1166 |     Rule Condition Types:
1167 |         String Length: "STRING_LENGTH_EQUALS", "STRING_LENGTH_BETWEEN",
1168 |                       "STRING_LENGTH_GREATER_THAN", "STRING_LENGTH_LESS_THAN"
1169 |         Regex: "REGEX_MATCH", "REGEX_NOT_MATCH"
1170 |         Valid Values: "IN_LIST", "NOT_IN_LIST"
1171 |     """
1172 |     try:
1173 |         parsed_rules = parse_json_parameter(rules)
1174 |         return create_dq_rules(parsed_rules)
1175 |     except (json.JSONDecodeError, ValueError) as e:
1176 |         return {
1177 |             "created_count": 0,
1178 |             "created_rules": [],
1179 |             "errors": [f"Parameter parsing error: {str(e)}"],
1180 |         }
1181 | 
1182 | 
1183 | @mcp.tool()
1184 | def schedule_dq_rules_tool(schedules):
1185 |     """
1186 |     Schedule data quality rule execution for one or multiple assets.
1187 | 
1188 |     Args:
1189 |         schedules: Single schedule or list of schedules. Each schedule requires:
1190 |             - asset_type (str): "Table", "View", "MaterialisedView", or "SnowflakeDynamicTable"
1191 |             - asset_name (str): Name of the asset
1192 |             - asset_qualified_name (str): Qualified name of the asset
1193 |             - schedule_crontab (str): Cron expression (5 fields: min hour day month weekday)
1194 |             - schedule_time_zone (str): Timezone (e.g., "UTC", "America/New_York")
1195 | 
1196 |     Returns:
1197 |         Dict with scheduled_count, scheduled_assets, and errors.
1198 | 
1199 |     Example:
1200 |         schedule_dq_rules_tool({
1201 |             "asset_type": "Table",
1202 |             "asset_name": "CUSTOMERS",
1203 |             "asset_qualified_name": "default/snowflake/123/DB/SCHEMA/CUSTOMERS",
1204 |             "schedule_crontab": "0 2 * * *",
1205 |             "schedule_time_zone": "UTC"
1206 |         })
1207 |     """
1208 |     try:
1209 |         parsed_schedules = parse_json_parameter(schedules)
1210 |         return schedule_dq_rules(parsed_schedules)
1211 |     except (json.JSONDecodeError, ValueError) as e:
1212 |         return {
1213 |             "scheduled_count": 0,
1214 |             "scheduled_assets": [],
1215 |             "errors": [f"Parameter parsing error: {str(e)}"],
1216 |         }
1217 | 
1218 | 
1219 | @mcp.tool()
1220 | def delete_dq_rules_tool(rule_guids):
1221 |     """
1222 |     Delete one or multiple data quality rules in Atlan.
1223 | 
1224 |     Args:
1225 |         rule_guids: Single rule GUID (string) or list of rule GUIDs to delete.
1226 | 
1227 |     Returns:
1228 |         Dict with deleted_count, deleted_rules (list of GUIDs), and errors.
1229 | 
1230 |     Example:
1231 |         # Delete single rule
1232 |         delete_dq_rules_tool("rule-guid-123")
1233 | 
1234 |         # Delete multiple rules
1235 |         delete_dq_rules_tool(["rule-guid-1", "rule-guid-2"])
1236 |     """
1237 |     try:
1238 |         parsed_guids = parse_json_parameter(rule_guids)
1239 |         return delete_dq_rules(parsed_guids)
1240 |     except (json.JSONDecodeError, ValueError) as e:
1241 |         return {
1242 |             "deleted_count": 0,
1243 |             "deleted_rules": [],
1244 |             "errors": [f"Parameter parsing error: {str(e)}"],
1245 |         }
1246 | 
1247 | 
1248 | @mcp.tool()
1249 | def update_dq_rules_tool(rules):
1250 |     """
1251 |     Update existing data quality rules in Atlan.
1252 | 
1253 |     Args:
1254 |         rules: Single rule dict or list of rule dicts. Required fields:
1255 |             - qualified_name: Rule's qualified name
1256 |             - rule_type: Rule type (e.g., "Null Count", "Row Count", "Custom SQL")
1257 |             - asset_qualified_name: Table/view qualified name
1258 |         Optional fields: threshold_value, threshold_compare_operator, threshold_unit,
1259 |         alert_priority, custom_sql, rule_name, dimension, rule_conditions,
1260 |         row_scope_filtering_enabled, description
1261 | 
1262 |     Returns:
1263 |         Dict with updated_count, updated_rules, and errors.
1264 | 
1265 |     Examples:
1266 |         # Single rule update
1267 |         update_dq_rules_tool({
1268 |             "qualified_name": "default/snowflake/123/DB/SCHEMA/TABLE/rule/abc-123",
1269 |             "rule_type": "Null Count",
1270 |             "asset_qualified_name": "default/snowflake/123/DB/SCHEMA/TABLE",
1271 |             "threshold_value": 10,
1272 |             "alert_priority": "URGENT"
1273 |         })
1274 | 
1275 |         # Bulk update with conditions
1276 |         update_dq_rules_tool([
1277 |             {"qualified_name": "...", "rule_type": "Null Count", "threshold_value": 5},
1278 |             {"qualified_name": "...", "rule_type": "String Length",
1279 |              "rule_conditions": [{"type": "STRING_LENGTH_BETWEEN", "min_value": 10, "max_value": 100}]}
1280 |         ])
1281 | 
1282 |     Rule Types: "Null Count", "Null Percentage", "Blank Count", "Blank Percentage",
1283 |     "Min Value", "Max Value", "Average", "Standard Deviation", "Unique Count",
1284 |     "Duplicate Count", "Regex", "String Length", "Valid Values", "Freshness",
1285 |     "Row Count", "Custom SQL"
1286 | 
1287 |     Alert Priority: "LOW", "NORMAL", "URGENT"
1288 |     Operators: "EQUAL", "GREATER_THAN", "GREATER_THAN_EQUAL", "LESS_THAN",
1289 |                "LESS_THAN_EQUAL", "BETWEEN"
1290 |     Threshold Units: "DAYS", "HOURS", "MINUTES" (Freshness only)
1291 |     Dimensions: "COMPLETENESS", "VALIDITY", "UNIQUENESS", "TIMELINESS", "VOLUME",
1292 |                 "ACCURACY", "CONSISTENCY" (Custom SQL only)
1293 |     Condition Types: "STRING_LENGTH_EQUALS", "STRING_LENGTH_BETWEEN",
1294 |                      "STRING_LENGTH_GREATER_THAN", "STRING_LENGTH_LESS_THAN",
1295 |                      "REGEX_MATCH", "REGEX_NOT_MATCH", "IN_LIST", "NOT_IN_LIST"
1296 |     """
1297 |     try:
1298 |         parsed_rules = parse_json_parameter(rules)
1299 |         return update_dq_rules(parsed_rules)
1300 |     except (json.JSONDecodeError, ValueError) as e:
1301 |         return {
1302 |             "updated_count": 0,
1303 |             "updated_rules": [],
1304 |             "errors": [f"Parameter parsing error: {str(e)}"],
1305 |         }
1306 | 
1307 | 
1308 | def main():
1309 |     """Main entry point for the Atlan MCP Server."""
1310 | 
1311 |     settings = get_settings()
1312 | 
1313 |     parser = argparse.ArgumentParser(description="Atlan MCP Server")
1314 |     parser.add_argument(
1315 |         "--transport",
1316 |         type=str,
1317 |         default=settings.MCP_TRANSPORT,
1318 |         choices=["stdio", "sse", "streamable-http"],
1319 |         help="Transport protocol (stdio/sse/streamable-http)",
1320 |     )
1321 |     parser.add_argument(
1322 |         "--host",
1323 |         type=str,
1324 |         default=settings.MCP_HOST,
1325 |         help="Host to run the server on",
1326 |     )
1327 |     parser.add_argument(
1328 |         "--port",
1329 |         type=int,
1330 |         default=settings.MCP_PORT,
1331 |         help="Port to run the server on",
1332 |     )
1333 |     parser.add_argument(
1334 |         "--path",
1335 |         type=str,
1336 |         default=settings.MCP_PATH,
1337 |         help="Path of the streamable HTTP server",
1338 |     )
1339 |     args = parser.parse_args()
1340 | 
1341 |     kwargs = {"transport": args.transport}
1342 |     if args.transport == "streamable-http" or args.transport == "sse":
1343 |         kwargs = {
1344 |             "transport": args.transport,
1345 |             "host": args.host,
1346 |             "port": args.port,
1347 |             "path": args.path,
1348 |         }
1349 |     # Run the server with the specified transport and host/port/path
1350 |     mcp.run(**kwargs)
1351 | 
1352 | 
1353 | if __name__ == "__main__":
1354 |     main()
1355 | 
```
Page 2/2FirstPrevNextLast