This is page 1 of 3. Use http://codebase.md/ocean-zhc/dolphinscheduler-mcp?lines=false&page={x} to view the full context. # Directory Structure ``` ├── .env.example ├── .gitignore ├── CHANGELOG.md ├── docker-compose.yml ├── Dockerfile ├── docs │ ├── api.md │ └── installation.md ├── ds-restfuleapi-v1.json ├── example_mcp.py ├── examples │ ├── list_projects.py │ ├── manage_resources.py │ ├── simple_client.py │ └── start_workflow.py ├── install_dev.sh ├── mcp-openapi-split │ ├── base │ │ └── 01_base.json │ ├── paths │ │ ├── 01_azure_datafactory_api.json │ │ ├── 02_dynamic_task_type_api.json │ │ ├── 03_kubernetes_namespace_api.json │ │ ├── 04_project_worker_group_api.json │ │ ├── 05_ui_plugin_api.json │ │ ├── 06_worker_group_api.json │ │ ├── 07_project_preference_api.json │ │ ├── 08_task_definition_api.json │ │ ├── 09_task_instance_api.json │ │ ├── 10_task_analysis_api.json │ │ ├── 11_task_group_api.json │ │ ├── 12_favourite_api.json │ │ ├── 13_alert_plugin_instance_api.json │ │ ├── 14_alert_group_api.json │ │ ├── 15_schedule_api.json │ │ ├── 16_audit_log_api.json │ │ ├── 17_process_task_relation_api.json │ │ ├── 18_workflow_lineage_api.json │ │ ├── 19_datasource_api.json │ │ ├── 20_data_quality_api.json │ │ ├── 21_log_api.json │ │ ├── 22_process_definition_api.json │ │ ├── 23_process_instance_api.json │ │ ├── 24_process_execution_api.json │ │ ├── 25_environment_api.json │ │ ├── 26_login_api.json │ │ ├── 27_user_api.json │ │ ├── 28_monitor_api.json │ │ ├── 29_tenant_api.json │ │ ├── 30_token_api.json │ │ ├── 31_resource_api.json │ │ ├── 32_queue_api.json │ │ ├── 33_cluster_api.json │ │ ├── 34_project_parameter_api.json │ │ └── 35_project_api.json │ ├── schemas │ │ ├── 01_alert_schemas.json │ │ ├── 02_cluster_schemas.json │ │ ├── 03_datasource_schemas.json │ │ ├── 04_environment_schemas.json │ │ ├── 05_k8s_schemas.json │ │ ├── 06_project_schemas.json │ │ ├── 07_queue_schemas.json │ │ ├── 08_remaining_1_schemas.json │ │ ├── 09_remaining_10_schemas.json │ │ ├── 
10_remaining_11_schemas.json │ │ ├── 11_remaining_12_schemas.json │ │ ├── 12_remaining_13_schemas.json │ │ ├── 13_remaining_2_schemas.json │ │ ├── 14_remaining_3_schemas.json │ │ ├── 15_remaining_4_schemas.json │ │ ├── 16_remaining_5_schemas.json │ │ ├── 17_remaining_6_schemas.json │ │ ├── 18_remaining_7_schemas.json │ │ ├── 19_remaining_8_schemas.json │ │ ├── 20_remaining_9_schemas.json │ │ ├── 21_resource_schemas.json │ │ ├── 22_result_schemas.json │ │ ├── 23_schedule_schemas.json │ │ ├── 24_task_schemas.json │ │ ├── 25_tenant_schemas.json │ │ ├── 26_user_schemas.json │ │ ├── 27_worker_schemas.json │ │ └── 28_workflow_schemas.json │ └── utils │ ├── combine_openapi.py │ └── split_openapi.py ├── pyproject.toml ├── README.md ├── requirements-dev.txt ├── run.bat ├── run.py ├── run.sh ├── src │ ├── __init__.py │ └── dolphinscheduler_mcp │ ├── __init__.py │ ├── __main__.py │ ├── cli.py │ ├── client.py │ ├── config.py │ ├── fastmcp_compat.py │ ├── server.py │ ├── tools │ ├── tools_generated │ │ ├── __init__.py │ │ ├── access_token_tools.py │ │ ├── audit_log_tools.py │ │ ├── azure_data_factory_tools.py │ │ ├── datasource_tools.py │ │ ├── dynamic_task_type_tools.py │ │ ├── environment_check_tools.py │ │ ├── environment_update_tools.py │ │ ├── k8s_namespace_tools.py │ │ ├── lineage_tools.py │ │ ├── process_task_relation_tools.py │ │ ├── project_parameter_tools.py │ │ ├── project_preference_tools.py │ │ ├── project_tools.py │ │ ├── project_worker_group_tools.py │ │ ├── README.md │ │ ├── template_tools.py │ │ ├── ui_plugin_tools.py │ │ └── worker_group_tools.py │ ├── tools_loader.py │ └── tools.py.bak ├── test_create_project.py ├── test_env_settings.py └── tests ├── __init__.py └── test_config.py ``` # Files -------------------------------------------------------------------------------- /.env.example: -------------------------------------------------------------------------------- ``` DOLPHINSCHEDULER_API_URL=http://localhost:12345/dolphinscheduler # Default 
DolphinScheduler REST API URL DOLPHINSCHEDULER_API_KEY=your_api_key # Optional: Default DolphinScheduler API key ``` -------------------------------------------------------------------------------- /.gitignore: -------------------------------------------------------------------------------- ``` # Python __pycache__/ *.py[cod] *$py.class *.so .Python build/ develop-eggs/ dist/ downloads/ eggs/ .eggs/ lib/ lib64/ parts/ sdist/ var/ wheels/ *.egg-info/ .installed.cfg *.egg MANIFEST # Environment .env .venv env/ venv/ ENV/ env.bak/ venv.bak/ # IDE .idea/ .vscode/ *.swp *.swo .trae/ # OS .DS_Store Thumbs.db ``` -------------------------------------------------------------------------------- /README.md: -------------------------------------------------------------------------------- ```markdown # DolphinScheduler MCP Server A Model Context Protocol (MCP) server for Apache DolphinScheduler, allowing AI agents to interact with DolphinScheduler through a standardized protocol. ## Overview DolphinScheduler MCP provides a FastMCP-based server that exposes DolphinScheduler's REST API as a collection of tools that can be used by AI agents. The server acts as a bridge between AI models and DolphinScheduler, enabling AI-driven workflow management. 
## Features - Full API coverage of DolphinScheduler functionality - Standardized tool interfaces following the Model Context Protocol - Easy configuration through environment variables or command-line arguments - Comprehensive tool documentation ## Installation ```bash pip install dolphinscheduler-mcp ``` ## Configuration ### Environment Variables - `DOLPHINSCHEDULER_API_URL`: URL for the DolphinScheduler API (default: http://localhost:12345/dolphinscheduler) - `DOLPHINSCHEDULER_API_KEY`: API token for authentication with the DolphinScheduler API - `DOLPHINSCHEDULER_MCP_HOST`: Host to bind the MCP server (default: 0.0.0.0) - `DOLPHINSCHEDULER_MCP_PORT`: Port to bind the MCP server (default: 8089) - `DOLPHINSCHEDULER_MCP_LOG_LEVEL`: Logging level (default: INFO) ## Usage ### Command Line Start the server using the command-line interface: ```bash ds-mcp --host 0.0.0.0 --port 8089 ``` ### Python API ```python from dolphinscheduler_mcp.server import run_server # Start the server run_server(host="0.0.0.0", port=8089) ``` ## Available Tools The DolphinScheduler MCP Server provides tools for: - Project Management - Process Definition Management - Process Instance Management - Task Definition Management - Scheduling Management - Resource Management - Data Source Management - Alert Group Management - Alert Plugin Management - Worker Group Management - Tenant Management - User Management - System Status Monitoring ## Example Client Usage ```python from mcp_client import MCPClient # Connect to the MCP server client = MCPClient("http://localhost:8089/mcp") # Get a list of projects response = await client.invoke_tool("get-project-list") # Create a new project response = await client.invoke_tool( "create-project", {"name": "My AI Project", "description": "Project created by AI"} ) ``` ## License Apache License 2.0 ``` -------------------------------------------------------------------------------- /mcp-openapi-split/schemas/05_k8s_schemas.json: 
-------------------------------------------------------------------------------- ```json { "components": { "schemas": {} } } ``` -------------------------------------------------------------------------------- /mcp-openapi-split/schemas/06_project_schemas.json: -------------------------------------------------------------------------------- ```json { "components": { "schemas": {} } } ``` -------------------------------------------------------------------------------- /mcp-openapi-split/schemas/21_resource_schemas.json: -------------------------------------------------------------------------------- ```json { "components": { "schemas": {} } } ``` -------------------------------------------------------------------------------- /mcp-openapi-split/schemas/27_worker_schemas.json: -------------------------------------------------------------------------------- ```json { "components": { "schemas": {} } } ``` -------------------------------------------------------------------------------- /tests/__init__.py: -------------------------------------------------------------------------------- ```python """Test package for DolphinScheduler MCP Server.""" ``` -------------------------------------------------------------------------------- /requirements-dev.txt: -------------------------------------------------------------------------------- ``` pytest>=7.3.1 black>=23.3.0 isort>=5.12.0 mypy>=1.3.0 pre-commit>=3.3.2 fastmcp ``` -------------------------------------------------------------------------------- /src/dolphinscheduler_mcp/__init__.py: -------------------------------------------------------------------------------- ```python """DolphinScheduler MCP Server Package.""" from .server import run_server __all__ = ["run_server"] ``` -------------------------------------------------------------------------------- /src/__init__.py: -------------------------------------------------------------------------------- ```python """DolphinScheduler MCP Package.""" # Empty __init__.py to 
make src directory a proper Python package ``` -------------------------------------------------------------------------------- /Dockerfile: -------------------------------------------------------------------------------- ```dockerfile FROM python:3.9-slim WORKDIR /app # Copy project files COPY . . # Install dependencies RUN pip install --no-cache-dir -e . # Expose MCP server port EXPOSE 8089 # Run server CMD ["python", "-m", "src.dolphinscheduler_mcp"] ``` -------------------------------------------------------------------------------- /install_dev.sh: -------------------------------------------------------------------------------- ```bash #!/bin/bash # Install the package in development mode # Activate virtual environment if it exists if [ -d ".venv" ]; then echo "Activating virtual environment..." source .venv/bin/activate fi # Install the package in development mode echo "Installing package in development mode..." pip install -e . echo "Installation complete!" ``` -------------------------------------------------------------------------------- /mcp-openapi-split/base/01_base.json: -------------------------------------------------------------------------------- ```json { "openapi": "3.0.1", "info": { "title": "Apache DolphinScheduler Api Docs", "description": "Apache DolphinScheduler Api Docs", "version": "3.3.0" }, "servers": [ { "url": "http://www.sjjc.dolphinscheduler.dsj.com/dolphinscheduler", "description": "Generated server url" } ] } ``` -------------------------------------------------------------------------------- /docker-compose.yml: -------------------------------------------------------------------------------- ```yaml version: '3' services: dolphinscheduler-mcp: build: context: . 
dockerfile: Dockerfile ports: - "8089:8089" environment: - DOLPHINSCHEDULER_API_URL=http://dolphinscheduler-api:12345/dolphinscheduler - DOLPHINSCHEDULER_API_KEY=${DOLPHINSCHEDULER_API_KEY} networks: - dolphinscheduler-network restart: unless-stopped networks: dolphinscheduler-network: name: dolphinscheduler-network driver: bridge ``` -------------------------------------------------------------------------------- /run.sh: -------------------------------------------------------------------------------- ```bash #!/bin/bash # Check for Python if ! command -v python3 &>/dev/null; then echo "Python 3 could not be found. Please install Python 3." exit 1 fi # Create virtual environment if it doesn't exist if [ ! -d ".venv" ]; then echo "Creating virtual environment..." python3 -m venv .venv fi # Activate virtual environment echo "Activating virtual environment..." source .venv/bin/activate # Install requirements echo "Installing requirements..." pip install -e . # Run the server echo "Starting DolphinScheduler MCP Server..." python -m src.dolphinscheduler_mcp ``` -------------------------------------------------------------------------------- /run.bat: -------------------------------------------------------------------------------- ``` @echo off SETLOCAL :: Check for Python python --version >nul 2>&1 IF %ERRORLEVEL% NEQ 0 ( echo Python could not be found. Please install Python 3. exit /b 1 ) :: Create virtual environment if it doesn't exist IF NOT EXIST .venv ( echo Creating virtual environment... python -m venv .venv ) :: Activate virtual environment echo Activating virtual environment... call .venv\Scripts\activate.bat :: Install requirements echo Installing requirements... pip install -e . :: Run the server echo Starting DolphinScheduler MCP Server... 
python -m src.dolphinscheduler_mcp ENDLOCAL ``` -------------------------------------------------------------------------------- /CHANGELOG.md: -------------------------------------------------------------------------------- ```markdown # Changelog All notable changes to the DolphinScheduler MCP Server will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [0.1.0] - 2023-06-24 ### Added - Initial release of DolphinScheduler MCP Server - Core MCP server implementation with DolphinScheduler API integration - Connection management tools for dynamic configuration - Project management tools (list projects, get project details) - Workflow management tools (list, view, start, stop) - Workflow instance management tools - Task instance monitoring - System status monitoring tools - Resource management tools - Documentation, examples, and tests - Docker and Docker Compose support for containerized deployment ``` -------------------------------------------------------------------------------- /example_mcp.py: -------------------------------------------------------------------------------- ```python """Example MCP server using the latest FastMCP API.""" from fastmcp import FastMCP, Context # Create an MCP server mcp = FastMCP(title="Example MCP Server") # Define a tool using the decorator pattern @mcp.tool() def add(a: int, b: int) -> int: """Add two numbers together. Args: a: First number b: Second number Returns: The sum of a and b """ return a + b # Define an async tool @mcp.tool() async def fetch_data(url: str, ctx: Context) -> str: """Fetch data from a URL. 
Args: url: The URL to fetch data from ctx: The MCP context Returns: The fetched data """ # Report progress ctx.info(f"Fetching data from {url}") # In a real implementation, we would fetch data here return f"Data from {url}" if __name__ == "__main__": # Start the server mcp.run(host="0.0.0.0", port=8089) ``` -------------------------------------------------------------------------------- /test_create_project.py: -------------------------------------------------------------------------------- ```python import asyncio import os from src.dolphinscheduler_mcp.client import DolphinSchedulerClient async def test_create_project(): # Create a client with environment variables client = DolphinSchedulerClient( api_url=os.environ.get("DOLPHINSCHEDULER_API_URL"), api_key=os.environ.get("DOLPHINSCHEDULER_API_KEY") ) try: # Call the create_project method project_name = "mcp-demo1" description = "MCP Demo Project" print(f"Creating project {project_name}...") response = await client.request( "POST", "projects", params={ "projectName": project_name, "description": description } ) print("Response:") print(response) finally: # Close the client await client.close() if __name__ == "__main__": asyncio.run(test_create_project()) ``` -------------------------------------------------------------------------------- /pyproject.toml: -------------------------------------------------------------------------------- ```toml [build-system] requires = ["setuptools>=42", "wheel"] build-backend = "setuptools.build_meta" [project] name = "dolphinscheduler-mcp" version = "0.1.0" description = "A Model Context Protocol (MCP) server for DolphinScheduler" readme = "README.md" authors = [ {name = "DolphinScheduler Contributors"} ] license = {text = "Apache-2.0"} classifiers = [ "Programming Language :: Python :: 3", "License :: OSI Approved :: Apache Software License", "Operating System :: OS Independent", ] requires-python = ">=3.9" dependencies = [ "fastapi", "python-dotenv", "pydantic", "fastmcp", 
"aiohttp", "uvicorn", ] [project.scripts] ds-mcp = "dolphinscheduler_mcp.cli:main" [project.optional-dependencies] dev = [ "pytest", "black", "isort", "mypy", "pre-commit", ] [tool.setuptools] package-dir = {"" = "src"} [tool.black] line-length = 88 target-version = ["py39"] [tool.isort] profile = "black" line_length = 88 [tool.mypy] python_version = "3.9" ignore_missing_imports = true ``` -------------------------------------------------------------------------------- /examples/list_projects.py: -------------------------------------------------------------------------------- ```python #!/usr/bin/env python3 """Example script to list DolphinScheduler projects using the MCP client.""" import asyncio import json import os import sys # For local development, add the parent directory to the Python path sys.path.append(".") # Import the MCPClient from simple_client.py from examples.simple_client import MCPClient async def main(): """List all DolphinScheduler projects using MCP.""" # Set up the MCP server URL mcp_url = os.environ.get("MCP_URL", "http://localhost:8089/mcp") print(f"Connecting to MCP server at {mcp_url}") async with MCPClient(mcp_url) as client: # Get project list print("Getting project list:") response = await client.invoke_tool("get-project-list") # Pretty print the result if "result" in response and "data" in response["result"]: projects = response["result"]["data"] print(f"Found {len(projects)} projects:") for project in projects: print(f"- {project['name']} (Code: {project['code']})") else: print("Error or no projects found:") print(json.dumps(response, indent=2)) if __name__ == "__main__": asyncio.run(main()) ``` -------------------------------------------------------------------------------- /mcp-openapi-split/schemas/16_remaining_5_schemas.json: -------------------------------------------------------------------------------- ```json { "components": { "schemas": { "AuditModelTypeDto": { "type": "object", "properties": { "name": { "type": "string" } 
} }, "AuditDto": { "type": "object", "properties": { "userName": { "type": "string" }, "modelType": { "type": "string" }, "modelName": { "type": "string" }, "operation": { "type": "string" }, "createTime": { "type": "string", "format": "date-time" }, "description": { "type": "string" }, "detail": { "type": "string" }, "latency": { "type": "string" } } } } } } ``` -------------------------------------------------------------------------------- /mcp-openapi-split/schemas/15_remaining_4_schemas.json: -------------------------------------------------------------------------------- ```json { "components": { "schemas": { "WorkFlowLineage": { "type": "object", "properties": { "workFlowCode": { "type": "integer", "format": "int64" }, "workFlowName": { "type": "string" }, "workFlowPublishStatus": { "type": "string" }, "scheduleStartTime": { "type": "string", "format": "date-time" }, "scheduleEndTime": { "type": "string", "format": "date-time" }, "crontab": { "type": "string" }, "schedulePublishStatus": { "type": "integer", "format": "int32" }, "sourceWorkFlowCode": { "type": "string" } } }, "AuditOperationTypeDto": { "type": "object", "properties": { "name": { "type": "string" } } } } } } ``` -------------------------------------------------------------------------------- /mcp-openapi-split/schemas/11_remaining_12_schemas.json: -------------------------------------------------------------------------------- ```json { "components": { "schemas": { "PageInfoDqExecuteResult": { "type": "object", "properties": { "totalList": { "type": "array", "items": { "$ref": "#/components/schemas/DqExecuteResult" } }, "total": { "type": "integer", "format": "int32" }, "totalPage": { "type": "integer", "format": "int32" }, "pageSize": { "type": "integer", "format": "int32" }, "currentPage": { "type": "integer", "format": "int32" }, "pageNo": { "type": "integer", "format": "int32" } } }, "ParamsOptions": { "type": "object", "properties": { "label": { "type": "string" }, "value": { "type": 
"object" }, "disabled": { "type": "boolean" } } } } } } ``` -------------------------------------------------------------------------------- /mcp-openapi-split/utils/combine_openapi.py: -------------------------------------------------------------------------------- ```python #!/usr/bin/env python3 ''' Combine split OpenAPI files back into a single specification. ''' import json import os import glob # Output file OUTPUT_FILE = '../combined_openapi.json' def load_json(filename): '''Load JSON from a file.''' with open(filename, 'r', encoding='utf-8') as f: return json.load(f) def combine_openapi(): '''Combine all the split OpenAPI files into a single specification.''' # Start with the base information base_files = sorted(glob.glob('../base/*.json')) if not base_files: raise FileNotFoundError("No base files found!") combined = load_json(base_files[0]) combined['paths'] = {} combined['components'] = {'schemas': {}} # Add all paths path_files = sorted(glob.glob('../paths/*.json')) for path_file in path_files: paths_data = load_json(path_file) if 'paths' in paths_data: combined['paths'].update(paths_data['paths']) # Add all schemas schema_files = sorted(glob.glob('../schemas/*.json')) for schema_file in schema_files: schema_data = load_json(schema_file) if 'components' in schema_data and 'schemas' in schema_data['components']: combined['components']['schemas'].update(schema_data['components']['schemas']) # Save the combined specification with open(OUTPUT_FILE, 'w', encoding='utf-8') as f: json.dump(combined, f, ensure_ascii=False, indent=4) print(f"Combined OpenAPI specification saved to {OUTPUT_FILE}") print(f"Specification contains {len(combined['paths'])} paths and {len(combined['components']['schemas'])} schemas") if __name__ == '__main__': combine_openapi() ``` -------------------------------------------------------------------------------- /src/dolphinscheduler_mcp/tools_generated/environment_check_tools.py: 
-------------------------------------------------------------------------------- ```python """Tool for checking environment settings.""" import json import logging from typing import Dict, Any, Optional from ..client import DolphinSchedulerClient from ..config import Config from ..fastmcp_compat import FastMCPTool class CheckEnvironmentSettings(FastMCPTool): """Tool for checking the current environment settings for DolphinScheduler API.""" name = "check_environment_settings" description = "Check the current environment settings for DolphinScheduler API" is_async = False schema = { "type": "object", "properties": { "id": { "type": "string", "description": "Not used, included for compatibility with FastMCP" } } } def _run(self, id: Optional[str] = None) -> Dict[str, Any]: """Check the current environment settings. Args: id: Not used, included for compatibility with FastMCP Returns: Dictionary containing current settings """ try: config = Config() return { "success": True, "api_url": config.api_url, "has_api_key": config.has_api_key(), "api_key_value": config.api_key if config.has_api_key() else None } except Exception as e: self.logger.error(f"Error checking environment settings: {e}") return { "success": False, "error": str(e) } def register_environment_check_tools(mcp): """Register environment check tools with FastMCP. Args: mcp: The FastMCP instance to register tools with. 
""" from ..fastmcp_compat import register_tool_class register_tool_class(mcp, CheckEnvironmentSettings) ``` -------------------------------------------------------------------------------- /mcp-openapi-split/schemas/04_environment_schemas.json: -------------------------------------------------------------------------------- ```json { "components": { "schemas": { "Environment": { "type": "object", "properties": { "id": { "type": "integer", "format": "int32" }, "code": { "type": "integer", "format": "int64" }, "name": { "type": "string" }, "config": { "type": "string" }, "description": { "type": "string" }, "operator": { "type": "integer", "format": "int32" }, "createTime": { "type": "string", "format": "date-time" }, "updateTime": { "type": "string", "format": "date-time" } } }, "ResultEnvironment": { "type": "object", "properties": { "code": { "type": "integer", "format": "int32" }, "msg": { "type": "string" }, "data": { "$ref": "#/components/schemas/Environment" }, "failed": { "type": "boolean" }, "success": { "type": "boolean" } } } } } } ``` -------------------------------------------------------------------------------- /mcp-openapi-split/paths/02_dynamic_task_type_api.json: -------------------------------------------------------------------------------- ```json { "paths": { "/dynamic/{taskCategory}/taskTypes": { "get": { "tags": [ "DYNAMIC_TASK_TYPE" ], "summary": "listDynamicTaskTypes", "description": "LIST_DYNAMIC_TASK_TYPES", "operationId": "listDynamicTaskTypes", "parameters": [ { "name": "taskCategory", "in": "path", "required": true, "schema": { "type": "string" } } ], "responses": { "200": { "description": "OK", "content": { "*/*": { "schema": { "$ref": "#/components/schemas/Result" } } } } } } }, "/dynamic/taskCategories": { "get": { "tags": [ "DYNAMIC_TASK_TYPE" ], "summary": "listTaskCategories", "description": "LIST_TASK_TYPE_CATEGORIES", "operationId": "listDynamicTaskCategories", "responses": { "200": { "description": "OK", "content": { "*/*": { 
"schema": { "$ref": "#/components/schemas/Result" } } } } } } } } } ``` -------------------------------------------------------------------------------- /src/dolphinscheduler_mcp/__main__.py: -------------------------------------------------------------------------------- ```python """Main entry point for DolphinScheduler MCP.""" import sys import os import importlib from typing import List # Clear cache for fastmcp_compat module to ensure changes are loaded if "dolphinscheduler_mcp.fastmcp_compat" in sys.modules: del sys.modules["dolphinscheduler_mcp.fastmcp_compat"] from .server import run_server def main(args: List[str] = None) -> int: """Run the MCP server with the given arguments.""" if args is None: args = sys.argv[1:] # Default configuration host = "0.0.0.0" port = 8089 # Check for environment variables first if "DOLPHINSCHEDULER_MCP_HOST" in os.environ: host = os.environ["DOLPHINSCHEDULER_MCP_HOST"] if "DOLPHINSCHEDULER_MCP_PORT" in os.environ: try: port = int(os.environ["DOLPHINSCHEDULER_MCP_PORT"]) except ValueError: print(f"Invalid port number in DOLPHINSCHEDULER_MCP_PORT: {os.environ['DOLPHINSCHEDULER_MCP_PORT']}") return 1 # Override with command line arguments if provided if len(args) >= 1: host = args[0] if len(args) >= 2: try: port = int(args[1]) except ValueError: print(f"Invalid port number: {args[1]}") return 1 # Check if API URL and key are set if "DOLPHINSCHEDULER_API_URL" not in os.environ: print("Warning: DOLPHINSCHEDULER_API_URL environment variable is not set.") print("Using default: http://localhost:12345/dolphinscheduler") if "DOLPHINSCHEDULER_API_KEY" not in os.environ: print("Warning: DOLPHINSCHEDULER_API_KEY environment variable is not set.") print("Authentication to the DolphinScheduler API may fail.") # Run the server try: run_server(host=host, port=port) return 0 except Exception as e: print(f"Error running server: {e}") return 1 if __name__ == "__main__": sys.exit(main()) ``` 
-------------------------------------------------------------------------------- /run.py: -------------------------------------------------------------------------------- ```python #!/usr/bin/env python """ DolphinScheduler MCP Server Runner This script reads its settings from environment variables and runs the DolphinScheduler MCP server. """ import os import sys import logging # Configure logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", handlers=[logging.StreamHandler(sys.stdout)], ) logger = logging.getLogger("dolphinscheduler_mcp.runner") def main(): """Main entry point for running DolphinScheduler MCP server.""" # Default host and port host = os.environ.get("DOLPHINSCHEDULER_MCP_HOST", "0.0.0.0") port = int(os.environ.get("DOLPHINSCHEDULER_MCP_PORT", "8089")) # Check if API URL is set api_url = os.environ.get("DOLPHINSCHEDULER_API_URL") if not api_url: logger.warning("DOLPHINSCHEDULER_API_URL environment variable is not set!") logger.warning("Using default URL from Config class.") # Check if API key is set - this is required for all API requests api_key = os.environ.get("DOLPHINSCHEDULER_API_KEY") if not api_key: logger.warning("DOLPHINSCHEDULER_API_KEY environment variable is not set!") logger.warning("API requests may fail due to missing authentication.") else: logger.info("Using API key from environment variables.") # Print configuration logger.info(f"Starting DolphinScheduler MCP Server on {host}:{port}") if api_url: logger.info(f"Using DolphinScheduler API URL: {api_url}") # Import and run the server try: from src.dolphinscheduler_mcp.server import run_server run_server(host=host, port=port) return 0 except ImportError: logger.error("Could not import dolphinscheduler_mcp. 
Make sure it's installed.") return 1 except Exception as e: logger.error(f"Error running server: {e}", exc_info=True) return 1 if __name__ == "__main__": sys.exit(main()) ``` -------------------------------------------------------------------------------- /src/dolphinscheduler_mcp/tools_generated/dynamic_task_type_tools.py: -------------------------------------------------------------------------------- ```python """Tools for dynamic task type operations in DolphinScheduler.""" from typing import Dict, List, Optional from ..fastmcp_compat import FastMCPTool from ..client import DolphinSchedulerClient class ListDynamicTasktypes(FastMCPTool): """Tool to list_dynamic_task_types""" name = "list_dynamic_tasktypes" description = "LIST_DYNAMIC_TASK_TYPES" is_async = True schema = { "type": "object", "properties": { "task_category": { "type": "string" } }, "required": [ "task_category" ] } async def _run(self, task_category) -> Dict: """Execute the GET operation on /dynamic/{taskCategory}/taskTypes.""" client = DolphinSchedulerClient() try: response = await client.request( "GET", f"/dynamic/{task_category}/taskTypes" ) return {"success": True, "data": response} finally: await client.close() class ListDynamicTaskcategories(FastMCPTool): """Tool to list_task_type_categories""" name = "list_dynamic_taskcategories" description = "LIST_TASK_TYPE_CATEGORIES" is_async = True async def _run(self) -> Dict: """Execute the GET operation on /dynamic/taskCategories.""" client = DolphinSchedulerClient() try: response = await client.request( "GET", f"/dynamic/taskCategories" ) return {"success": True, "data": response} finally: await client.close() def register_dynamic_task_type_tools(mcp): """Register dynamic task type tools with FastMCP. Args: mcp: The FastMCP instance to register tools with. 
async def main():
    """Parse the command line and invoke the process-definition tool via MCP.

    The MCP endpoint comes from the ``MCP_URL`` environment variable (with a
    localhost default). The tool response is printed as indented JSON.
    """
    cli = argparse.ArgumentParser(description="Execute a DolphinScheduler process definition")
    cli.add_argument("--project-code", type=int, required=True, help="Project code")
    cli.add_argument("--process-definition-code", type=int, required=True, help="Process definition code")
    cli.add_argument("--schedule-time", type=str, help="Schedule time (yyyy-MM-dd HH:mm:ss)")
    cli.add_argument(
        "--execution-type",
        type=str,
        default="PARALLEL",
        help="Execution type (PARALLEL/SERIAL_WAIT/SERIAL_DISCARD/SERIAL_PRIORITY)",
    )
    args = cli.parse_args()

    # Resolve the MCP server URL.
    mcp_url = os.environ.get("MCP_URL", "http://localhost:8089/mcp")
    print(f"Connecting to MCP server at {mcp_url}")

    async with MCPClient(mcp_url) as client:
        print(f"Executing process definition {args.process_definition_code} in project {args.project_code}:")

        # The schedule time is optional and only forwarded when supplied.
        parameters = {
            "project_code": args.project_code,
            "process_definition_code": args.process_definition_code,
            "execution_type": args.execution_type,
        }
        if args.schedule_time:
            parameters["schedule_time"] = args.schedule_time

        result = await client.invoke_tool("execute-process-definition", parameters)
        print(json.dumps(result, indent=2))
Create and activate a virtual environment: ```bash # Linux/macOS python -m venv .venv source .venv/bin/activate # Windows python -m venv .venv .venv\Scripts\activate ``` 3. Install the package in development mode: ```bash pip install -e . ``` 4. Configure environment variables by creating a `.env` file in the project root: ``` DOLPHINSCHEDULER_API_URL=http://your-dolphinscheduler-server:12345/dolphinscheduler DOLPHINSCHEDULER_API_KEY=your_api_key ``` ## Docker Installation 1. Build and run using Docker Compose: ```bash docker-compose up -d ``` This will: - Build the Docker image - Start the DolphinScheduler MCP server on port 8089 - Configure it to connect to your DolphinScheduler API 2. Check the logs: ```bash docker-compose logs -f ``` 3. Stop the service: ```bash docker-compose down ``` ## Using the Scripts For convenience, we provide scripts to set up and run the server: - Linux/macOS: ```bash chmod +x run.sh ./run.sh ``` - Windows: ```cmd run.bat ``` These scripts will: 1. Create a virtual environment if it doesn't exist 2. Activate the virtual environment 3. Install the package dependencies 4. Run the MCP server ## Verifying the Installation To verify that the MCP server is running correctly: 1. Open a web browser and navigate to: `http://localhost:8089/mcp/tools` 2. You should see a JSON response listing all available tools. 
Alternatively, use the MCP Inspector to test your server: ```bash npx @modelcontextprotocol/inspector python -m src.dolphinscheduler_mcp ``` ``` -------------------------------------------------------------------------------- /mcp-openapi-split/schemas/08_remaining_1_schemas.json: -------------------------------------------------------------------------------- ```json { "components": { "schemas": { "AccessToken": { "type": "object", "properties": { "id": { "type": "integer", "format": "int32" }, "userId": { "type": "integer", "format": "int32" }, "token": { "type": "string" }, "expireTime": { "type": "string", "format": "date-time" }, "createTime": { "type": "string", "format": "date-time" }, "updateTime": { "type": "string", "format": "date-time" }, "userName": { "type": "string" } } }, "PageInfoStorageEntity": { "type": "object", "properties": { "totalList": { "type": "array", "items": { "$ref": "#/components/schemas/StorageEntity" } }, "total": { "type": "integer", "format": "int32" }, "totalPage": { "type": "integer", "format": "int32" }, "pageSize": { "type": "integer", "format": "int32" }, "currentPage": { "type": "integer", "format": "int32" }, "pageNo": { "type": "integer", "format": "int32" } } } } } } ``` -------------------------------------------------------------------------------- /mcp-openapi-split/schemas/12_remaining_13_schemas.json: -------------------------------------------------------------------------------- ```json { "components": { "schemas": { "PageInfoAccessToken": { "type": "object", "properties": { "totalList": { "type": "array", "items": { "$ref": "#/components/schemas/AccessToken" } }, "total": { "type": "integer", "format": "int32" }, "totalPage": { "type": "integer", "format": "int32" }, "pageSize": { "type": "integer", "format": "int32" }, "currentPage": { "type": "integer", "format": "int32" }, "pageNo": { "type": "integer", "format": "int32" } } }, "DeleteDataTransferResponse": { "type": "object", "properties": { "code": { "type": 
class TestConfig(unittest.TestCase):
    """Test cases for the Config class."""

    # Environment variables that must be absent so each test starts clean.
    _ENV_VARS = ("DOLPHINSCHEDULER_API_URL", "DOLPHINSCHEDULER_API_KEY")

    def setUp(self):
        """Isolate each test from the process environment and prior tests."""
        # Snapshot os.environ and restore it afterwards. Deleting the
        # variables directly would permanently alter the environment seen by
        # every test that runs later in the same process.
        env_snapshot = patch.dict(os.environ)
        env_snapshot.start()
        self.addCleanup(env_snapshot.stop)
        for var in self._ENV_VARS:
            os.environ.pop(var, None)

        # Reset the Config singleton before the test, and again afterwards so
        # values set by a test do not leak into other test modules.
        Config._instance = None
        self.addCleanup(setattr, Config, "_instance", None)

    def test_default_values(self):
        """Test default configuration values."""
        config = Config()
        self.assertEqual(config.api_url, "http://localhost:12345/dolphinscheduler")
        self.assertIsNone(config.api_key)
        self.assertFalse(config.has_api_key())

    @patch.dict(os.environ, {"DOLPHINSCHEDULER_API_URL": "http://test-server:8080"})
    def test_env_api_url(self):
        """Test API URL from environment variable."""
        config = Config()
        self.assertEqual(config.api_url, "http://test-server:8080")

    @patch.dict(os.environ, {"DOLPHINSCHEDULER_API_KEY": "test-api-key"})
    def test_env_api_key(self):
        """Test API key from environment variable."""
        config = Config()
        self.assertEqual(config.api_key, "test-api-key")
        self.assertTrue(config.has_api_key())

    def test_singleton(self):
        """Test that Config is a singleton."""
        config1 = Config()
        config2 = Config()
        self.assertIs(config1, config2)

    def test_update_values(self):
        """Test updating configuration values."""
        config = Config()
        config.api_url = "http://new-server:9000"
        config.api_key = "new-api-key"

        # Get a new instance and verify values are preserved (singleton).
        new_config = Config()
        self.assertEqual(new_config.api_url, "http://new-server:9000")
        self.assertEqual(new_config.api_key, "new-api-key")
        self.assertTrue(new_config.has_api_key())
"pageSize": { "type": "integer", "format": "int32" }, "currentPage": { "type": "integer", "format": "int32" }, "pageNo": { "type": "integer", "format": "int32" } } }, "CommandStateCount": { "type": "object", "properties": { "errorCount": { "type": "integer", "format": "int32" }, "normalCount": { "type": "integer", "format": "int32" }, "commandState": { "type": "string", "enum": [ "START_PROCESS", "START_CURRENT_TASK_PROCESS", "RECOVER_TOLERANCE_FAULT_PROCESS", "RECOVER_SUSPENDED_PROCESS", "START_FAILURE_TASK_PROCESS", "COMPLEMENT_DATA", "SCHEDULER", "REPEAT_RUNNING", "PAUSE", "STOP", "RECOVER_WAITING_THREAD", "RECOVER_SERIAL_WAIT", "EXECUTE_TASK", "DYNAMIC_GENERATION" ] } } } } } } ``` -------------------------------------------------------------------------------- /src/dolphinscheduler_mcp/server.py: -------------------------------------------------------------------------------- ```python """DolphinScheduler MCP Server implementation using FastMCP.""" import logging import sys import os from typing import Dict, List, Optional # Clear module cache for fastmcp_compat if "dolphinscheduler_mcp.fastmcp_compat" in sys.modules: del sys.modules["dolphinscheduler_mcp.fastmcp_compat"] from mcp.server.fastmcp import FastMCP # 导入工具加载器 from .tools_loader import register_all_generated_tools as register_all_tools from .config import Config # Configure logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", handlers=[logging.StreamHandler(sys.stdout)], ) logger = logging.getLogger(__name__) # Initialize MCP server mcp = FastMCP( title="DolphinScheduler MCP", description="MCP interface for DolphinScheduler API", ) def register_tools() -> None: """Register all tools with the MCP server.""" # Check environment variables first api_url = os.environ.get("DOLPHINSCHEDULER_API_URL") api_key = os.environ.get("DOLPHINSCHEDULER_API_KEY") # Update config if environment variables are set if api_url or api_key: config = Config() if 
def run_server(host: str = "0.0.0.0", port: int = 8089) -> None:
    """Run the DolphinScheduler MCP server.

    A ``KeyboardInterrupt`` is logged and treated as a normal shutdown; any
    other exception is logged with its traceback and the process exits with
    status 1.

    Args:
        host: Host to bind the server to.
        port: Port to bind the server to.
    """
    logger.info(f"Starting DolphinScheduler MCP Server on {host}:{port}")
    logger.info(f"API URL: {Config().api_url}")
    logger.info(f"API Key is {'set' if Config().has_api_key() else 'not set'}")

    # host/port used to be logged and then dropped. run() is called without
    # arguments, so pass the bind address through the server's settings
    # object when it exposes matching fields.
    # NOTE(review): presumably only network transports read these settings;
    # confirm against the installed mcp version.
    settings = getattr(mcp, "settings", None)
    if settings is not None:
        for field, value in (("host", host), ("port", port)):
            if not hasattr(settings, field):
                continue
            try:
                setattr(settings, field, value)
            except (AttributeError, TypeError, ValueError) as e:
                logger.warning(f"Could not apply {field}={value!r} to server settings: {e}")

    try:
        # FastMCP.run() does not accept uvicorn_config parameter
        # Simply call run() without parameters
        mcp.run()
    except KeyboardInterrupt:
        logger.info("Server stopped by user")
    except Exception as e:
        logger.error(f"Error running server: {e}", exc_info=True)
        sys.exit(1)
def main() -> int:
    """Parse command-line arguments and run the server.

    Returns:
        Process exit code: 0 on normal shutdown (including Ctrl-C), 1 when
        the server raised an unexpected error.
    """
    import logging  # local: the module-level imports do not include it

    from .config import Config

    log_levels = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]

    parser = argparse.ArgumentParser(
        description="DolphinScheduler Model Context Protocol (MCP) Server"
    )
    parser.add_argument(
        "--host",
        type=str,
        default=os.environ.get("DOLPHINSCHEDULER_MCP_HOST", "0.0.0.0"),
        help="Host to bind the server (default: 0.0.0.0 or DOLPHINSCHEDULER_MCP_HOST env var)",
    )
    parser.add_argument(
        "--port",
        type=int,
        default=int(os.environ.get("DOLPHINSCHEDULER_MCP_PORT", "8089")),
        help="Port to bind the server (default: 8089 or DOLPHINSCHEDULER_MCP_PORT env var)",
    )
    parser.add_argument(
        "--api-url",
        type=str,
        default=None,
        help="DolphinScheduler API URL (default: DOLPHINSCHEDULER_API_URL env var)",
    )
    parser.add_argument(
        "--api-key",
        type=str,
        default=None,
        help="DolphinScheduler API key (default: DOLPHINSCHEDULER_API_KEY env var)",
    )
    parser.add_argument(
        "--log-level",
        type=str,
        choices=log_levels,
        default=os.environ.get("DOLPHINSCHEDULER_MCP_LOG_LEVEL", "INFO"),
        help="Logging level (default: INFO or DOLPHINSCHEDULER_MCP_LOG_LEVEL env var)",
    )

    args = parser.parse_args()

    # Export command-line overrides and also push them into the Config
    # singleton. The server module is imported at the top of this file and
    # reads the environment during that import, so setting the variables
    # here alone is too late for it to see them.
    if args.api_url:
        os.environ["DOLPHINSCHEDULER_API_URL"] = args.api_url
        Config().api_url = args.api_url
    if args.api_key:
        os.environ["DOLPHINSCHEDULER_API_KEY"] = args.api_key
        Config().api_key = args.api_key

    # Check if API URL and key are set
    if "DOLPHINSCHEDULER_API_URL" not in os.environ:
        print("Warning: DOLPHINSCHEDULER_API_URL environment variable is not set.")
        print("Using default: http://localhost:12345/dolphinscheduler")

    if "DOLPHINSCHEDULER_API_KEY" not in os.environ:
        print("Warning: DOLPHINSCHEDULER_API_KEY environment variable is not set.")
        print("Authentication to the DolphinScheduler API may fail.")

    # argparse does not validate defaults against `choices`, so a level that
    # came from the environment variable is checked here.
    level_name = str(args.log_level).upper()
    if level_name not in log_levels:
        print(f"Warning: unknown log level {args.log_level!r}; falling back to INFO.")
        level_name = "INFO"

    # Keep exporting the level, and also apply it to the root logger so the
    # option takes effect in this process.
    os.environ["DOLPHINSCHEDULER_MCP_LOG_LEVEL"] = level_name
    logging.getLogger().setLevel(level_name)

    # Run the server
    try:
        run_server(host=args.host, port=args.port)
        return 0
    except KeyboardInterrupt:
        print("\nServer stopped by user")
        return 0
    except Exception as e:
        print(f"Error running server: {e}")
        return 1
class GetProjectsWorkerGroup(FastMCPTool):
    """Tool that queries the worker groups of a project."""

    name = "get_projects_worker_group"
    description = "QUERY_WORKER_GROUP_LIST"
    is_async = True
    schema = {
        "type": "object",
        "properties": {
            "project_code": {
                "type": "integer",
                "format": "int64",
                "description": "\u9879\u76eeCode"
            }
        },
        "required": [
            "project_code"
        ]
    }

    async def _run(self, project_code) -> Dict:
        """Execute the GET operation on /projects/{projectCode}/worker-group.

        Args:
            project_code: Project code interpolated into the request path.

        Returns:
            A dict with ``success`` set to True and the raw client response
            under ``data``.
        """
        client = DolphinSchedulerClient()
        try:
            response = await client.request(
                "GET",
                f"/projects/{project_code}/worker-group"
            )
            return {"success": True, "data": response}
        finally:
            await client.close()


class CreateProjectsWorkerGroup(FastMCPTool):
    """Tool that assigns worker groups to a project."""

    name = "create_projects_worker_group"
    description = "ASSIGN_WORKER_GROUPS_NOTES"
    is_async = True
    schema = {
        "type": "object",
        "properties": {
            "project_code": {
                "type": "integer",
                "format": "int64",
                "description": "\u9879\u76eeCode"
            },
            "worker_groups": {
                "type": "string",
                "description": "Worker\u5de5\u4f5c\u7ec4\u5217\u8868"
            }
        },
        "required": [
            "project_code"
        ]
    }

    async def _run(self, project_code, worker_groups=None) -> Dict:
        """Execute the POST operation on /projects/{projectCode}/worker-group.

        Args:
            project_code: Project code interpolated into the request path.
            worker_groups: Worker group list to assign. Optional, matching
                the schema, which only requires ``project_code``.

        Returns:
            A dict with ``success`` set to True and the raw client response
            under ``data``.
        """
        client = DolphinSchedulerClient()
        try:
            # The value used to be accepted and then dropped, so the request
            # never carried it. Forward it as the `workerGroups` parameter.
            # NOTE(review): the OpenAPI entry also declares a JSON request
            # body (array of strings) -- confirm which form the server reads.
            request_kwargs = {}
            if worker_groups is not None:
                request_kwargs["params"] = {"workerGroups": worker_groups}
            response = await client.request(
                "POST",
                f"/projects/{project_code}/worker-group",
                **request_kwargs
            )
            return {"success": True, "data": response}
        finally:
            await client.close()
class UpdateEnvironmentSettings(FastMCPTool):
    """MCP tool that changes the API URL and/or API key used for DolphinScheduler."""

    name = "update_connection_settings"
    description = "更新DolphinScheduler API连接设置"
    is_async = True
    schema = {
        "type": "object",
        "properties": {
            "id": {
                "type": "string",
                "description": "No description"
            },
            "api_url": {
                "type": "string",
                "description": "New API URL to set"
            },
            "api_key": {
                "type": "string",
                "description": "New API key to set"
            }
        },
        "required": []
    }

    async def _run(
        self,
        id: Optional[str] = None,
        api_url: Optional[str] = None,
        api_key: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Apply the given connection settings to Config and the environment.

        Args:
            id: Unused; accepted for compatibility with FastMCP.
            api_url: New API URL; left unchanged when None.
            api_key: New API key; left unchanged when None.

        Returns:
            On success, a dict with ``success``, the effective ``api_url`` and
            ``has_api_key``. On any exception, a dict with ``success`` False
            and the error text under ``error``.
        """
        log = logging.getLogger("dolphinscheduler_mcp.tools.update_env")

        try:
            settings = Config()

            # Each value is written to both the Config object and os.environ.
            if api_url is not None:
                log.info(f"Updating API URL to: {api_url}")
                settings.api_url = api_url
                os.environ["DOLPHINSCHEDULER_API_URL"] = api_url

            if api_key is not None:
                log.info("Updating API key")
                settings.api_key = api_key
                os.environ["DOLPHINSCHEDULER_API_KEY"] = api_key

            outcome = {
                "success": True,
                "api_url": settings.api_url,
                "has_api_key": settings.has_api_key()
            }
            return outcome
        except Exception as e:
            log.error(f"Error updating environment settings: {e}")
            failure = {
                "success": False,
                "error": str(e)
            }
            return failure


def register_environment_update_tools(mcp):
    """Register the environment update tool with FastMCP.

    Args:
        mcp: The FastMCP instance to register tools with.
    """
    from ..fastmcp_compat import register_tool_class

    register_tool_class(mcp, UpdateEnvironmentSettings)
"tenantCode": { "type": "string" }, "queueName": { "type": "string" }, "alertGroup": { "type": "string" }, "queue": { "type": "string" }, "timeZone": { "type": "string" }, "createTime": { "type": "string", "format": "date-time" }, "updateTime": { "type": "string", "format": "date-time" } } } } } } ``` -------------------------------------------------------------------------------- /mcp-openapi-split/schemas/14_remaining_3_schemas.json: -------------------------------------------------------------------------------- ```json { "components": { "schemas": { "Property": { "type": "object", "properties": { "prop": { "type": "string" }, "direct": { "type": "string", "enum": [ "IN", "OUT" ] }, "type": { "type": "string", "enum": [ "VARCHAR", "INTEGER", "LONG", "FLOAT", "DOUBLE", "DATE", "TIME", "TIMESTAMP", "BOOLEAN", "LIST", "FILE" ] }, "value": { "type": "string" } } }, "StateDesc": { "type": "object", "properties": { "time": { "type": "string", "format": "date-time" }, "state": { "type": "string", "enum": [ "WorkflowExecutionStatus{code=0, desc='submit success'}", "WorkflowExecutionStatus{code=1, desc='running'}", "WorkflowExecutionStatus{code=2, desc='ready pause'}", "WorkflowExecutionStatus{code=3, desc='pause'}", "WorkflowExecutionStatus{code=4, desc='ready stop'}", "WorkflowExecutionStatus{code=5, desc='stop'}", "WorkflowExecutionStatus{code=6, desc='failure'}", "WorkflowExecutionStatus{code=7, desc='success'}", "WorkflowExecutionStatus{code=12, desc='delay execution'}", "WorkflowExecutionStatus{code=14, desc='serial wait'}", "WorkflowExecutionStatus{code=15, desc='ready block'}", "WorkflowExecutionStatus{code=16, desc='block'}", "WorkflowExecutionStatus{code=17, desc='wait to run'}" ] }, "desc": { "type": "string" } } } } } } ``` -------------------------------------------------------------------------------- /mcp-openapi-split/paths/12_favourite_api.json: -------------------------------------------------------------------------------- ```json { "paths": { 
"/favourite/{taskType}": { "post": { "tags": [ "偏好相关操作" ], "summary": "addTaskType", "description": "添加任务类型", "operationId": "addFavTask", "parameters": [ { "name": "taskType", "in": "path", "required": true, "schema": { "type": "string" } } ], "responses": { "200": { "description": "OK", "content": { "*/*": { "schema": { "$ref": "#/components/schemas/Result" } } } } } }, "delete": { "tags": [ "偏好相关操作" ], "summary": "deleteTaskType", "description": "删除任务类型", "operationId": "deleteFavTask", "parameters": [ { "name": "taskType", "in": "path", "required": true, "schema": { "type": "string" } } ], "responses": { "200": { "description": "OK", "content": { "*/*": { "schema": { "$ref": "#/components/schemas/Result" } } } } } } }, "/favourite/taskTypes": { "get": { "tags": [ "偏好相关操作" ], "summary": "listTaskType", "description": "查询任务类型列表", "operationId": "listTaskType", "responses": { "200": { "description": "OK", "content": { "*/*": { "schema": { "$ref": "#/components/schemas/Result" } } } } } } } } } ``` -------------------------------------------------------------------------------- /src/dolphinscheduler_mcp/tools_generated/__init__.py: -------------------------------------------------------------------------------- ```python # __init__.py """Tools package for DolphinScheduler MCP.""" from mcp.server.fastmcp import FastMCP # Import registration functions for each tool module from .azure_data_factory_tools import register_azure_data_factory_tools from .dynamic_task_type_tools import register_dynamic_task_type_tools from .k8s_namespace_tools import register_k8s_namespace_tools from .project_worker_group_tools import register_project_worker_group_tools from .ui_plugin_tools import register_ui_plugin_tools from .worker_group_tools import register_worker_group_tools from .project_tools import register_project_tools from .datasource_tools import register_datasource_tools from .project_parameter_tools import register_project_parameter_tools from .lineage_tools import 
def register_all_tools(mcp: FastMCP) -> None:
    """Register every generated tool module with the FastMCP instance.

    The per-module registration functions are called one after another in a
    fixed order, each receiving ``mcp``.

    Args:
        mcp: The FastMCP instance to register tools with.
    """
    registrars = (
        register_azure_data_factory_tools,      # cloud (Azure Data Factory)
        register_dynamic_task_type_tools,       # dynamic task types
        register_k8s_namespace_tools,           # Kubernetes namespaces
        register_project_worker_group_tools,    # project worker groups
        register_ui_plugin_tools,               # UI plugins
        register_worker_group_tools,            # worker groups
        register_project_tools,                 # project management
        register_datasource_tools,              # datasource management
        register_project_parameter_tools,       # project parameters
        register_lineage_tools,                 # data lineage
        register_audit_log_tools,               # audit log
        register_access_token_tools,            # access tokens
        register_project_preference_tools,      # project preferences
        register_environment_check_tools,       # environment check
        register_environment_update_tools,      # environment update
        register_process_task_relation_tools,   # process-task relations
    )
    for register in registrars:
        register(mcp)
class FastMCPTool:
    """Base class for FastMCP tools that emulates the old class-based interface.

    Subclasses set ``name`` and ``description``, may set a JSON-schema style
    ``schema`` and implement ``_run``. Instantiating a subclass registers it
    with the FastMCP instance passed to the constructor.
    """

    name: str
    description: str
    is_async: bool = True
    schema: Dict[str, Any]

    def __init__(self, mcp: "FastMCP"):
        """Initialize the tool with the MCP instance and register it."""
        self.mcp = mcp
        self._register()

    def _register(self):
        """Build a wrapper with explicit parameters and register it as a tool.

        Parameter names come from ``schema['properties']`` when the tool has
        a schema with that key, otherwise from the signature of ``_run``.
        Names listed in ``schema['required']`` (or, without a schema, the
        ``_run`` parameters that have no default) become mandatory; all
        others default to ``None``.

        Raises:
            NotImplementedError: If the tool does not define ``_run``.
            ValueError: If a parameter name is not a usable Python identifier.
        """
        import keyword  # local: not among the module-level imports

        run_method = getattr(self, '_run', None)
        if run_method is None:
            raise NotImplementedError(f"Tool {self.__class__.__name__} must implement _run")

        # _run is looked up on the instance, so `self` is already bound and
        # does not appear in the signature.
        run_params = inspect.signature(run_method).parameters
        empty = inspect.Parameter.empty
        keyword_kinds = (
            inspect.Parameter.POSITIONAL_OR_KEYWORD,
            inspect.Parameter.KEYWORD_ONLY,
        )

        schema = getattr(self, 'schema', None)
        if schema and 'properties' in schema:
            param_names = list(schema['properties'].keys())
            required = set(schema.get('required') or [])
        else:
            param_names = [n for n, p in run_params.items() if p.kind in keyword_kinds]
            required = {n for n in param_names if run_params[n].default is empty}

        # The names are spliced into generated source below, so only plain
        # identifiers are allowed; nothing else is interpolated.
        for param in param_names:
            if (
                not isinstance(param, str)
                or not param.isidentifier()
                or keyword.iskeyword(param)
                or param == '_fastmcp_invoke'
            ):
                raise ValueError(
                    f"Tool {self.__class__.__name__}: invalid parameter name {param!r}"
                )

        # Forward by keyword when _run accepts every name; otherwise keep the
        # historical behaviour of passing values positionally in schema order.
        takes_var_keyword = any(
            p.kind is inspect.Parameter.VAR_KEYWORD for p in run_params.values()
        )
        by_keyword = takes_var_keyword or all(
            n in run_params and run_params[n].kind in keyword_kinds for n in param_names
        )
        # Optional arguments left at None are dropped when _run has its own
        # default for them, so that default applies rather than None.
        droppable = {
            n for n in param_names
            if n not in required and n in run_params and run_params[n].default is not empty
        }

        async def invoke(arguments):
            if by_keyword:
                call_args = {
                    k: v for k, v in arguments.items()
                    if not (v is None and k in droppable)
                }
                result = run_method(**call_args)
            else:
                result = run_method(*arguments.values())
            # Works for both `async def _run` and a plain `def _run`.
            if inspect.isawaitable(result):
                result = await result
            return TextContent(type="text", text=str(result))

        # Explicit parameters (rather than *args/**kwargs) are what the tool
        # decorator sees. Required parameters must precede defaulted ones.
        ordered = [n for n in param_names if n in required]
        ordered += [n for n in param_names if n not in required]
        signature = ", ".join(n if n in required else f"{n}=None" for n in ordered)
        forwarded = ", ".join(f"{n!r}: {n}" for n in param_names)
        tool_func_code = (
            f"async def tool_fn({signature}):\n"
            f"    return await _fastmcp_invoke({{{forwarded}}})\n"
        )

        namespace = {'_fastmcp_invoke': invoke}
        exec(tool_func_code, namespace)
        tool_fn = namespace['tool_fn']
        # Set the docstring as data rather than as generated source, so a
        # description containing quotes or backslashes cannot alter the code.
        tool_fn.__doc__ = self.description

        self.mcp.tool(name=self.name, description=self.description)(tool_fn)

    @classmethod
    def register(cls: "Type[T]", mcp: "FastMCP") -> "T":
        """Register the tool with the MCP instance."""
        return cls(mcp)


def register_tool_class(mcp: "FastMCP", tool_class: "Type[FastMCPTool]") -> "FastMCPTool":
    """Register a tool class with the MCP instance.

    Args:
        mcp: The FastMCP instance to register the tool with.
        tool_class: The FastMCPTool subclass to instantiate and register.

    Returns:
        The tool instance created by ``tool_class.register``.
    """
    return tool_class.register(mcp)
class ListCloudAzureDatafactoryResourcegroups(FastMCPTool):
    """MCP tool wrapping GET /cloud/azure/datafactory/resourceGroups."""

    name = "list_cloud_azure_datafactory_resourcegroups"
    description = "LIST_RESOURCE_GROUP"
    is_async = True

    async def _run(self) -> Dict:
        """Call the resource-group listing endpoint and wrap its response."""
        api = DolphinSchedulerClient()
        try:
            payload = await api.request(
                "GET",
                "/cloud/azure/datafactory/resourceGroups",
            )
            return {"success": True, "data": payload}
        finally:
            # Always release the client, whether or not the request succeeded.
            await api.close()


class ListCloudAzureDatafactoryPipelines(FastMCPTool):
    """MCP tool wrapping GET /cloud/azure/datafactory/pipelines."""

    name = "list_cloud_azure_datafactory_pipelines"
    description = "LIST_PIPELINE"
    is_async = True
    schema = {
        "type": "object",
        "properties": {
            "factoryName": {"type": "string"},
            "resourceGroupName": {"type": "string"},
        },
        "required": ["factoryName", "resourceGroupName"],
    }

    async def _run(self, factoryName, resourceGroupName) -> Dict:
        """Call the pipeline listing endpoint for one factory.

        Args:
            factoryName: Sent as the ``factoryName`` query parameter.
            resourceGroupName: Sent as the ``resourceGroupName`` query parameter.
        """
        api = DolphinSchedulerClient()
        try:
            query = {
                "factoryName": factoryName,
                "resourceGroupName": resourceGroupName,
            }
            payload = await api.request(
                "GET",
                "/cloud/azure/datafactory/pipelines",
                params=query,
            )
            return {"success": True, "data": payload}
        finally:
            await api.close()


class ListCloudAzureDatafactoryFactories(FastMCPTool):
    """MCP tool wrapping GET /cloud/azure/datafactory/factories."""

    name = "list_cloud_azure_datafactory_factories"
    description = "LIST_DATA_FACTORY"
    is_async = True

    async def _run(self) -> Dict:
        """Call the data-factory listing endpoint and wrap its response."""
        api = DolphinSchedulerClient()
        try:
            payload = await api.request(
                "GET",
                "/cloud/azure/datafactory/factories",
            )
            return {"success": True, "data": payload}
        finally:
            await api.close()


def register_azure_data_factory_tools(mcp):
    """Register the azure data factory tools with FastMCP.

    Args:
        mcp: The FastMCP instance to register tools with.
    """
    from ..fastmcp_compat import register_tool_class

    # Registration order matches the original module.
    for tool_cls in (
        ListCloudAzureDatafactoryFactories,
        ListCloudAzureDatafactoryPipelines,
        ListCloudAzureDatafactoryResourcegroups,
    ):
        register_tool_class(mcp, tool_cls)
class GetUiPlugins(FastMCPTool):
    """Tool to query UI plugin details by ID."""

    name = "get_ui_plugins"
    description = "通过ID查询UI插件详情"
    is_async = True
    schema = {
        "type": "object",
        "properties": {
            "id": {
                "type": "integer",
                "format": "int32",
                "description": "\u63d2\u4ef6ID",
            }
        },
        "required": ["id"],
    }

    async def _run(self, id) -> Dict:
        """Execute the GET operation on /ui-plugins/{id}.

        Args:
            id: Plugin ID; converted with ``int()`` before building the path.

        Returns:
            A dict with ``success`` and either ``data`` or ``error``.
        """
        api = DolphinSchedulerClient()
        try:
            if id is None:
                return {
                    "success": False,
                    "error": "Missing required parameter: id",
                }
            # Normalise the path parameter; a ValueError is reported below.
            plugin_id = int(id)
            payload = await api.request("GET", f"/ui-plugins/{plugin_id}")
            return {"success": True, "data": payload}
        except ValueError:
            return {"success": False, "error": f"Invalid ID format: {id}"}
        except Exception as exc:
            return {"success": False, "error": str(exc)}
        finally:
            await api.close()


class GetUiPluginsQueryByType(FastMCPTool):
    """Tool to query UI plugins by plugin type."""

    name = "get_ui_plugins_query_by_type"
    description = "通过类型查询UI插件"
    is_async = True
    schema = {
        "type": "object",
        "properties": {
            "plugin_type": {
                "type": "string",
                "enum": ["ALERT", "REGISTER", "TASK"],
                "description": "pluginType",
            }
        },
        "required": ["plugin_type"],
    }

    async def _run(self, plugin_type) -> Dict:
        """Execute the GET operation on /ui-plugins/query-by-type.

        Args:
            plugin_type: Sent as the ``pluginType`` query parameter.

        Returns:
            A dict with ``success`` and either ``data`` or ``error``.
        """
        api = DolphinSchedulerClient()
        try:
            if not plugin_type:
                return {
                    "success": False,
                    "error": "Missing required parameter: plugin_type",
                }
            query = {"pluginType": plugin_type}
            payload = await api.request(
                "GET",
                "/ui-plugins/query-by-type",
                params=query,
            )
            return {"success": True, "data": payload}
        except Exception as exc:
            return {"success": False, "error": str(exc)}
        finally:
            await api.close()


def register_ui_plugin_tools(mcp):
    """Register the ui plugin tools with FastMCP.

    Args:
        mcp: The FastMCP instance to register tools with.
    """
    from ..fastmcp_compat import register_tool_class

    for tool_cls in (GetUiPlugins, GetUiPluginsQueryByType):
        register_tool_class(mcp, tool_cls)
class MCPClient:
    """Small async client that POSTs tool invocations to an MCP server URL."""

    def __init__(self, url: str):
        """Remember the server URL; the HTTP session is created lazily.

        Args:
            url: The URL of the MCP server.
        """
        self.url = url
        self.session: Optional[aiohttp.ClientSession] = None

    async def __aenter__(self):
        """Open an HTTP session and return the client itself."""
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the HTTP session if one is open."""
        if not self.session:
            return
        await self.session.close()
        self.session = None

    async def invoke_tool(self, tool_name: str, arguments: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Invoke a tool on the MCP server.

        Args:
            tool_name: The name of the tool to invoke.
            arguments: Optional arguments; omitted from the payload when empty.

        Returns:
            The decoded JSON body of the server response.
        """
        # Allow use outside ``async with`` by creating the session on demand.
        if self.session is None:
            self.session = aiohttp.ClientSession()

        request_body = {"toolName": tool_name}
        if arguments:
            request_body["arguments"] = arguments

        async with self.session.post(self.url, json=request_body) as reply:
            return await reply.json()


async def main():
    """Walk through a demo session against the MCP server."""

    def show(result):
        # Every step prints the raw response as indented JSON.
        print(json.dumps(result, indent=2))

    mcp_url = os.environ.get("MCP_URL", "http://localhost:8089/mcp")
    print(f"Connecting to MCP server at {mcp_url}")

    async with MCPClient(mcp_url) as client:
        print("\n1. Getting connection settings:")
        show(await client.invoke_tool("get-connection-settings"))

        print("\n2. Getting project list:")
        show(await client.invoke_tool("get-project-list"))

        print("\n3. Creating a new project:")
        created = await client.invoke_tool(
            "create-project",
            {"name": "MCP Demo Project", "description": "Project created by MCP demo"},
        )
        show(created)

        # The remaining steps need the code of the freshly created project.
        if not created.get("result", {}).get("success", False):
            return
        project_code = created.get("result", {}).get("code")

        print(f"\n4. Getting details for project {project_code}:")
        show(await client.invoke_tool("get-project", {"project_code": project_code}))

        print(f"\n5. Creating a process definition in project {project_code}:")
        process_def = {
            "project_code": project_code,
            "name": "MCP Demo Process",
            "description": "Process created by MCP demo",
            "task_definitions": [],
            "execution_type": "PARALLEL",
        }
        show(await client.invoke_tool("create-process-definition", process_def))

        print(f"\n6. Deleting project {project_code}:")
        show(await client.invoke_tool("delete-project", {"project_code": project_code}))


if __name__ == "__main__":
    asyncio.run(main())
def get_tools_dir() -> Path:
    """Return the path of the ``tools`` directory next to this module."""
    return Path(__file__).parent / "tools"


def get_tools_generated_dir() -> Path:
    """Return the path of the ``tools_generated`` directory next to this module."""
    return Path(__file__).parent / "tools_generated"


def is_tool_module(filename: str) -> bool:
    """Check if a file is a tool module.

    A tool module is a ``.py`` file whose name does not start with an
    underscore (which also excludes dunder files such as ``__init__.py``).

    Args:
        filename: Filename to check

    Returns:
        True if the file is a tool module, False otherwise
    """
    return filename.endswith(".py") and not filename.startswith("_")


def get_tool_modules() -> List[str]:
    """Get a list of all tool modules.

    Returns:
        Module names (file stems) found in the tools directory, or an empty
        list when that directory does not exist.
    """
    tools_dir = get_tools_dir()
    if not tools_dir.exists():
        return []
    return [f.stem for f in tools_dir.glob("*.py") if is_tool_module(f.name)]


def import_tools_module(module_name: str) -> Optional[object]:
    """Import a tools module.

    Args:
        module_name: Name of the module to import

    Returns:
        The imported module, or None if the module could not be imported
    """
    try:
        return importlib.import_module(f"dolphinscheduler_mcp.tools.{module_name}")
    except Exception as e:
        # A broken tool module can fail with more than ImportError (syntax
        # errors, exceptions raised at import time). The contract is "None on
        # failure", so one bad module must not take the whole loader down.
        logger.exception(f"Error importing module {module_name}: {e}")
        return None


def ensure_env_variables():
    """Log which DolphinScheduler environment variables are set.

    Emits warnings when ``DOLPHINSCHEDULER_API_URL`` or
    ``DOLPHINSCHEDULER_API_KEY`` is missing; never raises for missing values.
    """
    api_url = os.environ.get("DOLPHINSCHEDULER_API_URL")
    api_key = os.environ.get("DOLPHINSCHEDULER_API_KEY")

    if not api_url:
        logger.warning("DOLPHINSCHEDULER_API_URL environment variable is not set.")
        logger.warning("Using default URL from Config class.")
    else:
        logger.info(f"Using API URL: {api_url}")

    if not api_key:
        logger.warning("DOLPHINSCHEDULER_API_KEY environment variable is not set.")
        logger.warning("Authentication to the DolphinScheduler API may fail.")
    else:
        # Deliberately log only the presence of the key, never its value.
        logger.info("API Key is set and will be used for authentication")


def register_all_generated_tools(mcp: "FastMCP") -> int:
    """Register all generated tools with FastMCP.

    Args:
        mcp: The FastMCP instance to register tools with

    Returns:
        The number of tool module files found in ``tools_generated`` after a
        successful registration, or 0 when registration fails or the
        directory is missing.
    """
    ensure_env_variables()

    try:
        from dolphinscheduler_mcp.tools_generated import register_all_tools

        register_all_tools(mcp)

        tools_generated_dir = get_tools_generated_dir()
        if not tools_generated_dir.exists():
            return 0
        # The count is taken from the files on disk, not from what
        # register_all_tools() actually registered.
        return sum(
            1 for f in tools_generated_dir.glob("*.py") if is_tool_module(f.name)
        )
    except ImportError as e:
        logger.error(f"Error importing tools_generated: {e}")
        return 0
    except Exception as e:
        # Keep the traceback: registration failures are otherwise opaque.
        logger.exception(f"Error registering tools: {e}")
        return 0
"ResultPageInfoQueue": { "type": "object", "properties": { "code": { "type": "integer", "format": "int32" }, "msg": { "type": "string" }, "data": { "$ref": "#/components/schemas/PageInfoQueue" }, "failed": { "type": "boolean" }, "success": { "type": "boolean" } } }, "ResultListQueue": { "type": "object", "properties": { "code": { "type": "integer", "format": "int32" }, "msg": { "type": "string" }, "data": { "type": "array", "items": { "$ref": "#/components/schemas/Queue" } }, "failed": { "type": "boolean" }, "success": { "type": "boolean" } } } } } } ``` -------------------------------------------------------------------------------- /src/dolphinscheduler_mcp/config.py: -------------------------------------------------------------------------------- ```python """Configuration for DolphinScheduler MCP.""" import os import json from pathlib import Path from typing import Optional, Dict, Any def read_mcp_settings() -> Dict[str, Any]: """Read MCP settings from the Cursor MCP settings file. Returns: A dictionary containing the MCP settings """ # Default location for the Cursor MCP settings file settings_path = os.path.expanduser("~/Library/Application Support/Cursor/User/globalStorage/rooveterinaryinc.roo-cline/settings/mcp_settings.json") if os.path.exists(settings_path): try: with open(settings_path, 'r') as f: settings = json.load(f) return settings except Exception as e: print(f"Error reading MCP settings file: {e}") return {} def get_env_from_mcp_settings() -> Dict[str, str]: """Get environment variables from MCP settings. 
def get_env_from_mcp_settings() -> Dict[str, str]:
    """Get environment variables from MCP settings.

    Looks up ``mcpServers -> dolphinscheduler -> env`` in the settings
    returned by ``read_mcp_settings()``.

    Returns:
        The ``env`` mapping when it exists and is a dictionary, otherwise an
        empty dictionary.
    """
    import logging  # local import: this module does not import logging

    log = logging.getLogger(__name__)

    settings = read_mcp_settings()
    # Diagnostics go through logging rather than print(): a stdio MCP server
    # uses stdout as its protocol channel, and the env mapping holds the API
    # key, so only key names are ever logged.
    log.debug(
        "Reading MCP settings: %s",
        list(settings) if settings else "No settings found",
    )

    # Tolerate malformed settings files instead of raising on bad shapes.
    servers = settings.get("mcpServers") if isinstance(settings, dict) else None
    server_config = (
        servers.get("dolphinscheduler") if isinstance(servers, dict) else None
    )
    if not isinstance(server_config, dict):
        return {}
    log.debug("Found dolphinscheduler server config: %s", list(server_config))

    env_vars = server_config.get("env")
    if not isinstance(env_vars, dict):
        return {}
    log.debug("Found environment variables in MCP settings: %s", list(env_vars))
    return env_vars


class Config:
    """Process-wide configuration singleton for DolphinScheduler MCP.

    Values are resolved once, on first instantiation, in this order: MCP
    settings file, environment variable, built-in default. Resolved values
    are mirrored back into ``os.environ`` for other parts of the app.
    """

    _instance = None

    def __new__(cls):
        """Create the singleton on first use, then always return it."""
        if cls._instance is None:
            instance = super().__new__(cls)

            mcp_env = get_env_from_mcp_settings()

            # A missing or empty value in the MCP settings falls through to
            # the environment variable, then to the default.
            api_url = mcp_env.get("DOLPHINSCHEDULER_API_URL") or os.environ.get(
                "DOLPHINSCHEDULER_API_URL",
                "http://localhost:12345/dolphinscheduler",
            )
            api_key = mcp_env.get("DOLPHINSCHEDULER_API_KEY") or os.environ.get(
                "DOLPHINSCHEDULER_API_KEY", ""
            )
            # Settings come from JSON and may not be strings; os.environ
            # only accepts str.
            instance._api_url = str(api_url)
            instance._api_key = str(api_key)

            if instance._api_url:
                os.environ["DOLPHINSCHEDULER_API_URL"] = instance._api_url
            if instance._api_key:
                os.environ["DOLPHINSCHEDULER_API_KEY"] = instance._api_key

            # Publish only a fully initialised instance, so a failure above
            # cannot leave a half-built singleton behind.
            cls._instance = instance
        return cls._instance

    @property
    def api_url(self) -> str:
        """Get the API URL.

        Returns:
            The API URL.
        """
        return self._api_url

    @api_url.setter
    def api_url(self, value: str) -> None:
        """Set the API URL and mirror it into the environment.

        Args:
            value: The API URL.
        """
        self._api_url = value
        os.environ["DOLPHINSCHEDULER_API_URL"] = value

    @property
    def api_key(self) -> str:
        """Get the API key.

        Returns:
            The API key.
        """
        return self._api_key

    @api_key.setter
    def api_key(self, value: str) -> None:
        """Set the API key and mirror it into the environment.

        Args:
            value: The API key.
        """
        self._api_key = value
        os.environ["DOLPHINSCHEDULER_API_KEY"] = value

    def has_api_key(self) -> bool:
        """Check if an API key is set.

        Returns:
            True if an API key is set, False otherwise.
        """
        return bool(self._api_key)
""" return self._api_key @api_key.setter def api_key(self, value: str) -> None: """Set the API key. Args: value: The API key. """ self._api_key = value # We could also update the environment variable here os.environ["DOLPHINSCHEDULER_API_KEY"] = value def has_api_key(self) -> bool: """Check if an API key is set. Returns: True if an API key is set, False otherwise. """ return bool(self._api_key) ``` -------------------------------------------------------------------------------- /src/dolphinscheduler_mcp/tools_generated/project_preference_tools.py: -------------------------------------------------------------------------------- ```python """Tools for project preference operations in DolphinScheduler.""" from typing import Dict, List, Optional from ..fastmcp_compat import FastMCPTool from ..client import DolphinSchedulerClient class GetProjectsProjectPreference(FastMCPTool): """Tool to query project preference""" name = "get_projects_project_preference" description = "query project preference" is_async = True schema = { "type": "object", "properties": { "project_code": { "type": "integer", "format": "int64", "description": "\u9879\u76eeCode" } }, "required": [ "project_code" ] } async def _run(self, project_code) -> Dict: """Execute the GET operation on /projects/{projectCode}/project-preference.""" client = DolphinSchedulerClient() try: response = await client.request( "GET", f"/projects/{project_code}/project-preference" ) return {"success": True, "data": response} finally: await client.close() class UpdateProjectsProjectPreference(FastMCPTool): """Tool to update project preference""" name = "update_projects_project_preference" description = "update project preference" is_async = True schema = { "type": "object", "properties": { "project_code": { "type": "integer", "format": "int64", "description": "\u9879\u76eeCode" }, "project_preferences": { "type": "string", "description": "project preferences" } }, "required": [ "project_code", "project_preferences" ] } async 
class UpdateProjectsProjectPreference(FastMCPTool):
    """Tool to update project preference."""

    name = "update_projects_project_preference"
    description = "update project preference"
    is_async = True
    schema = {
        "type": "object",
        "properties": {
            "project_code": {
                "type": "integer",
                "format": "int64",
                "description": "\u9879\u76eeCode",
            },
            "project_preferences": {
                "type": "string",
                "description": "project preferences",
            },
        },
        "required": ["project_code", "project_preferences"],
    }

    async def _run(self, project_code, project_preferences) -> Dict:
        """Execute the PUT operation on /projects/{projectCode}/project-preference.

        Args:
            project_code: Interpolated into the request path.
            project_preferences: Sent as the ``projectPreferences`` query parameter.
        """
        api = DolphinSchedulerClient()
        try:
            query = {"projectPreferences": project_preferences}
            payload = await api.request(
                "PUT",
                f"/projects/{project_code}/project-preference",
                params=query,
            )
            return {"success": True, "data": payload}
        finally:
            # Always release the client, whether or not the request succeeded.
            await api.close()


class CreateProjectsProjectPreference(FastMCPTool):
    """Tool to update the state of the project preference."""

    name = "create_projects_project_preference"
    description = "update the state of the project preference"
    is_async = True
    schema = {
        "type": "object",
        "properties": {
            "project_code": {
                "type": "integer",
                "format": "int64",
                "description": "\u9879\u76eeCode",
            },
            "state": {
                "type": "string",
                "description": "the state of the project preference",
            },
        },
        "required": ["project_code", "state"],
    }

    async def _run(self, project_code, state) -> Dict:
        """Execute the POST operation on /projects/{projectCode}/project-preference.

        Args:
            project_code: Interpolated into the request path.
            state: Sent as the ``state`` query parameter.
        """
        api = DolphinSchedulerClient()
        try:
            query = {"state": state}
            payload = await api.request(
                "POST",
                f"/projects/{project_code}/project-preference",
                params=query,
            )
            return {"success": True, "data": payload}
        finally:
            await api.close()


def register_project_preference_tools(mcp):
    """Register the project preference tools with FastMCP.

    Args:
        mcp: The FastMCP instance to register tools with.
    """
    from ..fastmcp_compat import register_tool_class

    # Registration order matches the original module.
    for tool_cls in (
        CreateProjectsProjectPreference,
        GetProjectsProjectPreference,
        UpdateProjectsProjectPreference,
    ):
        register_tool_class(mcp, tool_cls)
"query", "description": "the state of the project preference", "required": true, "schema": { "type": "string" } } ], "responses": { "200": { "description": "OK", "content": { "*/*": { "schema": { "$ref": "#/components/schemas/Result" } } } } } } } } } ``` -------------------------------------------------------------------------------- /mcp-openapi-split/schemas/25_tenant_schemas.json: -------------------------------------------------------------------------------- ```json { "components": { "schemas": { "ResultTenant": { "type": "object", "properties": { "code": { "type": "integer", "format": "int32" }, "msg": { "type": "string" }, "data": { "$ref": "#/components/schemas/Tenant" }, "failed": { "type": "boolean" }, "success": { "type": "boolean" } } }, "Tenant": { "type": "object", "properties": { "id": { "type": "integer", "format": "int32" }, "tenantCode": { "type": "string" }, "description": { "type": "string" }, "queueId": { "type": "integer", "format": "int32" }, "queueName": { "type": "string" }, "queue": { "type": "string" }, "createTime": { "type": "string", "format": "date-time" }, "updateTime": { "type": "string", "format": "date-time" } } }, "PageInfoTenant": { "type": "object", "properties": { "totalList": { "type": "array", "items": { "$ref": "#/components/schemas/Tenant" } }, "total": { "type": "integer", "format": "int32" }, "totalPage": { "type": "integer", "format": "int32" }, "pageSize": { "type": "integer", "format": "int32" }, "currentPage": { "type": "integer", "format": "int32" }, "pageNo": { "type": "integer", "format": "int32" } } }, "ResultPageInfoTenant": { "type": "object", "properties": { "code": { "type": "integer", "format": "int32" }, "msg": { "type": "string" }, "data": { "$ref": "#/components/schemas/PageInfoTenant" }, "failed": { "type": "boolean" }, "success": { "type": "boolean" } } }, "ResultListTenant": { "type": "object", "properties": { "code": { "type": "integer", "format": "int32" }, "msg": { "type": "string" }, "data": { 
"type": "array", "items": { "$ref": "#/components/schemas/Tenant" } }, "failed": { "type": "boolean" }, "success": { "type": "boolean" } } } } } } ``` -------------------------------------------------------------------------------- /examples/manage_resources.py: -------------------------------------------------------------------------------- ```python #!/usr/bin/env python3 """Example script to manage DolphinScheduler resources using the MCP client.""" import argparse import asyncio import json import os import sys from typing import Dict, Any # For local development, add the parent directory to the Python path sys.path.append(".") # Import the MCPClient from simple_client.py from examples.simple_client import MCPClient async def list_resources(client: MCPClient, args: argparse.Namespace) -> None: """List resources in DolphinScheduler.""" parameters: Dict[str, Any] = {} if args.pid is not None: parameters["pid"] = args.pid if args.resource_type is not None: parameters["resource_type"] = args.resource_type if args.search_val is not None: parameters["search_val"] = args.search_val response = await client.invoke_tool("get-resource-list", parameters) if "result" in response and "data" in response["result"]: resources = response["result"]["data"] print(f"Found {len(resources)} resources:") for resource in resources: print(f"- {resource.get('name', 'Unknown')} (ID: {resource.get('id', 'Unknown')})") print(f" Type: {resource.get('type', 'Unknown')}") if "description" in resource and resource["description"]: print(f" Description: {resource['description']}") print() else: print("Error or no resources found:") print(json.dumps(response, indent=2)) async def upload_resource(client: MCPClient, args: argparse.Namespace) -> None: """Upload a resource to DolphinScheduler.""" with open(args.file_path, "rb") as file: file_content = file.read() parameters = { "pid": args.pid, "name": os.path.basename(args.file_path), "description": args.description or "", "content": 
async def upload_resource(client: MCPClient, args: argparse.Namespace) -> None:
    """Upload a resource to DolphinScheduler.

    Reads ``args.file_path`` as bytes and sends it via the ``create-resource``
    tool. The content is sent only for text files (decoded as UTF-8); for any
    other file the ``content`` field is an empty string.
    """
    with open(args.file_path, "rb") as handle:
        raw_bytes = handle.read()

    if args.is_text:
        text_content = raw_bytes.decode("utf-8")
    else:
        text_content = ""

    parameters = {
        "pid": args.pid,
        "name": os.path.basename(args.file_path),
        "description": args.description or "",
        "content": text_content,
        "is_text": args.is_text,
    }

    result = await client.invoke_tool("create-resource", parameters)
    print(json.dumps(result, indent=2))


async def delete_resource(client: MCPClient, args: argparse.Namespace) -> None:
    """Delete a resource from DolphinScheduler via the ``delete-resource`` tool."""
    result = await client.invoke_tool(
        "delete-resource",
        {"resource_id": args.resource_id},
    )
    print(json.dumps(result, indent=2))


async def main():
    """Parse the command line and run the selected resource command."""
    parser = argparse.ArgumentParser(description="Manage DolphinScheduler resources")
    subparsers = parser.add_subparsers(dest="command", help="Command to execute")

    # "list" sub-command
    list_cmd = subparsers.add_parser("list", help="List resources")
    list_cmd.add_argument("--pid", type=int, help="Parent directory ID (0 for root)")
    list_cmd.add_argument("--resource-type", type=str, choices=["FILE", "UDF"], help="Resource type")
    list_cmd.add_argument("--search-val", type=str, help="Search value")

    # "upload" sub-command
    upload_cmd = subparsers.add_parser("upload", help="Upload a resource")
    upload_cmd.add_argument("--pid", type=int, required=True, help="Parent directory ID")
    upload_cmd.add_argument("--file-path", type=str, required=True, help="Path to the file to upload")
    upload_cmd.add_argument("--description", type=str, help="Resource description")
    upload_cmd.add_argument("--is-text", action="store_true", help="Whether the file is a text file")

    # "delete" sub-command
    delete_cmd = subparsers.add_parser("delete", help="Delete a resource")
    delete_cmd.add_argument("--resource-id", type=int, required=True, help="Resource ID to delete")

    args = parser.parse_args()

    mcp_url = os.environ.get("MCP_URL", "http://localhost:8089/mcp")
    print(f"Connecting to MCP server at {mcp_url}")

    # Map each sub-command name to the coroutine that implements it.
    handlers = {
        "list": list_resources,
        "upload": upload_resource,
        "delete": delete_resource,
    }

    async with MCPClient(mcp_url) as client:
        handler = handlers.get(args.command)
        if handler is None:
            # No (or unknown) sub-command: show usage instead.
            parser.print_help()
        else:
            await handler(client, args)


if __name__ == "__main__":
    asyncio.run(main())
class GetProjectsAuditAuditLogOperationType(FastMCPTool):
    """Tool that queries the list of audit operation types."""

    name = "get_projects_audit_audit_log_operation_type"
    description = "QUERY_AUDIT_OPERATION_TYPE_LIST"
    is_async = True

    async def _run(self) -> Dict:
        """Execute the GET operation on /projects/audit/audit-log-operation-type.

        Returns:
            ``{"success": True, "data": <client response>}``.
        """
        client = DolphinSchedulerClient()
        try:
            response = await client.request(
                "GET",
                "/projects/audit/audit-log-operation-type",
            )
            return {"success": True, "data": response}
        finally:
            # Always release the client, even when the request raises.
            await client.close()


class GetProjectsAuditAuditLogModelType(FastMCPTool):
    """Tool that queries the list of audit model types."""

    name = "get_projects_audit_audit_log_model_type"
    description = "QUERY_AUDIT_MODEL_TYPE_LIST"
    is_async = True

    async def _run(self) -> Dict:
        """Execute the GET operation on /projects/audit/audit-log-model-type.

        Returns:
            ``{"success": True, "data": <client response>}``.
        """
        client = DolphinSchedulerClient()
        try:
            response = await client.request(
                "GET",
                "/projects/audit/audit-log-model-type",
            )
            return {"success": True, "data": response}
        finally:
            # Always release the client, even when the request raises.
            await client.close()


class GetProjectsAuditAuditLogList(FastMCPTool):
    """Tool that queries the audit log, one page at a time."""

    name = "get_projects_audit_audit_log_list"
    description = "查询审计日志"
    is_async = True
    schema = {
        "type": "object",
        "properties": {
            "page_no": {
                "type": "integer",
                "format": "int32",
                "description": "\u9875\u7801\u53f7"
            },
            "page_size": {
                "type": "integer",
                "format": "int32",
                "description": "\u9875\u5927\u5c0f"
            },
            "model_types": {
                "type": "string"
            },
            "operation_types": {
                "type": "string",
                "description": "OPERATION_TYPES"
            },
            "start_date": {
                "type": "string",
                "description": "\u5f00\u59cb\u65f6\u95f4"
            },
            "end_date": {
                "type": "string",
                "description": "\u7ed3\u675f\u65f6\u95f4"
            },
            "user_name": {
                "type": "string",
                "description": "\u7528\u6237\u540d"
            },
            "model_name": {
                "type": "string"
            },
            "object_types": {
                "type": "string",
                "description": "MODEL_TYPES"
            },
            "object_name": {
                "type": "string",
                "description": "MODEL_NAME"
            }
        },
        "required": [
            "page_no",
            "page_size"
        ]
    }

    async def _run(
        self,
        page_no: int,
        page_size: int,
        model_types: Optional[str] = None,
        operation_types: Optional[str] = None,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
        user_name: Optional[str] = None,
        model_name: Optional[str] = None,
        object_types: Optional[str] = None,
        object_name: Optional[str] = None,
    ) -> Dict:
        """Execute the GET operation on /projects/audit/audit-log-list.

        Only ``page_no`` and ``page_size`` are required, matching ``schema``;
        every other filter defaults to ``None`` and is left out of the query
        string when unset.

        Args:
            page_no: Page number, sent as ``pageNo``.
            page_size: Page size, sent as ``pageSize``.
            model_types: Optional filter, sent as ``modelTypes``.
            operation_types: Optional filter, sent as ``operationTypes``.
            start_date: Optional start time, sent as ``startDate``.
            end_date: Optional end time, sent as ``endDate``.
            user_name: Optional user name, sent as ``userName``.
            model_name: Optional filter, sent as ``modelName``.
            object_types: Accepted because the schema declares it, but not
                sent to the server.
            object_name: Accepted because the schema declares it, but not
                sent to the server.

        Returns:
            ``{"success": True, "data": <client response>}``.
        """
        # NOTE(review): object_types / object_name are deliberately not
        # forwarded — the bundled OpenAPI description lists objectTypes and
        # objectName without a parameter location; confirm whether the server
        # reads them before adding them to the query.
        candidates = {
            "pageNo": page_no,
            "pageSize": page_size,
            "modelTypes": model_types,
            "operationTypes": operation_types,
            "startDate": start_date,
            "endDate": end_date,
            "userName": user_name,
            "modelName": model_name,
        }
        # Unset optional filters must not reach the query string as None.
        params = {key: value for key, value in candidates.items() if value is not None}

        client = DolphinSchedulerClient()
        try:
            response = await client.request(
                "GET",
                "/projects/audit/audit-log-list",
                params=params,
            )
            return {"success": True, "data": response}
        finally:
            # Always release the client, even when the request raises.
            await client.close()


def register_audit_log_tools(mcp):
    """Register audit log tools with FastMCP.

    Args:
        mcp: The FastMCP instance to register tools with.
    """
    from ..fastmcp_compat import register_tool_class

    register_tool_class(mcp, GetProjectsAuditAuditLogList)
    register_tool_class(mcp, GetProjectsAuditAuditLogModelType)
    register_tool_class(mcp, GetProjectsAuditAuditLogOperationType)
## Connection Management Tools ### Get Connection Settings Retrieves the current connection settings for the DolphinScheduler API. **Tool Name:** `get-connection-settings` **Arguments:** None **Example Request:** ```json { "name": "get-connection-settings" } ``` **Example Response:** ```json { "url": "http://localhost:12345/dolphinscheduler", "has_api_key": true } ``` ### Update Connection Settings Updates the connection settings for the DolphinScheduler API. **Tool Name:** `update-connection-settings` **Arguments:** - `url` (string, optional): The new DolphinScheduler API URL - `api_key` (string, optional): The new API key for authentication **Example Request:** ```json { "name": "update-connection-settings", "arguments": { "url": "http://new-host:12345/dolphinscheduler", "api_key": "new-api-key" } } ``` **Example Response:** ```json { "success": true, "url": "http://new-host:12345/dolphinscheduler", "has_api_key": true } ``` ## Project Management Tools ### Get Projects Retrieves a list of all projects in DolphinScheduler. **Tool Name:** `get-projects` **Arguments:** None **Example Request:** ```json { "name": "get-projects" } ``` ### Get Project Retrieves details of a specific project. **Tool Name:** `get-project` **Arguments:** - `project_id` (integer): The ID of the project to retrieve **Example Request:** ```json { "name": "get-project", "arguments": { "project_id": 1 } } ``` ## Workflow Management Tools ### Get Workflows Retrieves workflows for a specific project. **Tool Name:** `get-workflows` **Arguments:** - `project_id` (integer): The ID of the project **Example Request:** ```json { "name": "get-workflows", "arguments": { "project_id": 1 } } ``` ### Get Workflow Retrieves details of a specific workflow. 
**Tool Name:** `get-workflow` **Arguments:** - `project_id` (integer): The ID of the project - `workflow_id` (integer): The ID of the workflow **Example Request:** ```json { "name": "get-workflow", "arguments": { "project_id": 1, "workflow_id": 1 } } ``` ## Workflow Instance Management Tools ### Get Workflow Instances Retrieves workflow instances for a specific project. **Tool Name:** `get-workflow-instances` **Arguments:** - `project_id` (integer): The ID of the project **Example Request:** ```json { "name": "get-workflow-instances", "arguments": { "project_id": 1 } } ``` ### Get Workflow Instance Retrieves details of a specific workflow instance. **Tool Name:** `get-workflow-instance` **Arguments:** - `project_id` (integer): The ID of the project - `instance_id` (integer): The ID of the workflow instance **Example Request:** ```json { "name": "get-workflow-instance", "arguments": { "project_id": 1, "instance_id": 1 } } ``` ### Start Workflow Starts a workflow execution. **Tool Name:** `start-workflow` **Arguments:** - `project_id` (integer): The ID of the project - `workflow_id` (integer): The ID of the workflow to execute - `params` (object, optional): Workflow parameters **Example Request:** ```json { "name": "start-workflow", "arguments": { "project_id": 1, "workflow_id": 1, "params": { "param1": "value1", "param2": "value2" } } } ``` ### Stop Workflow Instance Stops a running workflow instance. **Tool Name:** `stop-workflow-instance` **Arguments:** - `project_id` (integer): The ID of the project - `instance_id` (integer): The ID of the workflow instance to stop **Example Request:** ```json { "name": "stop-workflow-instance", "arguments": { "project_id": 1, "instance_id": 1 } } ``` ## Task Instance Management Tools ### Get Task Instances Retrieves task instances for a workflow instance. 
**Tool Name:** `get-task-instances` **Arguments:** - `project_id` (integer): The ID of the project - `instance_id` (integer): The ID of the workflow instance **Example Request:** ```json { "name": "get-task-instances", "arguments": { "project_id": 1, "instance_id": 1 } } ``` ## System Status Tools ### Get System Status Retrieves overall system status including master and worker nodes. **Tool Name:** `get-system-status` **Arguments:** None **Example Request:** ```json { "name": "get-system-status" } ``` ## Resource Management Tools ### Get Resources Retrieves a list of resources (files and directories). **Tool Name:** `get-resources` **Arguments:** None **Example Request:** ```json { "name": "get-resources" } ``` ``` -------------------------------------------------------------------------------- /mcp-openapi-split/paths/26_login_api.json: -------------------------------------------------------------------------------- ```json { "paths": { "/signOut": { "post": { "tags": [ "用户登录相关操作" ], "summary": "signOut", "description": "退出登录", "operationId": "signOut", "responses": { "200": { "description": "OK", "content": { "*/*": { "schema": { "$ref": "#/components/schemas/Result" } } } } } } }, "/login": { "post": { "tags": [ "用户登录相关操作" ], "summary": "login", "description": "用户登录", "operationId": "login", "parameters": [ { "name": "userName", "in": "query", "description": "用户名", "required": true, "schema": { "type": "string" } }, { "name": "userPassword", "in": "query", "description": "用户密码", "required": true, "schema": { "type": "string" } } ], "responses": { "200": { "description": "OK", "content": { "*/*": { "schema": { "$ref": "#/components/schemas/Result" } } } } } } }, "/redirect/login/oauth2": { "get": { "tags": [ "用户登录相关操作" ], "summary": "redirectToOauth2", "description": "REDIRECT_TO_OAUTH2_LOGIN", "operationId": "loginByAuth2", "parameters": [ { "name": "code", "in": "query", "required": true, "schema": { "type": "string" } }, { "name": "provider", "in": "query", 
"required": true, "schema": { "type": "string" } } ], "responses": { "200": { "description": "OK" } } } }, "/oauth2-provider": { "get": { "tags": [ "用户登录相关操作" ], "summary": "getOauth2Provider", "description": "GET_OAUTH2_PROVIDER", "operationId": "oauth2Provider", "responses": { "200": { "description": "OK", "content": { "*/*": { "schema": { "$ref": "#/components/schemas/ResultListOAuth2ClientProperties" } } } } } } }, "/login/sso": { "get": { "tags": [ "用户登录相关操作" ], "summary": "sso login", "description": "user sso login", "operationId": "ssoLogin", "responses": { "200": { "description": "OK", "content": { "*/*": { "schema": { "$ref": "#/components/schemas/Result" } } } } } } }, "/cookies": { "delete": { "tags": [ "用户登录相关操作" ], "operationId": "clearCookieSessionId", "responses": { "200": { "description": "OK" } } } } } } ``` -------------------------------------------------------------------------------- /mcp-openapi-split/schemas/18_remaining_7_schemas.json: -------------------------------------------------------------------------------- ```json { "components": { "schemas": { "PageInfoErrorCommand": { "type": "object", "properties": { "totalList": { "type": "array", "items": { "$ref": "#/components/schemas/ErrorCommand" } }, "total": { "type": "integer", "format": "int32" }, "totalPage": { "type": "integer", "format": "int32" }, "pageSize": { "type": "integer", "format": "int32" }, "currentPage": { "type": "integer", "format": "int32" }, "pageNo": { "type": "integer", "format": "int32" } } }, "Command": { "type": "object", "properties": { "id": { "type": "integer", "format": "int32" }, "commandType": { "type": "string", "enum": [ "START_PROCESS", "START_CURRENT_TASK_PROCESS", "RECOVER_TOLERANCE_FAULT_PROCESS", "RECOVER_SUSPENDED_PROCESS", "START_FAILURE_TASK_PROCESS", "COMPLEMENT_DATA", "SCHEDULER", "REPEAT_RUNNING", "PAUSE", "STOP", "RECOVER_WAITING_THREAD", "RECOVER_SERIAL_WAIT", "EXECUTE_TASK", "DYNAMIC_GENERATION" ] }, "processDefinitionCode": { "type": 
"integer", "format": "int64" }, "executorId": { "type": "integer", "format": "int32" }, "commandParam": { "type": "string" }, "taskDependType": { "type": "string", "enum": [ "TASK_ONLY", "TASK_PRE", "TASK_POST" ] }, "failureStrategy": { "type": "string", "enum": [ "END", "CONTINUE" ] }, "warningType": { "type": "string", "enum": [ "NONE", "SUCCESS", "FAILURE", "ALL", "GLOBAL" ] }, "warningGroupId": { "type": "integer", "format": "int32" }, "scheduleTime": { "type": "string", "format": "date-time" }, "startTime": { "type": "string", "format": "date-time" }, "processInstancePriority": { "type": "string", "enum": [ "HIGHEST", "HIGH", "MEDIUM", "LOW", "LOWEST" ] }, "updateTime": { "type": "string", "format": "date-time" }, "workerGroup": { "type": "string" }, "tenantCode": { "type": "string" }, "environmentCode": { "type": "integer", "format": "int64" }, "dryRun": { "type": "integer", "format": "int32" }, "processInstanceId": { "type": "integer", "format": "int32" }, "processDefinitionVersion": { "type": "integer", "format": "int32" }, "testFlag": { "type": "integer", "format": "int32" } } } } } } ``` -------------------------------------------------------------------------------- /mcp-openapi-split/schemas/03_datasource_schemas.json: -------------------------------------------------------------------------------- ```json { "components": { "schemas": { "BaseDataSourceParamDTO": { "type": "object", "properties": { "id": { "type": "integer", "format": "int32" }, "name": { "type": "string" }, "note": { "type": "string" }, "host": { "type": "string" }, "port": { "type": "integer", "format": "int32" }, "database": { "type": "string" }, "userName": { "type": "string" }, "password": { "type": "string" }, "other": { "type": "object", "additionalProperties": { "type": "string" } }, "hostAndPortByAddress": { "type": "string", "writeOnly": true }, "type": { "type": "string", "enum": [ "MYSQL", "POSTGRESQL", "HIVE", "SPARK", "CLICKHOUSE", "ORACLE", "SQLSERVER", "DB2", "PRESTO", 
"H2", "REDSHIFT", "ATHENA", "TRINO", "STARROCKS", "AZURESQL", "DAMENG", "OCEANBASE", "SSH", "KYUUBI", "DATABEND", "SNOWFLAKE", "VERTICA", "HANA", "DORIS", "ZEPPELIN", "SAGEMAKER" ] } } }, "DataSource": { "type": "object", "properties": { "id": { "type": "integer", "format": "int32" }, "userId": { "type": "integer", "format": "int32" }, "userName": { "type": "string" }, "name": { "type": "string" }, "note": { "type": "string" }, "type": { "type": "string", "enum": [ "MYSQL", "POSTGRESQL", "HIVE", "SPARK", "CLICKHOUSE", "ORACLE", "SQLSERVER", "DB2", "PRESTO", "H2", "REDSHIFT", "ATHENA", "TRINO", "STARROCKS", "AZURESQL", "DAMENG", "OCEANBASE", "SSH", "KYUUBI", "DATABEND", "SNOWFLAKE", "VERTICA", "HANA", "DORIS", "ZEPPELIN", "SAGEMAKER" ] }, "connectionParams": { "type": "string" }, "createTime": { "type": "string", "format": "date-time" }, "updateTime": { "type": "string", "format": "date-time" } } }, "ResultDataSource": { "type": "object", "properties": { "code": { "type": "integer", "format": "int32" }, "msg": { "type": "string" }, "data": { "$ref": "#/components/schemas/DataSource" }, "failed": { "type": "boolean" }, "success": { "type": "boolean" } } } } } } ``` -------------------------------------------------------------------------------- /mcp-openapi-split/schemas/17_remaining_6_schemas.json: -------------------------------------------------------------------------------- ```json { "components": { "schemas": { "PageInfoAuditDto": { "type": "object", "properties": { "totalList": { "type": "array", "items": { "$ref": "#/components/schemas/AuditDto" } }, "total": { "type": "integer", "format": "int32" }, "totalPage": { "type": "integer", "format": "int32" }, "pageSize": { "type": "integer", "format": "int32" }, "currentPage": { "type": "integer", "format": "int32" }, "pageNo": { "type": "integer", "format": "int32" } } }, "ErrorCommand": { "type": "object", "properties": { "id": { "type": "integer", "format": "int32" }, "commandType": { "type": "string", "enum": 
[ "START_PROCESS", "START_CURRENT_TASK_PROCESS", "RECOVER_TOLERANCE_FAULT_PROCESS", "RECOVER_SUSPENDED_PROCESS", "START_FAILURE_TASK_PROCESS", "COMPLEMENT_DATA", "SCHEDULER", "REPEAT_RUNNING", "PAUSE", "STOP", "RECOVER_WAITING_THREAD", "RECOVER_SERIAL_WAIT", "EXECUTE_TASK", "DYNAMIC_GENERATION" ] }, "processDefinitionCode": { "type": "integer", "format": "int64" }, "processDefinitionVersion": { "type": "integer", "format": "int32" }, "processInstanceId": { "type": "integer", "format": "int32" }, "executorId": { "type": "integer", "format": "int32" }, "commandParam": { "type": "string" }, "taskDependType": { "type": "string", "enum": [ "TASK_ONLY", "TASK_PRE", "TASK_POST" ] }, "failureStrategy": { "type": "string", "enum": [ "END", "CONTINUE" ] }, "warningType": { "type": "string", "enum": [ "NONE", "SUCCESS", "FAILURE", "ALL", "GLOBAL" ] }, "warningGroupId": { "type": "integer", "format": "int32" }, "scheduleTime": { "type": "string", "format": "date-time" }, "startTime": { "type": "string", "format": "date-time" }, "processInstancePriority": { "type": "string", "enum": [ "HIGHEST", "HIGH", "MEDIUM", "LOW", "LOWEST" ] }, "updateTime": { "type": "string", "format": "date-time" }, "message": { "type": "string" }, "workerGroup": { "type": "string" }, "tenantCode": { "type": "string" }, "environmentCode": { "type": "integer", "format": "int64" }, "dryRun": { "type": "integer", "format": "int32" }, "testFlag": { "type": "integer", "format": "int32" } } } } } } ``` -------------------------------------------------------------------------------- /mcp-openapi-split/paths/16_audit_log_api.json: -------------------------------------------------------------------------------- ```json { "paths": { "/projects/audit/audit-log-operation-type": { "get": { "tags": [ "审计日志执行相关操作" ], "summary": "queryAuditOperationTypeList", "description": "QUERY_AUDIT_OPERATION_TYPE_LIST", "operationId": "queryAuditOperationTypeList", "responses": { "200": { "description": "OK", "content": { "*/*": 
{ "schema": { "$ref": "#/components/schemas/ResultListAuditOperationTypeDto" } } } } } } }, "/projects/audit/audit-log-model-type": { "get": { "tags": [ "审计日志执行相关操作" ], "summary": "queryAuditModelTypeList", "description": "QUERY_AUDIT_MODEL_TYPE_LIST", "operationId": "queryAuditModelTypeList", "responses": { "200": { "description": "OK", "content": { "*/*": { "schema": { "$ref": "#/components/schemas/ResultListAuditModelTypeDto" } } } } } } }, "/projects/audit/audit-log-list": { "get": { "tags": [ "审计日志执行相关操作" ], "summary": "queryAuditLogListPaging", "description": "查询审计日志", "operationId": "queryAuditLogListPaging", "parameters": [ { "name": "pageNo", "in": "query", "description": "页码号", "required": true, "schema": { "type": "integer", "format": "int32" } }, { "name": "pageSize", "in": "query", "description": "页大小", "required": true, "schema": { "type": "integer", "format": "int32" } }, { "name": "modelTypes", "in": "query", "required": false, "schema": { "type": "string" } }, { "name": "operationTypes", "in": "query", "description": "OPERATION_TYPES", "required": false, "schema": { "type": "string" } }, { "name": "startDate", "in": "query", "description": "开始时间", "required": false, "schema": { "type": "string" } }, { "name": "endDate", "in": "query", "description": "结束时间", "required": false, "schema": { "type": "string" } }, { "name": "userName", "in": "query", "description": "用户名", "required": false, "schema": { "type": "string" } }, { "name": "modelName", "in": "query", "required": false, "schema": { "type": "string" } }, { "name": "objectTypes", "description": "MODEL_TYPES", "schema": { "type": "string" } }, { "name": "objectName", "description": "MODEL_NAME", "schema": { "type": "string" } } ], "responses": { "200": { "description": "OK", "content": { "*/*": { "schema": { "$ref": "#/components/schemas/ResultPageInfoAuditDto" } } } } } } } } } ``` -------------------------------------------------------------------------------- 
/mcp-openapi-split/schemas/02_cluster_schemas.json: -------------------------------------------------------------------------------- ```json { "components": { "schemas": { "Cluster": { "type": "object", "properties": { "id": { "type": "integer", "format": "int32" }, "code": { "type": "integer", "format": "int64" }, "name": { "type": "string" }, "config": { "type": "string" }, "description": { "type": "string" }, "operator": { "type": "integer", "format": "int32" }, "createTime": { "type": "string", "format": "date-time" }, "updateTime": { "type": "string", "format": "date-time" } } }, "ResultCluster": { "type": "object", "properties": { "code": { "type": "integer", "format": "int32" }, "msg": { "type": "string" }, "data": { "$ref": "#/components/schemas/Cluster" }, "failed": { "type": "boolean" }, "success": { "type": "boolean" } } }, "ClusterDto": { "type": "object", "properties": { "id": { "type": "integer", "format": "int32" }, "code": { "type": "integer", "format": "int64" }, "name": { "type": "string" }, "config": { "type": "string" }, "description": { "type": "string" }, "processDefinitions": { "type": "array", "items": { "type": "string" } }, "operator": { "type": "integer", "format": "int32" }, "createTime": { "type": "string", "format": "date-time" }, "updateTime": { "type": "string", "format": "date-time" } } }, "ResultListClusterDto": { "type": "object", "properties": { "code": { "type": "integer", "format": "int32" }, "msg": { "type": "string" }, "data": { "type": "array", "items": { "$ref": "#/components/schemas/ClusterDto" } }, "failed": { "type": "boolean" }, "success": { "type": "boolean" } } }, "ResultClusterDto": { "type": "object", "properties": { "code": { "type": "integer", "format": "int32" }, "msg": { "type": "string" }, "data": { "$ref": "#/components/schemas/ClusterDto" }, "failed": { "type": "boolean" }, "success": { "type": "boolean" } } }, "PageInfoClusterDto": { "type": "object", "properties": { "totalList": { "type": "array", "items": 
class GetWorkerGroups(FastMCPTool):
    """Tool that queries worker groups, one page at a time."""

    name = "get_worker_groups"
    description = "Worker分组管理"
    is_async = True
    schema = {
        "type": "object",
        "properties": {
            "page_no": {
                "type": "integer",
                "format": "int32",
                "description": "\u9875\u7801\u53f7"
            },
            "page_size": {
                "type": "integer",
                "format": "int32",
                "description": "\u9875\u5927\u5c0f"
            },
            "search_val": {
                "type": "string",
                "description": "\u641c\u7d22\u503c"
            }
        },
        "required": [
            "page_no",
            "page_size"
        ]
    }

    async def _run(self, page_no: int, page_size: int, search_val: Optional[str] = None) -> Dict:
        """Execute the GET operation on /worker-groups.

        Args:
            page_no: Page number, sent as ``pageNo``.
            page_size: Page size, sent as ``pageSize``.
            search_val: Optional search value, sent as ``searchVal`` only
                when it is not ``None`` (the schema does not require it).

        Returns:
            ``{"success": True, "data": <client response>}``.
        """
        candidates = {
            "pageNo": page_no,
            "pageSize": page_size,
            "searchVal": search_val,
        }
        # Unset optional filters must not reach the query string as None.
        params = {key: value for key, value in candidates.items() if value is not None}

        client = DolphinSchedulerClient()
        try:
            response = await client.request(
                "GET",
                "/worker-groups",
                params=params,
            )
            return {"success": True, "data": response}
        finally:
            # Always release the client, even when the request raises.
            await client.close()


class CreateWorkerGroups(FastMCPTool):
    """Tool that creates (saves) a worker group."""

    name = "create_worker_groups"
    description = "创建Worker分组"
    is_async = True
    schema = {
        "type": "object",
        "properties": {
            "id": {
                "type": "integer",
                "format": "int32",
                "description": "Worker Server\u5206\u7ec4ID"
            },
            "name": {
                "type": "string",
                "description": "Worker\u5206\u7ec4\u540d\u79f0"
            },
            "addr_list": {
                "type": "string",
                "description": "worker\u5730\u5740\u5217\u8868"
            },
            "description": {
                "type": "string",
                "description": "WORKER_DESC"
            },
            "other_params_json": {
                "type": "string",
                "description": "WORKER_PARAMS_JSON"
            }
        },
        "required": [
            "name",
            "addr_list"
        ]
    }

    async def _run(
        self,
        id: Optional[int] = None,
        name: Optional[str] = None,
        addr_list: Optional[str] = None,
        description: Optional[str] = None,
        other_params_json: Optional[str] = None,
    ) -> Dict:
        """Execute the POST operation on /worker-groups.

        The positional order of the parameters is kept for existing callers.
        Because the optional ``id`` comes first, every parameter carries a
        default and the two required ones are checked explicitly.

        Args:
            id: Optional worker group ID, sent as ``id``.
            name: Worker group name (required), sent as ``name``.
            addr_list: Worker address list (required), sent as ``addrList``.
            description: Optional description, sent as ``description``.
            other_params_json: Optional value sent as ``otherParamsJson``.

        Returns:
            ``{"success": True, "data": <client response>}`` on success, or
            ``{"success": False, "error": <message>}`` when a required
            parameter is missing.
        """
        required = {"name": name, "addr_list": addr_list}
        missing = [key for key, value in required.items() if value is None]
        if missing:
            return {
                "success": False,
                "error": f"Missing required parameter: {', '.join(missing)}"
            }

        candidates = {
            "id": id,
            "name": name,
            "addrList": addr_list,
            "description": description,
            "otherParamsJson": other_params_json,
        }
        # Unset optional values must not reach the query string as None.
        params = {key: value for key, value in candidates.items() if value is not None}

        client = DolphinSchedulerClient()
        try:
            response = await client.request(
                "POST",
                "/worker-groups",
                params=params,
            )
            return {"success": True, "data": response}
        finally:
            # Always release the client, even when the request raises.
            await client.close()


class GetWorkerGroupsWorkerAddressList(FastMCPTool):
    """Tool that queries the worker address list."""

    name = "get_worker_groups_worker_address_list"
    description = "查询worker地址列表"
    is_async = True

    async def _run(self) -> Dict:
        """Execute the GET operation on /worker-groups/worker-address-list.

        Returns:
            ``{"success": True, "data": <client response>}``.
        """
        client = DolphinSchedulerClient()
        try:
            response = await client.request(
                "GET",
                "/worker-groups/worker-address-list",
            )
            return {"success": True, "data": response}
        finally:
            # Always release the client, even when the request raises.
            await client.close()


class GetWorkerGroupsAll(FastMCPTool):
    """Tool that queries all worker groups."""

    name = "get_worker_groups_all"
    description = "查询worker group分组"
    is_async = True

    async def _run(self) -> Dict:
        """Execute the GET operation on /worker-groups/all.

        Returns:
            ``{"success": True, "data": <client response>}``.
        """
        client = DolphinSchedulerClient()
        try:
            response = await client.request(
                "GET",
                "/worker-groups/all",
            )
            return {"success": True, "data": response}
        finally:
            # Always release the client, even when the request raises.
            await client.close()


class DeleteWorkerGroups(FastMCPTool):
    """Tool that deletes a worker group by ID."""

    name = "delete_worker_groups"
    description = "通过ID删除worker group"
    is_async = True
    schema = {
        "type": "object",
        "properties": {
            "id": {
                "type": "integer",
                "format": "int32",
                "description": "Worker Server\u5206\u7ec4ID"
            }
        },
        "required": [
            "id"
        ]
    }

    async def _run(self, id) -> Dict:
        """Execute the DELETE operation on /worker-groups/{id}.

        Args:
            id: Worker group ID; anything ``int()`` accepts.

        Returns:
            ``{"success": True, "data": <client response>}`` on success,
            otherwise ``{"success": False, "error": <message>}``.
        """
        # Validate the path parameter before a client is created, so that
        # only genuine conversion failures are reported as a bad ID (a
        # ValueError raised by the request itself is a request failure).
        if id is None:
            return {
                "success": False,
                "error": "Missing required parameter: id"
            }
        try:
            worker_group_id = int(id)
        except (TypeError, ValueError):
            return {
                "success": False,
                "error": f"Invalid ID format: {id}"
            }

        client = DolphinSchedulerClient()
        try:
            response = await client.request(
                "DELETE",
                f"/worker-groups/{worker_group_id}",
            )
            return {"success": True, "data": response}
        except Exception as e:
            # Deliberate catch-all: this tool reports request failures as a
            # result dict instead of raising.
            return {
                "success": False,
                "error": str(e)
            }
        finally:
            await client.close()


def register_worker_group_tools(mcp):
    """Register worker group tool with FastMCP.

    Args:
        mcp: The FastMCP instance to register tools with.
    """
    from ..fastmcp_compat import register_tool_class

    register_tool_class(mcp, CreateWorkerGroups)
    register_tool_class(mcp, DeleteWorkerGroups)
    register_tool_class(mcp, GetWorkerGroups)
    register_tool_class(mcp, GetWorkerGroupsAll)
    register_tool_class(mcp, GetWorkerGroupsWorkerAddressList)
"*/*": { "schema": { "type": "string" } } } } } } }, "/log/{projectCode}/detail": { "get": { "tags": [ "日志相关操作" ], "summary": "queryLogInSpecifiedProject", "description": "查询指定项目的任务实例日志", "operationId": "queryLog", "parameters": [ { "name": "projectCode", "in": "path", "description": "项目Code", "required": true, "schema": { "type": "integer", "format": "int64" } }, { "name": "taskInstanceId", "in": "query", "description": "任务实例ID", "required": true, "schema": { "type": "integer", "format": "int32" } }, { "name": "skipLineNum", "in": "query", "description": "忽略行数", "required": true, "schema": { "type": "integer", "format": "int32" } }, { "name": "limit", "in": "query", "description": "显示多少条", "required": true, "schema": { "type": "integer", "format": "int32" } } ], "responses": { "200": { "description": "OK", "content": { "*/*": { "schema": { "$ref": "#/components/schemas/ResultString" } } } } } } }, "/log/download-log": { "get": { "tags": [ "日志相关操作" ], "summary": "downloadTaskLog", "description": "下载任务实例日志", "operationId": "downloadTaskLog_1", "parameters": [ { "name": "taskInstanceId", "in": "query", "description": "任务实例ID", "required": true, "schema": { "type": "integer", "format": "int32" } } ], "responses": { "200": { "description": "OK", "content": { "*/*": { "schema": { "type": "string" } } } } } } }, "/log/detail": { "get": { "tags": [ "日志相关操作" ], "summary": "queryLog", "description": "查询任务实例日志", "operationId": "queryLog_1", "parameters": [ { "name": "taskInstanceId", "in": "query", "description": "任务实例ID", "required": true, "schema": { "type": "integer", "format": "int32" } }, { "name": "skipLineNum", "in": "query", "description": "忽略行数", "required": true, "schema": { "type": "integer", "format": "int32" } }, { "name": "limit", "in": "query", "description": "显示多少条", "required": true, "schema": { "type": "integer", "format": "int32" } } ], "responses": { "200": { "description": "OK", "content": { "*/*": { "schema": { "$ref": 
"#/components/schemas/ResultResponseTaskLog" } } } } } } } } } ``` -------------------------------------------------------------------------------- /mcp-openapi-split/paths/06_worker_group_api.json: -------------------------------------------------------------------------------- ```json { "paths": { "/worker-groups": { "get": { "tags": [ "Worker分组管理" ], "summary": "queryAllWorkerGroupsPaging", "description": "Worker分组管理", "operationId": "queryAllWorkerGroupsPaging", "parameters": [ { "name": "pageNo", "in": "query", "description": "页码号", "required": true, "schema": { "type": "integer", "format": "int32" } }, { "name": "pageSize", "in": "query", "description": "页大小", "required": true, "schema": { "type": "integer", "format": "int32" } }, { "name": "searchVal", "in": "query", "description": "搜索值", "required": false, "schema": { "type": "string" } } ], "responses": { "200": { "description": "OK", "content": { "*/*": { "schema": { "$ref": "#/components/schemas/Result" } } } } } }, "post": { "tags": [ "Worker分组管理" ], "summary": "saveWorkerGroup", "description": "创建Worker分组", "operationId": "saveWorkerGroup", "parameters": [ { "name": "id", "in": "query", "description": "Worker Server分组ID", "required": false, "schema": { "type": "integer", "format": "int32" } }, { "name": "name", "in": "query", "description": "Worker分组名称", "required": true, "schema": { "type": "string" } }, { "name": "addrList", "in": "query", "description": "worker地址列表", "required": true, "schema": { "type": "string" } }, { "name": "description", "in": "query", "description": "WORKER_DESC", "required": false, "schema": { "type": "string" } }, { "name": "otherParamsJson", "in": "query", "description": "WORKER_PARAMS_JSON", "required": false, "schema": { "type": "string" } } ], "responses": { "200": { "description": "OK", "content": { "*/*": { "schema": { "$ref": "#/components/schemas/Result" } } } } } } }, "/worker-groups/worker-address-list": { "get": { "tags": [ "Worker分组管理" ], "summary": 
"queryWorkerAddressList", "description": "查询worker地址列表", "operationId": "queryWorkerAddressList", "responses": { "200": { "description": "OK", "content": { "*/*": { "schema": { "$ref": "#/components/schemas/Result" } } } } } } }, "/worker-groups/all": { "get": { "tags": [ "Worker分组管理" ], "summary": "queryAllWorkerGroups", "description": "查询worker group分组", "operationId": "queryAllWorkerGroups", "responses": { "200": { "description": "OK", "content": { "*/*": { "schema": { "$ref": "#/components/schemas/Result" } } } } } } }, "/worker-groups/{id}": { "delete": { "tags": [ "Worker分组管理" ], "summary": "deleteWorkerGroupById", "description": "通过ID删除worker group", "operationId": "deleteWorkerGroupById", "parameters": [ { "name": "id", "in": "path", "description": "Worker Server分组ID", "required": true, "schema": { "type": "integer", "format": "int32" } } ], "responses": { "200": { "description": "OK", "content": { "*/*": { "schema": { "$ref": "#/components/schemas/Result" } } } } } } } } } ``` -------------------------------------------------------------------------------- /src/dolphinscheduler_mcp/tools_generated/lineage_tools.py: -------------------------------------------------------------------------------- ```python """Tools for lineage operations in DolphinScheduler.""" from typing import Dict, List, Optional from ..fastmcp_compat import FastMCPTool from ..client import DolphinSchedulerClient class CreateProjectsLineagesTasksVerifyDelete(FastMCPTool): """Tool to 校验是否可以删除任务""" name = "create_projects_lineages_tasks_verify_delete" description = "校验是否可以删除任务" is_async = True schema = { "type": "object", "properties": { "project_code": { "type": "integer", "format": "int64", "description": "\u9879\u76eeCode" }, "process_definition_code": { "type": "integer", "format": "int64", "description": "\u6d41\u7a0b\u5b9a\u4e49\u7f16\u7801" }, "task_code": { "type": "integer", "format": "int64", "description": "\u4efb\u52a1\u5b9a\u4e49\u4ee3\u7801" } }, "required": [ "project_code", 
"process_definition_code", "task_code" ] } async def _run(self, project_code, process_definition_code, task_code) -> Dict: """Execute the POST operation on /projects/{projectCode}/lineages/tasks/verify-delete.""" client = DolphinSchedulerClient() try: params = { "processDefinitionCode": process_definition_code, "taskCode": task_code, } response = await client.request( "POST", f"/projects/{project_code}/lineages/tasks/verify-delete", params=params ) return {"success": True, "data": response} finally: await client.close() class ListProjectsLineages(FastMCPTool): """Tool to 通过血缘代码查询工作流血缘关系""" name = "list_projects_lineages" description = "通过血缘代码查询工作流血缘关系" is_async = True schema = { "type": "object", "properties": { "project_code": { "type": "integer", "format": "int64", "description": "\u9879\u76eeCode" }, "work_flow_code": { "type": "integer", "format": "int64" } }, "required": [ "project_code", "work_flow_code" ] } async def _run(self, project_code, work_flow_code) -> Dict: """Execute the GET operation on /projects/{projectCode}/lineages/{workFlowCode}.""" client = DolphinSchedulerClient() try: response = await client.request( "GET", f"/projects/{project_code}/lineages/{work_flow_code}" ) return {"success": True, "data": response} finally: await client.close() class ListProjectsLineagesQueryDependentTasks(FastMCPTool): """Tool to query_downstream_dependent_task_notes""" name = "list_projects_lineages_query_dependent_tasks" description = "QUERY_DOWNSTREAM_DEPENDENT_TASK_NOTES" is_async = True schema = { "type": "object", "properties": { "work_flow_code": { "type": "integer", "format": "int64", "description": "\u6d41\u7a0b\u5b9a\u4e49\u7f16\u7801" }, "task_code": { "type": "integer", "format": "int64", "description": "\u4efb\u52a1\u5b9a\u4e49\u4ee3\u7801" } }, "required": [ "work_flow_code" ] } async def _run(self, work_flow_code, task_code) -> Dict: """Execute the GET operation on /projects/{projectCode}/lineages/query-dependent-tasks.""" client = 
DolphinSchedulerClient() try: params = { "workFlowCode": work_flow_code, "taskCode": task_code, } response = await client.request( "GET", f"/projects/{project_code}/lineages/query-dependent-tasks", params=params ) return {"success": True, "data": response} finally: await client.close() class GetProjectsLineagesQueryByName(FastMCPTool): """Tool to 通过名称查询工作流血缘列表""" name = "get_projects_lineages_query_by_name" description = "通过名称查询工作流血缘列表" is_async = True schema = { "type": "object", "properties": { "project_code": { "type": "integer", "format": "int64", "description": "\u9879\u76eeCode" }, "work_flow_name": { "type": "string" } }, "required": [ "project_code" ] } async def _run(self, project_code, work_flow_name) -> Dict: """Execute the GET operation on /projects/{projectCode}/lineages/query-by-name.""" client = DolphinSchedulerClient() try: params = { "workFlowName": work_flow_name, } response = await client.request( "GET", f"/projects/{project_code}/lineages/query-by-name", params=params ) return {"success": True, "data": response} finally: await client.close() class ListProjectsLineagesList(FastMCPTool): """Tool to 查询工作量血缘关系""" name = "list_projects_lineages_list" description = "查询工作量血缘关系" is_async = True schema = { "type": "object", "properties": { "project_code": { "type": "integer", "format": "int64", "description": "\u9879\u76eeCode" } }, "required": [ "project_code" ] } async def _run(self, project_code) -> Dict: """Execute the GET operation on /projects/{projectCode}/lineages/list.""" client = DolphinSchedulerClient() try: response = await client.request( "GET", f"/projects/{project_code}/lineages/list" ) return {"success": True, "data": response} finally: await client.close() def register_lineage_tools(mcp): """Register lineage tools with FastMCP. Args: mcp: The FastMCP instance to register tools with. 
""" from ..fastmcp_compat import register_tool_class register_tool_class(mcp, CreateProjectsLineagesTasksVerifyDelete) register_tool_class(mcp, GetProjectsLineagesQueryByName) register_tool_class(mcp, ListProjectsLineages) register_tool_class(mcp, ListProjectsLineagesList) register_tool_class(mcp, ListProjectsLineagesQueryDependentTasks) ``` -------------------------------------------------------------------------------- /mcp-openapi-split/paths/18_workflow_lineage_api.json: -------------------------------------------------------------------------------- ```json { "paths": { "/projects/{projectCode}/lineages/tasks/verify-delete": { "post": { "tags": [ "工作流血缘相关操作" ], "summary": "verifyTaskCanDelete", "description": "校验是否可以删除任务", "operationId": "verifyTaskCanDelete", "parameters": [ { "name": "projectCode", "in": "path", "description": "项目Code", "required": true, "schema": { "type": "integer", "format": "int64" } }, { "name": "processDefinitionCode", "in": "query", "description": "流程定义编码", "required": true, "schema": { "type": "integer", "format": "int64" } }, { "name": "taskCode", "in": "query", "description": "任务定义代码", "required": true, "schema": { "type": "integer", "format": "int64" } } ], "responses": { "200": { "description": "OK", "content": { "*/*": { "schema": { "$ref": "#/components/schemas/Result" } } } } } } }, "/projects/{projectCode}/lineages/{workFlowCode}": { "get": { "tags": [ "工作流血缘相关操作" ], "summary": "queryLineageByWorkFlowCode", "description": "通过血缘代码查询工作流血缘关系", "operationId": "queryWorkFlowLineageByCode", "parameters": [ { "name": "projectCode", "in": "path", "description": "项目Code", "required": true, "schema": { "type": "integer", "format": "int64" } }, { "name": "workFlowCode", "in": "path", "required": true, "schema": { "type": "integer", "format": "int64" } } ], "responses": { "200": { "description": "OK", "content": { "*/*": { "schema": { "$ref": "#/components/schemas/ResultMapStringObject" } } } } } } }, 
"/projects/{projectCode}/lineages/query-dependent-tasks": { "get": { "tags": [ "工作流血缘相关操作" ], "summary": "queryDownstreamDependentTaskList", "description": "QUERY_DOWNSTREAM_DEPENDENT_TASK_NOTES", "operationId": "queryDownstreamDependentTaskList", "parameters": [ { "name": "workFlowCode", "in": "query", "description": "流程定义编码", "required": true, "schema": { "type": "integer", "format": "int64" } }, { "name": "taskCode", "in": "query", "description": "任务定义代码", "required": false, "schema": { "type": "integer", "format": "int64" } } ], "responses": { "200": { "description": "OK", "content": { "*/*": { "schema": { "$ref": "#/components/schemas/ResultMapStringObject" } } } } } } }, "/projects/{projectCode}/lineages/query-by-name": { "get": { "tags": [ "工作流血缘相关操作" ], "summary": "queryLineageByWorkFlowName", "description": "通过名称查询工作流血缘列表", "operationId": "queryWorkFlowLineageByName", "parameters": [ { "name": "projectCode", "in": "path", "description": "项目Code", "required": true, "schema": { "type": "integer", "format": "int64" } }, { "name": "workFlowName", "in": "query", "required": false, "schema": { "type": "string" } } ], "responses": { "200": { "description": "OK", "content": { "*/*": { "schema": { "$ref": "#/components/schemas/ResultListWorkFlowLineage" } } } } } } }, "/projects/{projectCode}/lineages/list": { "get": { "tags": [ "工作流血缘相关操作" ], "summary": "queryWorkFlowList", "description": "查询工作量血缘关系", "operationId": "queryWorkFlowLineage", "parameters": [ { "name": "projectCode", "in": "path", "description": "项目Code", "required": true, "schema": { "type": "integer", "format": "int64" } } ], "responses": { "200": { "description": "OK", "content": { "*/*": { "schema": { "$ref": "#/components/schemas/ResultMapStringObject" } } } } } } } } } ``` -------------------------------------------------------------------------------- /mcp-openapi-split/paths/32_queue_api.json: -------------------------------------------------------------------------------- ```json { "paths": { 
"/queues/{id}": { "put": { "tags": [ "队列相关操作" ], "summary": "updateQueue", "description": "更新队列", "operationId": "updateQueue", "parameters": [ { "name": "id", "in": "path", "description": "队列ID", "required": true, "schema": { "type": "integer", "format": "int32" } }, { "name": "queue", "in": "query", "description": "hadoop yarn队列名", "required": true, "schema": { "type": "string" } }, { "name": "queueName", "in": "query", "description": "队列名", "required": true, "schema": { "type": "string" } } ], "responses": { "201": { "description": "Created", "content": { "*/*": { "schema": { "$ref": "#/components/schemas/ResultQueue" } } } } } }, "delete": { "tags": [ "队列相关操作" ], "summary": "deleteQueueById", "description": "DELETE_QUEUE_NOTES", "operationId": "deleteQueueById", "parameters": [ { "name": "id", "in": "path", "description": "队列ID", "required": true, "schema": { "type": "integer", "format": "int32" } } ], "responses": { "200": { "description": "OK", "content": { "*/*": { "schema": { "$ref": "#/components/schemas/ResultBoolean" } } } } } } }, "/queues": { "get": { "tags": [ "队列相关操作" ], "summary": "queryQueueListPaging", "description": "分页查询队列列表", "operationId": "queryQueueListPaging", "parameters": [ { "name": "pageNo", "in": "query", "description": "页码号", "required": true, "schema": { "type": "integer", "format": "int32" } }, { "name": "searchVal", "in": "query", "description": "搜索值", "required": false, "schema": { "type": "string" } }, { "name": "pageSize", "in": "query", "description": "页大小", "required": true, "schema": { "type": "integer", "format": "int32" } } ], "responses": { "200": { "description": "OK", "content": { "*/*": { "schema": { "$ref": "#/components/schemas/ResultPageInfoQueue" } } } } } }, "post": { "tags": [ "队列相关操作" ], "summary": "createQueue", "description": "创建队列", "operationId": "createQueue", "parameters": [ { "name": "queue", "in": "query", "description": "hadoop yarn队列名", "required": true, "schema": { "type": "string" } }, { "name": 
"queueName", "in": "query", "description": "队列名", "required": true, "schema": { "type": "string" } } ], "responses": { "201": { "description": "Created", "content": { "*/*": { "schema": { "$ref": "#/components/schemas/ResultQueue" } } } } } } }, "/queues/verify": { "post": { "tags": [ "队列相关操作" ], "summary": "verifyQueue", "description": "验证队列", "operationId": "verifyQueue", "parameters": [ { "name": "queue", "in": "query", "description": "hadoop yarn队列名", "required": true, "schema": { "type": "string" } }, { "name": "queueName", "in": "query", "description": "队列名", "required": true, "schema": { "type": "string" } } ], "responses": { "200": { "description": "OK", "content": { "*/*": { "schema": { "$ref": "#/components/schemas/ResultBoolean" } } } } } } }, "/queues/list": { "get": { "tags": [ "队列相关操作" ], "summary": "queryList", "description": "查询队列列表", "operationId": "queryList", "responses": { "200": { "description": "OK", "content": { "*/*": { "schema": { "$ref": "#/components/schemas/ResultListQueue" } } } } } } } } } ``` -------------------------------------------------------------------------------- /src/dolphinscheduler_mcp/tools_generated/k8s_namespace_tools.py: -------------------------------------------------------------------------------- ```python """Tools for k8s namespace operations in DolphinScheduler.""" from typing import Dict, List, Optional from ..fastmcp_compat import FastMCPTool from ..client import DolphinSchedulerClient class GetK8sNamespace(FastMCPTool): """Tool to 查询命名空间列表页面""" name = "get_k8s_namespace" description = "查询命名空间列表页面" is_async = True schema = { "type": "object", "properties": { "searchVal": { "type": "string", "description": "\u641c\u7d22\u503c" }, "pageSize": { "type": "integer", "format": "int32", "description": "\u9875\u5927\u5c0f" }, "pageNo": { "type": "integer", "format": "int32", "description": "\u9875\u7801\u53f7" } }, "required": [ "pageSize", "pageNo" ] } async def _run(self, searchVal, pageSize, pageNo) -> Dict: 
"""Execute the GET operation on /k8s-namespace.""" client = DolphinSchedulerClient() try: params = { "searchVal": searchVal, "pageSize": pageSize, "pageNo": pageNo, } response = await client.request( "GET", f"/k8s-namespace", params=params ) return {"success": True, "data": response} finally: await client.close() class CreateK8sNamespace(FastMCPTool): """Tool to 创建命名空间""" name = "create_k8s_namespace" description = "创建命名空间" is_async = True schema = { "type": "object", "properties": { "namespace": { "type": "string", "description": "\u547d\u540d\u7a7a\u95f4" }, "clusterCode": { "type": "integer", "format": "int64", "description": "\u96c6\u7fa4\u4ee3\u7801" } }, "required": [ "namespace", "clusterCode" ] } async def _run(self, namespace, clusterCode) -> Dict: """Execute the POST operation on /k8s-namespace.""" client = DolphinSchedulerClient() try: params = { "namespace": namespace, "clusterCode": clusterCode, } response = await client.request( "POST", f"/k8s-namespace", params=params ) return {"success": True, "data": response} finally: await client.close() class CreateK8sNamespaceVerify(FastMCPTool): """Tool to 校验命名空间""" name = "create_k8s_namespace_verify" description = "校验命名空间" is_async = True schema = { "type": "object", "properties": { "namespace": { "type": "string", "description": "\u547d\u540d\u7a7a\u95f4" }, "clusterCode": { "type": "integer", "format": "int64", "description": "\u96c6\u7fa4\u4ee3\u7801" } }, "required": [ "namespace", "clusterCode" ] } async def _run(self, namespace, clusterCode) -> Dict: """Execute the POST operation on /k8s-namespace/verify.""" client = DolphinSchedulerClient() try: params = { "namespace": namespace, "clusterCode": clusterCode, } response = await client.request( "POST", f"/k8s-namespace/verify", params=params ) return {"success": True, "data": response} finally: await client.close() class CreateK8sNamespaceDelete(FastMCPTool): """Tool to 通过id删除命名空间""" name = "create_k8s_namespace_delete" description = "通过ID删除命名空间" 
is_async = True schema = { "type": "object", "properties": { "id": { "type": "integer", "format": "int32", "description": "k8s\u547d\u540d\u7a7a\u95f4ID" } }, "required": [ "id" ] } async def _run(self, id) -> Dict: """Execute the POST operation on /k8s-namespace/delete.""" client = DolphinSchedulerClient() try: params = { "id": id, } response = await client.request( "POST", f"/k8s-namespace/delete", params=params ) return {"success": True, "data": response} finally: await client.close() class GetK8sNamespaceUnauthNamespace(FastMCPTool): """Tool to 查询未授权命名空间""" name = "get_k8s_namespace_unauth_namespace" description = "查询未授权命名空间" is_async = True schema = { "type": "object", "properties": { "userId": { "type": "integer", "format": "int32", "description": "\u7528\u6237ID" } }, "required": [ "userId" ] } async def _run(self, userId) -> Dict: """Execute the GET operation on /k8s-namespace/unauth-namespace.""" client = DolphinSchedulerClient() try: params = { "userId": userId, } response = await client.request( "GET", f"/k8s-namespace/unauth-namespace", params=params ) return {"success": True, "data": response} finally: await client.close() class GetK8sNamespaceAvailableNamespaceList(FastMCPTool): """Tool to 获取可用命名空间列表""" name = "get_k8s_namespace_available_namespace_list" description = "获取可用命名空间列表" is_async = True async def _run(self) -> Dict: """Execute the GET operation on /k8s-namespace/available-namespace-list.""" client = DolphinSchedulerClient() try: response = await client.request( "GET", f"/k8s-namespace/available-namespace-list" ) return {"success": True, "data": response} finally: await client.close() class GetK8sNamespaceAuthorizedNamespace(FastMCPTool): """Tool to 获取命名空间列表""" name = "get_k8s_namespace_authorized_namespace" description = "获取命名空间列表" is_async = True schema = { "type": "object", "properties": { "userId": { "type": "integer", "format": "int32", "description": "\u7528\u6237ID" } }, "required": [ "userId" ] } async def _run(self, userId) -> Dict: 
"""Execute the GET operation on /k8s-namespace/authorized-namespace.""" client = DolphinSchedulerClient() try: params = { "userId": userId, } response = await client.request( "GET", f"/k8s-namespace/authorized-namespace", params=params ) return {"success": True, "data": response} finally: await client.close() def register_k8s_namespace_tools(mcp): """Register k8s namespace tools with FastMCP. Args: mcp: The FastMCP instance to register tools with. """ from ..fastmcp_compat import register_tool_class register_tool_class(mcp, CreateK8sNamespace) register_tool_class(mcp, CreateK8sNamespaceDelete) register_tool_class(mcp, CreateK8sNamespaceVerify) register_tool_class(mcp, GetK8sNamespace) register_tool_class(mcp, GetK8sNamespaceAuthorizedNamespace) register_tool_class(mcp, GetK8sNamespaceAvailableNamespaceList) register_tool_class(mcp, GetK8sNamespaceUnauthNamespace) ``` -------------------------------------------------------------------------------- /src/dolphinscheduler_mcp/tools_generated/access_token_tools.py: -------------------------------------------------------------------------------- ```python """Tools for access token operations in DolphinScheduler.""" from typing import Dict, List, Optional from ..fastmcp_compat import FastMCPTool from ..client import DolphinSchedulerClient class UpdateAccessTokens(FastMCPTool): """Tool to 更新指定用户的安全令牌""" name = "update_access_tokens" description = "更新指定用户的安全令牌" is_async = True schema = { "type": "object", "properties": { "id": { "type": "integer", "format": "int32", "description": "\u5b89\u5168\u4ee4\u724c\u7684ID" }, "user_id": { "type": "integer", "format": "int32", "description": "\u7528\u6237ID" }, "expire_time": { "type": "string", "description": "\u5b89\u5168\u4ee4\u724c\u7684\u8fc7\u671f\u65f6\u95f4" }, "token": { "type": "string", "description": "\u5b89\u5168\u4ee4\u724c\u5b57\u7b26\u4e32\uff0c\u82e5\u672a\u663e\u5f0f\u6307\u5b9a\u5c06\u4f1a\u81ea\u52a8\u751f\u6210" } }, "required": [ "id", "user_id", "expire_time" 
] } async def _run(self, id, user_id, expire_time, token) -> Dict: """Execute the PUT operation on /access-tokens/{id}.""" client = DolphinSchedulerClient() try: params = { "userId": user_id, "expireTime": expire_time, "token": token, } response = await client.request( "PUT", f"/access-tokens/{id}", params=params ) return {"success": True, "data": response} finally: await client.close() class DeleteAccessTokens(FastMCPTool): """Tool to delete_token_notes""" name = "delete_access_tokens" description = "DELETE_TOKEN_NOTES" is_async = True schema = { "type": "object", "properties": { "id": { "type": "integer", "format": "int32" } }, "required": [ "id" ] } async def _run(self, id) -> Dict: """Execute the DELETE operation on /access-tokens/{id}.""" client = DolphinSchedulerClient() try: response = await client.request( "DELETE", f"/access-tokens/{id}" ) return {"success": True, "data": response} finally: await client.close() class GetAccessTokens(FastMCPTool): """Tool to 分页查询access token列表""" name = "get_access_tokens" description = "分页查询access token列表" is_async = True schema = { "type": "object", "properties": { "page_no": { "type": "integer", "format": "int32", "description": "\u9875\u7801\u53f7" }, "search_val": { "type": "string", "description": "\u641c\u7d22\u503c" }, "page_size": { "type": "integer", "format": "int32", "description": "\u9875\u5927\u5c0f" } }, "required": [ "page_no", "page_size" ] } async def _run(self, page_no, search_val, page_size) -> Dict: """Execute the GET operation on /access-tokens.""" client = DolphinSchedulerClient() try: params = { "pageNo": page_no, "searchVal": search_val, "pageSize": page_size, } response = await client.request( "GET", f"/access-tokens", params=params ) return {"success": True, "data": response} finally: await client.close() class CreateAccessTokens(FastMCPTool): """Tool to 为指定用户创建安全令牌""" name = "create_access_tokens" description = "为指定用户创建安全令牌" is_async = True schema = { "type": "object", "properties": { "user_id": 
{ "type": "integer", "format": "int32", "description": "\u7528\u6237ID" }, "expire_time": { "type": "string", "description": "\u5b89\u5168\u4ee4\u724c\u7684\u8fc7\u671f\u65f6\u95f4" }, "token": { "type": "string", "description": "\u5b89\u5168\u4ee4\u724c\u5b57\u7b26\u4e32\uff0c\u82e5\u672a\u663e\u5f0f\u6307\u5b9a\u5c06\u4f1a\u81ea\u52a8\u751f\u6210" } }, "required": [ "user_id", "expire_time" ] } async def _run(self, user_id, expire_time, token) -> Dict: """Execute the POST operation on /access-tokens.""" client = DolphinSchedulerClient() try: params = { "userId": user_id, "expireTime": expire_time, "token": token, } response = await client.request( "POST", f"/access-tokens", params=params ) return {"success": True, "data": response} finally: await client.close() class CreateAccessTokensGenerate(FastMCPTool): """Tool to no description available.""" name = "create_access_tokens_generate" description = "No description available." is_async = True schema = { "type": "object", "properties": { "user_id": { "type": "integer", "format": "int32" }, "expire_time": { "type": "string" } }, "required": [ "user_id", "expire_time" ] } async def _run(self, user_id, expire_time) -> Dict: """Execute the POST operation on /access-tokens/generate.""" client = DolphinSchedulerClient() try: params = { "userId": user_id, "expireTime": expire_time, } response = await client.request( "POST", f"/access-tokens/generate", params=params ) return {"success": True, "data": response} finally: await client.close() class GetAccessTokensUser(FastMCPTool): """Tool to 查询指定用户的access token""" name = "get_access_tokens_user" description = "查询指定用户的access token" is_async = True schema = { "type": "object", "properties": { "user_id": { "type": "integer", "format": "int32", "description": "\u7528\u6237ID" } }, "required": [ "user_id" ] } async def _run(self, user_id) -> Dict: """Execute the GET operation on /access-tokens/user/{userId}.""" client = DolphinSchedulerClient() try: response = await 
client.request( "GET", f"/access-tokens/user/{user_id}" ) return {"success": True, "data": response} finally: await client.close() def register_access_token_tools(mcp): """Register access token tools with FastMCP. Args: mcp: The FastMCP instance to register tools with. """ from ..fastmcp_compat import register_tool_class register_tool_class(mcp, CreateAccessTokens) register_tool_class(mcp, CreateAccessTokensGenerate) register_tool_class(mcp, DeleteAccessTokens) register_tool_class(mcp, GetAccessTokens) register_tool_class(mcp, GetAccessTokensUser) register_tool_class(mcp, UpdateAccessTokens) ``` -------------------------------------------------------------------------------- /mcp-openapi-split/paths/29_tenant_api.json: -------------------------------------------------------------------------------- ```json { "paths": { "/tenants/{id}": { "put": { "tags": [ "租户相关操作" ], "summary": "updateTenant", "description": "更新租户", "operationId": "updateTenant", "parameters": [ { "name": "id", "in": "path", "description": "租户ID", "required": true, "schema": { "type": "integer", "format": "int32" } }, { "name": "tenantCode", "in": "query", "description": "操作系统租户", "required": true, "schema": { "type": "string" } }, { "name": "queueId", "in": "query", "description": "队列ID", "required": true, "schema": { "type": "integer", "format": "int32" } }, { "name": "description", "in": "query", "description": "租户描述", "required": false, "schema": { "type": "string" } } ], "responses": { "200": { "description": "OK", "content": { "*/*": { "schema": { "$ref": "#/components/schemas/ResultBoolean" } } } } } }, "delete": { "tags": [ "租户相关操作" ], "summary": "deleteTenantById", "description": "删除租户", "operationId": "deleteTenantById", "parameters": [ { "name": "id", "in": "path", "description": "租户ID", "required": true, "schema": { "type": "integer", "format": "int32" } } ], "responses": { "200": { "description": "OK", "content": { "*/*": { "schema": { "$ref": "#/components/schemas/ResultBoolean" } } 
} } } } }, "/tenants": { "get": { "tags": [ "租户相关操作" ], "summary": "queryTenantListPaging", "description": "分页查询租户列表", "operationId": "queryTenantListPaging", "parameters": [ { "name": "searchVal", "in": "query", "description": "搜索值", "required": false, "schema": { "type": "string" } }, { "name": "pageNo", "in": "query", "description": "页码号", "required": true, "schema": { "type": "integer", "format": "int32" } }, { "name": "pageSize", "in": "query", "description": "页大小", "required": true, "schema": { "type": "integer", "format": "int32" } } ], "responses": { "200": { "description": "OK", "content": { "*/*": { "schema": { "$ref": "#/components/schemas/ResultPageInfoTenant" } } } } } }, "post": { "tags": [ "租户相关操作" ], "summary": "createTenant", "description": "创建租户", "operationId": "createTenant", "parameters": [ { "name": "tenantCode", "in": "query", "description": "操作系统租户", "required": true, "schema": { "type": "string" } }, { "name": "queueId", "in": "query", "description": "队列ID", "required": true, "schema": { "type": "integer", "format": "int32" } }, { "name": "description", "in": "query", "description": "租户描述", "required": false, "schema": { "type": "string" } } ], "responses": { "201": { "description": "Created", "content": { "*/*": { "schema": { "$ref": "#/components/schemas/ResultTenant" } } } } } } }, "/tenants/verify-code": { "get": { "tags": [ "租户相关操作" ], "summary": "verifyTenantCode", "description": "验证租户", "operationId": "verifyTenantCode", "parameters": [ { "name": "tenantCode", "in": "query", "description": "操作系统租户", "required": true, "schema": { "type": "string" } } ], "responses": { "200": { "description": "OK", "content": { "*/*": { "schema": { "$ref": "#/components/schemas/ResultBoolean" } } } } } } }, "/tenants/list": { "get": { "tags": [ "租户相关操作" ], "summary": "queryTenantList", "description": "查询租户列表", "operationId": "queryTenantList", "responses": { "200": { "description": "OK", "content": { "*/*": { "schema": { "$ref": 
"#/components/schemas/ResultListTenant" } } } } } } } } } ```