This is page 1 of 2. Use http://codebase.md/enesbol/gcp-mcp?lines=true&page={x} to view the full context. # Directory Structure ``` ├── .gitignore ├── .python-version ├── Dockerfile ├── LICENSE ├── pyproject.toml ├── README.md ├── requirements.txt ├── smithery.yaml ├── src │ └── gcp-mcp-server │ ├── __init__.py │ ├── core │ │ ├── __init__.py │ │ ├── auth.py │ │ ├── context.py │ │ ├── logging_handler.py │ │ ├── security.py │ │ └── server.py │ ├── main.py │ ├── prompts │ │ ├── __init__.py │ │ └── common.py │ └── services │ ├── __init__.py │ ├── artifact_registry.py │ ├── client_instances.py │ ├── cloud_audit_logs.py │ ├── cloud_bigquery.py │ ├── cloud_build.py │ ├── cloud_compute_engine.py │ ├── cloud_monitoring.py │ ├── cloud_run.py │ ├── cloud_storage.py │ └── README.md ├── utils │ ├── __init__.py │ └── helpers.py └── uv.lock ``` # Files -------------------------------------------------------------------------------- /.python-version: -------------------------------------------------------------------------------- ``` 1 | 3.13 ``` -------------------------------------------------------------------------------- /.gitignore: -------------------------------------------------------------------------------- ``` 1 | # Python-generated files 2 | __pycache__/ 3 | *.py[oc] 4 | build/ 5 | dist/ 6 | wheels/ 7 | *.egg-info 8 | 9 | # Virtual environments 10 | .venv ``` -------------------------------------------------------------------------------- /src/gcp-mcp-server/services/README.md: -------------------------------------------------------------------------------- ```markdown 1 | # GCP Services MCP Implementation 2 | 3 | This repository contains Model Context Protocol (MCP) implementations for various Google Cloud Platform (GCP) services. These modules expose GCP resources, tools, and prompts in a standardized way for AI assistants to interact with GCP infrastructure. 4 | 5 | ## Overview 6 | 7 | The MCP framework provides a consistent way to expose cloud resources to AI assistants. Each service module follows a common pattern: 8 | 9 | 1. A `register` function that takes an MCP instance as a parameter 10 | 2. Resources for retrieving information about GCP resources 11 | 3. Tools for performing operations on GCP resources 12 | 4. Prompts for guiding users through common tasks 13 | 14 | ## Implementation Pattern 15 | 16 | All service modules follow a consistent implementation pattern: 17 | 18 | ```python 19 | import json 20 | from services import client_instances 21 | from google.cloud import service_client 22 | 23 | def register(mcp_instance): 24 | """Register all resources and tools with the MCP instance.""" 25 | 26 | # Resources 27 | @mcp_instance.resource("gcp://service/{project_id}/resources") 28 | def list_resources(project_id: str = None) -> str: 29 | # Implement resource listing 30 | pass 31 | 32 | # Tools 33 | @mcp_instance.tool() 34 | def create_resource(resource_name: str, project_id: str = None) -> str: 35 | # Implement resource creation 36 | pass 37 | 38 | # Prompts 39 | @mcp_instance.prompt() 40 | def configuration_help() -> str: 41 | # Return helpful prompt text 42 | return """Help text here""" 43 | ``` 44 | 45 | ## Key Design Principles 46 | 47 | ### 1. Client Management 48 | 49 | Clients are accessed through a central `client_instances` module: 50 | 51 | ```python 52 | client = client_instances.get_clients().service_name 53 | ``` 54 | 55 | This approach: 56 | - Centralizes client creation and authentication 57 | - Avoids duplicating client instantiation logic 58 | - Enables consistent credential management 59 | 60 | ### 2. Parameter Defaults 61 | 62 | Required parameters come first, followed by optional parameters with sensible defaults: 63 | 64 | ```python 65 | def create_resource( 66 | resource_name: str, # Required parameter first 67 | project_id: str = None, # Optional parameters with defaults 68 | location: str = None 69 | ) -> str: 70 | # Use defaults from client_instances when not provided 71 | project_id = project_id or client_instances.get_project_id() 72 | location = location or client_instances.get_location() 73 | ``` 74 | 75 | ### 3. Error Handling 76 | 77 | All functions include consistent error handling: 78 | 79 | ```python 80 | try: 81 | # Implementation code 82 | return json.dumps(result, indent=2) 83 | except Exception as e: 84 | return json.dumps({"error": str(e)}, indent=2) 85 | ``` 86 | 87 | ### 4. JSON Responses 88 | 89 | All responses are consistently formatted JSON strings with proper indentation: 90 | 91 | ```python 92 | return json.dumps( 93 | { 94 | "status": "success", 95 | "resources": result, 96 | "count": len(result) 97 | }, 98 | indent=2 99 | ) 100 | ``` 101 | 102 | ### 5. Defensive Coding 103 | 104 | All implementations use defensive coding practices to handle potentially missing or null values: 105 | 106 | ```python 107 | "labels": dict(resource.labels) if resource.labels else {} 108 | ``` 109 | 110 | ## Available Services 111 | 112 | The following GCP services are implemented: 113 | 114 | 1. **BigQuery** - Data warehouse operations 115 | 2. **Cloud Storage** - Object storage operations 116 | 3. **Cloud Run** - Serverless container management 117 | 4. **Cloud Build** - CI/CD pipeline management 118 | 5. **Artifact Registry** - Container and package registry 119 | 6. **Compute Engine** - VM instance management 120 | 7. **Cloud Monitoring** - Metrics and alerting 121 | 8. **Cloud Audit Logs** - Logging and auditing 122 | 123 | ## Service-Specific Features 124 | 125 | ### BigQuery 126 | 127 | - Dataset and table management 128 | - Query execution 129 | - Data import/export 130 | 131 | ### Cloud Storage 132 | 133 | - Bucket management 134 | - Object operations (upload, download, delete) 135 | - Bucket configuration 136 | 137 | ### Cloud Run 138 | 139 | - Service deployment 140 | - Service configuration 141 | - Traffic management 142 | 143 | ### Cloud Build 144 | 145 | - Trigger management 146 | - Build execution 147 | - Build monitoring 148 | 149 | ### Artifact Registry 150 | 151 | - Repository management 152 | - Package operations 153 | 154 | ### Compute Engine 155 | 156 | - Instance lifecycle management 157 | - Instance configuration 158 | 159 | ### Cloud Monitoring 160 | 161 | - Metric exploration 162 | - Alert policy management 163 | - Notification channel configuration 164 | 165 | ### Cloud Audit Logs 166 | 167 | - Log querying 168 | - Log sink management 169 | 170 | ## Usage Example 171 | 172 | To register all services with an MCP instance: 173 | 174 | ```python 175 | from mcp.server.fastmcp import FastMCP 176 | from services import ( 177 | bigquery, 178 | storage, 179 | cloudrun, 180 | cloudbuild, 181 | artifactregistry, 182 | compute, 183 | monitoring, 184 | auditlogs 185 | ) 186 | 187 | # Create MCP instance 188 | mcp = FastMCP("GCP Services") 189 | 190 | # Register all services 191 | bigquery.register(mcp) 192 | storage.register(mcp) 193 | cloudrun.register(mcp) 194 | cloudbuild.register(mcp) 195 | artifactregistry.register(mcp) 196 | compute.register(mcp) 197 | monitoring.register(mcp) 198 | auditlogs.register(mcp) 199 | ``` 200 | 201 | ## Important Notes 202 | 203 | 1. **Default Project and Location**: 204 | - Services default to the project and location configured in `client_instances` 205 | - Always allow these to be overridden by parameters 206 | 207 | 2. **Authentication**: 208 | - Authentication is handled by the `client_instances` module 209 | - No credentials should be hardcoded in service implementations 210 | 211 | 3. **Error Handling**: 212 | - All operations should include robust error handling 213 | - Error responses should be informative but not expose sensitive information 214 | 215 | 4. **Response Formatting**: 216 | - All responses should be valid JSON with proper indentation 217 | - Success responses should include a "status": "success" field 218 | - Error responses should include a "status": "error" field with a "message" 219 | 220 | ## Best Practices for MCP Integration 221 | 222 | 1. **Resource Naming**: 223 | - Use consistent URI patterns for resources (gcp://service/{project_id}/resource) 224 | - Group related resources logically 225 | 226 | 2. **Tool Design**: 227 | - Design tools around specific user intents 228 | - Order parameters with required ones first, followed by optional ones 229 | - Provide sensible defaults for optional parameters 230 | 231 | 3. **Prompt Design**: 232 | - Create prompts for common user scenarios 233 | - Include enough context for the AI to provide helpful responses 234 | - Structure prompts as guiding questions or templates 235 | 236 | 4. **Documentation**: 237 | - Include descriptive docstrings for all resources, tools, and prompts 238 | - Document all parameters and return values 239 | - Include usage examples where appropriate 240 | 241 | ## Extending the Services 242 | 243 | To add a new GCP service: 244 | 245 | 1. Create a new module following the pattern above 246 | 2. Implement resources for retrieving information 247 | 3. Implement tools for operations 248 | 4. Create prompts for common user tasks 249 | 5. Register the new service in your MCP initialization 250 | 251 | ## Troubleshooting 252 | 253 | If you encounter issues: 254 | 255 | 1. Check client initialization in `client_instances` 256 | 2. Verify required permissions for the service operations 257 | 3. Look for additional error details in the returned JSON 258 | 4. Add print statements for debugging complex operations ``` -------------------------------------------------------------------------------- /README.md: -------------------------------------------------------------------------------- ```markdown 1 | # This is not a Ready MCP Server 2 | 3 | 4 | 5 | # GCP MCP Server 6 | 7 | A comprehensive Model Context Protocol (MCP) server implementation for Google Cloud Platform (GCP) services, enabling AI assistants to interact with and manage GCP resources through a standardized interface. 8 | 9 | ## Overview 10 | 11 | GCP MCP Server provides AI assistants with capabilities to: 12 | 13 | - **Query GCP Resources**: Get information about your cloud infrastructure 14 | - **Manage Cloud Resources**: Create, configure, and manage GCP services 15 | - **Receive Assistance**: Get AI-guided help with GCP configurations and best practices 16 | 17 | The implementation follows the MCP specification to enable AI systems to interact with GCP services in a secure, controlled manner. 18 | 19 | ## Supported GCP Services 20 | 21 | This implementation includes support for the following GCP services: 22 | 23 | - **Artifact Registry**: Container and package management 24 | - **BigQuery**: Data warehousing and analytics 25 | - **Cloud Audit Logs**: Logging and audit trail analysis 26 | - **Cloud Build**: CI/CD pipeline management 27 | - **Cloud Compute Engine**: Virtual machine instances 28 | - **Cloud Monitoring**: Metrics, alerting, and dashboards 29 | - **Cloud Run**: Serverless container deployments 30 | - **Cloud Storage**: Object storage management 31 | 32 | ## Architecture 33 | 34 | The project is structured as follows: 35 | 36 | ``` 37 | gcp-mcp-server/ 38 | ├── core/ # Core MCP server functionality auth context logging_handler security 39 | ├── prompts/ # AI assistant prompts for GCP operations 40 | ├── services/ # GCP service implementations 41 | │ ├── README.md # Service implementation details 42 | │ └── ... # Individual service modules 43 | ├── main.py # Main server entry point 44 | └── ... 45 | ``` 46 | 47 | Key components: 48 | 49 | - **Service Modules**: Each GCP service has its own module with resources, tools, and prompts 50 | - **Client Instances**: Centralized client management for authentication and resource access 51 | - **Core Components**: Base functionality for the MCP server implementation 52 | 53 | ## Getting Started 54 | 55 | ### Prerequisites 56 | 57 | - Python 3.10+ 58 | - GCP project with enabled APIs for the services you want to use 59 | - Authenticated GCP credentials (Application Default Credentials recommended) 60 | 61 | ### Installation 62 | 63 | 1. Clone the repository: 64 | ```bash 65 | git clone https://github.com/yourusername/gcp-mcp-server.git 66 | cd gcp-mcp-server 67 | ``` 68 | 69 | 2. Set up a virtual environment: 70 | ```bash 71 | python -m venv venv 72 | source venv/bin/activate # On Windows: venv\Scripts\activate 73 | ``` 74 | 75 | 3. Install dependencies: 76 | ```bash 77 | pip install -r requirements.txt 78 | ``` 79 | 80 | 4. Configure your GCP credentials: 81 | ```bash 82 | # Using gcloud 83 | gcloud auth application-default login 84 | 85 | # Or set GOOGLE_APPLICATION_CREDENTIALS 86 | export GOOGLE_APPLICATION_CREDENTIALS="/path/to/service-account-key.json" 87 | ``` 88 | 89 | 5. Set up environment variables: 90 | ```bash 91 | cp .env.example .env 92 | # Edit .env with your configuration 93 | ``` 94 | 95 | ### Running the Server 96 | 97 | Start the MCP server: 98 | 99 | ```bash 100 | python main.py 101 | ``` 102 | 103 | For development and testing: 104 | 105 | ```bash 106 | # Development mode with auto-reload 107 | python main.py --dev 108 | 109 | # Run with specific configuration 110 | python main.py --config config.yaml 111 | ``` 112 | 113 | ## Docker Deployment 114 | 115 | Build and run with Docker: 116 | 117 | ```bash 118 | # Build the image 119 | docker build -t gcp-mcp-server . 120 | 121 | # Run the container 122 | docker run -p 8080:8080 -v ~/.config/gcloud:/root/.config/gcloud gcp-mcp-server 123 | ``` 124 | 125 | ## Configuration 126 | 127 | The server can be configured through environment variables or a configuration file: 128 | 129 | | Environment Variable | Description | Default | 130 | |----------------------|-------------|---------| 131 | | `GCP_PROJECT_ID` | Default GCP project ID | None (required) | 132 | | `GCP_DEFAULT_LOCATION` | Default region/zone | `us-central1` | 133 | | `MCP_SERVER_PORT` | Server port | `8080` | 134 | | `LOG_LEVEL` | Logging level | `INFO` | 135 | 136 | See `.env.example` for a complete list of configuration options. 137 | 138 | ## Development 139 | 140 | ### Adding a New GCP Service 141 | 142 | 1. Create a new file in the `services/` directory 143 | 2. Implement the service following the pattern in existing services 144 | 3. Register the service in `main.py` 145 | 146 | See the [services README](services/README.md) for detailed implementation guidance. 147 | 148 | 149 | ## Security Considerations 150 | 151 | - The server uses Application Default Credentials for authentication 152 | - Authorization is determined by the permissions of the authenticated identity 153 | - No credentials are hardcoded in the service implementations 154 | - Consider running with a service account with appropriate permissions 155 | 156 | ## Contributing 157 | 158 | Contributions are welcome! Please feel free to submit a Pull Request. 159 | 160 | 1. Fork the repository 161 | 2. Create your feature branch (`git checkout -b feature/amazing-feature`) 162 | 3. Commit your changes (`git commit -m 'Add some amazing feature'`) 163 | 4. Push to the branch (`git push origin feature/amazing-feature`) 164 | 5. Open a Pull Request 165 | 166 | ## License 167 | 168 | This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details. 169 | 170 | ## Acknowledgments 171 | 172 | - Google Cloud Platform team for their comprehensive APIs 173 | - Model Context Protocol for providing a standardized way for AI to interact with services 174 | 175 | ### Using the Server 176 | 177 | To use this server: 178 | 179 | 1. Place your GCP service account key file as `service-account.json` in the same directory 180 | 2. Install the MCP package: `pip install "mcp[cli]"` 181 | 3. Install the required GCP package: `pip install google-cloud-run` 182 | 4. Run: `mcp dev gcp_cloudrun_server.py` 183 | 184 | Or install it in Claude Desktop: 185 | ``` 186 | mcp install gcp_cloudrun_server.py --name "GCP Cloud Run Manager" 187 | ``` 188 | 189 | 190 | ## MCP Server Configuration 191 | 192 | The following configuration can be added to your configuration file for GCP Cloud Tools: 193 | 194 | ```json 195 | "mcpServers": { 196 | "GCP Cloud Tools": { 197 | "command": "uv", 198 | "args": [ 199 | "run", 200 | "--with", 201 | "google-cloud-artifact-registry>=1.10.0", 202 | "--with", 203 | "google-cloud-bigquery>=3.27.0", 204 | "--with", 205 | "google-cloud-build>=3.0.0", 206 | "--with", 207 | "google-cloud-compute>=1.0.0", 208 | "--with", 209 | "google-cloud-logging>=3.5.0", 210 | "--with", 211 | "google-cloud-monitoring>=2.0.0", 212 | "--with", 213 | "google-cloud-run>=0.9.0", 214 | "--with", 215 | "google-cloud-storage>=2.10.0", 216 | "--with", 217 | "mcp[cli]", 218 | "--with", 219 | "python-dotenv>=1.0.0", 220 | "mcp", 221 | "run", 222 | "C:\\Users\\enes_\\Desktop\\mcp-repo-final\\gcp-mcp\\src\\gcp-mcp-server\\main.py" 223 | ], 224 | "env": { 225 | "GOOGLE_APPLICATION_CREDENTIALS": "C:/Users/enes_/Desktop/mcp-repo-final/gcp-mcp/service-account.json", 226 | "GCP_PROJECT_ID": "gcp-mcp-cloud-project", 227 | "GCP_LOCATION": "us-east1" 228 | } 229 | } 230 | } 231 | ``` 232 | 233 | ### Configuration Details 234 | 235 | This configuration sets up an MCP server for Google Cloud Platform tools with the following: 236 | 237 | - **Command**: Uses `uv` package manager to run the server 238 | - **Dependencies**: Includes various Google Cloud libraries (Artifact Registry, BigQuery, Cloud Build, etc.) 239 | - **Environment Variables**: 240 | - `GOOGLE_APPLICATION_CREDENTIALS`: Path to your GCP service account credentials 241 | - `GCP_PROJECT_ID`: Your Google Cloud project ID 242 | - `GCP_LOCATION`: GCP region (us-east1) 243 | 244 | ### Usage 245 | 246 | Add this configuration to your MCP configuration file to enable GCP Cloud Tools functionality. 247 | ``` -------------------------------------------------------------------------------- /src/gcp-mcp-server/prompts/common.py: -------------------------------------------------------------------------------- ```python 1 | ``` -------------------------------------------------------------------------------- /src/gcp-mcp-server/core/__init__.py: -------------------------------------------------------------------------------- ```python 1 | """Core package for MCP server components.""" 2 | ``` -------------------------------------------------------------------------------- /src/gcp-mcp-server/__init__.py: -------------------------------------------------------------------------------- ```python 1 | """GCP MCP server package.""" 2 | 3 | __version__ = "0.1.0" 4 | ``` -------------------------------------------------------------------------------- /requirements.txt: -------------------------------------------------------------------------------- ``` 1 | mcp>=1.0.0 2 | google-cloud-bigquery 3 | google-cloud-storage 4 | google-cloud-run 5 | google-cloud-artifact-registry 6 | google-cloud-logging 7 | python-dotenv 8 | google-cloud-monitoring 9 | google-cloud-compute 10 | google-cloud-build 11 | ``` -------------------------------------------------------------------------------- /utils/helpers.py: -------------------------------------------------------------------------------- ```python 1 | """Common utility functions.""" 2 | 3 | 4 | def format_resource_name(project_id: str, resource_name: str) -> str: 5 | """Format a resource name with project ID.""" 6 | return f"projects/{project_id}/resources/{resource_name}" 7 | ``` -------------------------------------------------------------------------------- /src/gcp-mcp-server/core/security.py: -------------------------------------------------------------------------------- ```python 1 | import re 2 | 3 | 4 | class DataSanitizer: 5 | PATTERNS = [ 6 | r"(?i)(token|key|secret|password)", 7 | r"\b\d{4}-\d{4}-\d{4}-\d{4}\b", # CC-like numbers 8 | ] 9 | 10 | @classmethod 11 | def sanitize(cls, text: str) -> str: 12 | for pattern in cls.PATTERNS: 13 | text = re.sub(pattern, "[REDACTED]", text) 14 | return text 15 | ``` -------------------------------------------------------------------------------- /src/gcp-mcp-server/services/__init__.py: -------------------------------------------------------------------------------- ```python 1 | """ 2 | GCP MCP Server services package. 3 | This package contains service modules that register tools and resources with the MCP server. 4 | """ 5 | 6 | # The following allows these modules to be imported directly from the services package 7 | from . import ( 8 | artifact_registry, 9 | client_instances, 10 | cloud_audit_logs, 11 | cloud_bigquery, 12 | cloud_build, 13 | cloud_compute_engine, 14 | cloud_monitoring, 15 | cloud_run, 16 | cloud_storage, 17 | ) 18 | ``` -------------------------------------------------------------------------------- /utils/__init__.py: -------------------------------------------------------------------------------- ```python 1 | """Utility functions for MCP server.""" 2 | 3 | import json 4 | from typing import Any 5 | 6 | 7 | def format_json_response(data: Any) -> str: 8 | """Format data as a JSON string with consistent styling.""" 9 | return json.dumps({"status": "success", "data": data}, indent=2) 10 | 11 | 12 | def format_error_response(message: str) -> str: 13 | """Format error message as a JSON string.""" 14 | return json.dumps({"status": "error", "message": message}, indent=2) 15 | ``` -------------------------------------------------------------------------------- /src/gcp-mcp-server/core/logging_handler.py: -------------------------------------------------------------------------------- ```python 1 | import logging 2 | 3 | 4 | class MCPLogger: 5 | """Simple logger for MCP server""" 6 | 7 | def __init__(self, name): 8 | self.logger = logging.getLogger(f"mcp.{name}") 9 | 10 | def info(self, message): 11 | self.logger.info(message) 12 | 13 | def warning(self, message): 14 | self.logger.warning(message) 15 | 16 | def error(self, message): 17 | self.logger.error(message) 18 | 19 | def critical(self, message): 20 | self.logger.critical(message) 21 | 22 | def debug(self, message): 23 | self.logger.debug(message) 24 | 25 | def audit_log(self, action, resource, details=None): 26 | """Simple audit logging""" 27 | self.logger.info(f"AUDIT: {action} on {resource}") 28 | ``` -------------------------------------------------------------------------------- /src/gcp-mcp-server/prompts/__init__.py: -------------------------------------------------------------------------------- ```python 1 | """Common prompts for GCP operations.""" 2 | 3 | from ..core import mcp 4 | 5 | 6 | @mcp.prompt() 7 | def gcp_service_help(service_name: str) -> str: 8 | """Get help for using a GCP service.""" 9 | return f""" 10 | I need help with using {service_name} in Google Cloud Platform. 11 | 12 | Please help me understand: 13 | 1. Common operations and best practices 14 | 2. Required parameters and configuration 15 | 3. Security considerations 16 | 4. Recommended patterns for {service_name} 17 | """ 18 | 19 | 20 | @mcp.prompt() 21 | def error_analysis(error_message: str) -> str: 22 | """Analyze a GCP error message.""" 23 | return f""" 24 | I received this error from GCP: 25 | {error_message} 26 | 27 | Please help me: 28 | 1. Understand what caused this error 29 | 2. Find potential solutions 30 | 3. Prevent similar errors in the future 31 | """ 32 | ``` -------------------------------------------------------------------------------- /pyproject.toml: -------------------------------------------------------------------------------- ```toml 1 | [project] 2 | name = "gcp-mcp-server" 3 | version = "0.1.0" 4 | description = "A Model Context Protocol server that provides access to Google Cloud Platform services, enabling LLMs to manage and interact with GCP resources." 5 | readme = "README.md" 6 | requires-python = ">=3.13" 7 | 8 | dependencies = [ 9 | "mcp[cli]>=1.0.0", 10 | "google-cloud-bigquery", 11 | "google-cloud-storage", 12 | "google-cloud-run", 13 | "google-cloud-artifact-registry", 14 | "google-cloud-logging", 15 | "python-dotenv", 16 | "google-cloud-monitoring", 17 | "google-cloud-compute", 18 | "google-cloud-build", 19 | ] 20 | [[project.authors]] 21 | name = "Enes Bol" 22 | email = "[email protected]" 23 | 24 | [build-system] 25 | requires = ["hatchling"] 26 | build-backend = "hatchling.build" 27 | 28 | [tool.hatch.build.targets.wheel] 29 | packages = ["src/gcp_mcp"] 30 | 31 | [project.scripts] 32 | gcp-mcp-server = "gcp_mcp.main:main" 33 | ``` -------------------------------------------------------------------------------- /src/gcp-mcp-server/services/client_instances.py: -------------------------------------------------------------------------------- ```python 1 | import logging 2 | 3 | from core.context import GCPClients 4 | 5 | # Set up logging 6 | logger = logging.getLogger(__name__) 7 | 8 | # Global client instance 9 | _gcp_clients = None 10 | _project_id = None 11 | _location = None 12 | 13 | 14 | def initialize_clients(credentials=None): 15 | """Initialize GCP clients with credentials.""" 16 | global _gcp_clients, _project_id, _location 17 | 18 | try: 19 | # Initialize GCP clients 20 | _gcp_clients = GCPClients(credentials) 21 | _project_id = _gcp_clients.project_id 22 | _location = _gcp_clients.location 23 | logger.info(f"GCP clients initialized with project: {_project_id}") 24 | return True 25 | except Exception as e: 26 | logger.error(f"Failed to initialize GCP clients: {str(e)}") 27 | return False 28 | 29 | 30 | def get_clients(): 31 | """Get the initialized GCP clients.""" 32 | if _gcp_clients is None: 33 | logger.warning("GCP clients not initialized. Attempting to initialize...") 34 | initialize_clients() 35 | return _gcp_clients 36 | 37 | 38 | def get_project_id(): 39 | """Get the GCP project ID.""" 40 | return _project_id 41 | 42 | 43 | def get_location(): 44 | """Get the GCP location.""" 45 | return _location 46 | ``` -------------------------------------------------------------------------------- /smithery.yaml: -------------------------------------------------------------------------------- ```yaml 1 | # Smithery.ai configuration 2 | startCommand: 3 | type: stdio 4 | configSchema: 5 | type: object 6 | properties: 7 | project_id: 8 | type: string 9 | description: "Google Cloud Project ID" 10 | region: 11 | type: string 12 | description: "Default GCP region" 13 | default: "us-central1" 14 | timeout: 15 | type: integer 16 | description: "Default timeout for operations in seconds" 17 | default: 300 18 | service_account_json: 19 | type: string 20 | description: "GCP Service Account key JSON (as string)" 21 | service_account_path: 22 | type: string 23 | description: "Path to GCP Service Account key file" 24 | required: 25 | - project_id 26 | commandFunction: |- 27 | (config) => ({ 28 | "command": "python", 29 | "args": [ 30 | "main.py" 31 | ], 32 | "env": { 33 | "GCP_PROJECT_ID": config.project_id, 34 | "GCP_REGION": config.region || "us-central1", 35 | "GCP_TIMEOUT": String(config.timeout || 300), 36 | "GCP_SERVICE_ACCOUNT_JSON": config.service_account_json || "", 37 | "GCP_SERVICE_ACCOUNT_KEY_PATH": config.service_account_path || "" 38 | } 39 | }) ``` -------------------------------------------------------------------------------- /Dockerfile: -------------------------------------------------------------------------------- ```dockerfile 1 | # Generated by https://smithery.ai. See: https://smithery.ai/docs/config#dockerfile 2 | # Use a Python image with uv pre-installed 3 | FROM ghcr.io/astral-sh/uv:python3.13-bookworm-slim AS uv 4 | 5 | # Set working directory 6 | WORKDIR /app 7 | 8 | # Enable bytecode compilation 9 | ENV UV_COMPILE_BYTECODE=1 10 | 11 | # Copy pyproject.toml and lock file for dependencies 12 | COPY pyproject.toml uv.lock ./ 13 | 14 | # Install the project's dependencies 15 | RUN --mount=type=cache,target=/root/.cache/uv \ 16 | uv sync --frozen --no-install-project --no-dev --no-editable 17 | 18 | # Add the rest of the project source code and install it 19 | ADD src /app/src 20 | 21 | # Sync and install the project 22 | RUN --mount=type=cache,target=/root/.cache/uv \ 23 | uv sync --frozen --no-dev --no-editable 24 | 25 | FROM python:3.13-slim-bookworm 26 | 27 | # Set working directory 28 | WORKDIR /app 29 | 30 | # Copy virtual environment from the builder 31 | COPY --from=uv /root/.local /root/.local 32 | COPY --from=uv --chown=app:app /app/.venv /app/.venv 33 | 34 | # Place executables in the environment at the front of the path 35 | ENV PATH="/app/.venv/bin:$PATH" 36 | 37 | # Define the entry point 38 | ENTRYPOINT ["gcp-mcp-server"] 39 | 40 | # Example command 41 | # CMD ["--project", "your-gcp-project-id", "--location", "your-gcp-location"] ``` -------------------------------------------------------------------------------- /src/gcp-mcp-server/core/server.py: -------------------------------------------------------------------------------- ```python 1 | # import logging 2 | # import os 3 | # from contextlib import asynccontextmanager 4 | # from typing import Any, AsyncIterator, Dict 5 | 6 | # from auth import get_credentials 7 | # from mcp.server.fastmcp import FastMCP 8 | 9 | # from ..services import bigquery, compute, iam, storage 10 | 11 | # logging.basicConfig(level=logging.INFO) 12 | # logger = logging.getLogger(__name__) 13 | 14 | 15 | # @asynccontextmanager 16 | # async def gcp_lifespan(server) -> AsyncIterator[Dict[str, Any]]: 17 | # """Set up GCP context with credentials.""" 18 | # logger.info("Initializing GCP MCP server...") 19 | # try: 20 | # # Get GCP credentials 21 | # credentials = get_credentials() 22 | # project_id = os.environ.get("GCP_PROJECT_ID") 23 | 24 | # if not project_id: 25 | # logger.warning( 26 | # "GCP_PROJECT_ID not set in environment. Some features may not work correctly." 27 | # ) 28 | 29 | # logger.info(f"Server initialized with project: {project_id or 'Not set'}") 30 | 31 | # # Yield context to be used by handlers 32 | # yield {"credentials": credentials, "project_id": project_id} 33 | # except Exception as e: 34 | # logger.error(f"Failed to initialize GCP context: {str(e)}") 35 | # raise 36 | # finally: 37 | # logger.info("Shutting down GCP MCP server...") 38 | 39 | 40 | # # Create main server FIRST 41 | # mcp = FastMCP( 42 | # "GCP Manager", 43 | # description="Manage Google Cloud Platform Resources", 44 | # lifespan=gcp_lifespan, 45 | # ) 46 | 47 | # # Register all services 48 | # compute.register(mcp) 49 | # storage.register(mcp) 50 | # bigquery.register(mcp) 51 | # iam.register(mcp) 52 | 53 | 54 | # # THEN define resources and tools 55 | # @mcp.resource("gcp://project") 56 | # def get_project_info(): 57 | # """Get information about the current GCP project""" 58 | # project_id = os.environ.get("GCP_PROJECT_ID") 59 | # return f"Project ID: {project_id}" 60 | 61 | 62 | # @mcp.resource("gcp://storage/buckets") 63 | # def list_buckets(): 64 | # """List GCP storage buckets""" 65 | # from google.cloud import storage 66 | 67 | # client = storage.Client() 68 | # buckets = list(client.list_buckets(max_results=10)) 69 | # return "\n".join([f"- {bucket.name}" for bucket in buckets]) 70 | 71 | 72 | # @mcp.resource("test://hello") 73 | # def hello_resource(): 74 | # """A simple test resource""" 75 | # return "Hello World" 76 | 77 | 78 | # @mcp.tool() 79 | # def list_gcp_instances(region: str = "us-central1") -> str: 80 | # """List GCP compute instances in a region""" 81 | # # Use your credentials to list instances 82 | # return f"Instances in {region}: [instance list would go here]" 83 | 84 | 85 | # @mcp.tool() 86 | # def test_gcp_auth() -> str: 87 | # """Test GCP authentication""" 88 | # try: 89 | # from google.cloud import storage 90 | 91 | # client = storage.Client() 92 | # buckets = list(client.list_buckets(max_results=5)) 93 | # return f"Authentication successful. Found {len(buckets)} buckets." 94 | # except Exception as e: 95 | # return f"Authentication failed: {str(e)}" 96 | 97 | 98 | # if __name__ == "__main__": 99 | # mcp.run() 100 | ``` -------------------------------------------------------------------------------- /src/gcp-mcp-server/core/auth.py: -------------------------------------------------------------------------------- ```python 1 | import json 2 | import os 3 | 4 | import google.auth 5 | from google.auth.exceptions import DefaultCredentialsError 6 | from google.auth.transport.requests import Request 7 | from google.oauth2 import service_account 8 | 9 | from .logging_handler import MCPLogger 10 | 11 | # Define required scopes for GCP APIs 12 | REQUIRED_SCOPES = [ 13 | "https://www.googleapis.com/auth/cloud-platform", 14 | ] 15 | 16 | logger = MCPLogger("auth") 17 | 18 | 19 | def get_credentials(): 20 | """ 21 | Get Google Cloud credentials from environment. 22 | Attempts to load credentials in the following order: 23 | 1. From GOOGLE_APPLICATION_CREDENTIALS environment variable 24 | 2. From GCP_SERVICE_ACCOUNT_JSON environment variable containing JSON 25 | 3. Default application credentials 26 | """ 27 | try: 28 | # Check for credentials file path 29 | creds_file = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS") 30 | if creds_file and os.path.exists(creds_file): 31 | logger.audit_log( 32 | action="credential_load", 33 | resource="service_account", 34 | details={"method": "file", "file": creds_file}, 35 | ) 36 | logger.info(f"Loading credentials from file: {creds_file}") 37 | credentials = service_account.Credentials.from_service_account_file( 38 | creds_file, scopes=REQUIRED_SCOPES 39 | ) 40 | return validate_credentials(credentials) 41 | 42 | # Check for service account JSON in environment 43 | sa_json = os.environ.get("GCP_SERVICE_ACCOUNT_JSON") 44 | if sa_json: 45 | try: 46 | service_account_info = json.loads(sa_json) 47 | logger.info( 48 | "Loading credentials from GCP_SERVICE_ACCOUNT_JSON environment variable" 49 | ) 50 | credentials = service_account.Credentials.from_service_account_info( 51 | service_account_info, scopes=REQUIRED_SCOPES 52 | ) 53 | logger.audit_log( 54 | action="credential_load", 55 | resource="service_account", 56 | details={"method": "environment_json"}, 57 | ) 58 | return validate_credentials(credentials) 59 | except json.JSONDecodeError as e: 60 | logger.error(f"Failed to parse GCP_SERVICE_ACCOUNT_JSON: {str(e)}") 61 | 62 | # Fall back to default credentials 63 | try: 64 | logger.audit_log( 65 | action="credential_load", 66 | resource="application_default", 67 | details={"method": "default"}, 68 | ) 69 | logger.info("Loading default application credentials") 70 | credentials, project_id = google.auth.default(scopes=REQUIRED_SCOPES) 71 | if project_id and not os.environ.get("GCP_PROJECT_ID"): 72 | # Set project ID from default credentials if not already set 73 | os.environ["GCP_PROJECT_ID"] = project_id 74 | logger.info(f"Using project ID from default credentials: {project_id}") 75 | return validate_credentials(credentials) 76 | except DefaultCredentialsError as e: 77 | logger.error(f"Failed to load GCP credentials: {str(e)}") 78 | raise AuthenticationError("Failed to obtain valid credentials") 79 | 80 | except Exception as e: 81 | logger.critical(f"Authentication failure: {str(e)}") 82 | raise AuthenticationError(f"Failed to obtain valid credentials: {str(e)}") 83 | 84 | 85 | def validate_credentials(credentials): 86 | """Validate credential permissions and expiration""" 87 | # Some credentials don't have a valid attribute or may not need refresh 88 | if hasattr(credentials, "valid") and not credentials.valid: 89 | credentials.refresh(Request()) 90 | 91 | # Check expiration 92 | if hasattr(credentials, "expiry"): 93 | if hasattr(credentials, "expired") and credentials.expired: 94 | try: 95 | credentials.refresh(Request()) 96 | except Exception as e: 97 | raise AuthenticationError( 98 | f"Failed to refresh expired credentials: {str(e)}" 99 | ) 100 | 101 | return credentials 102 | 103 | 104 | class AuthenticationError(Exception): 105 | """Custom exception for authentication failures""" 106 | 107 | pass 108 | ``` -------------------------------------------------------------------------------- /src/gcp-mcp-server/main.py: -------------------------------------------------------------------------------- ```python 1 | import os 2 | 3 | # At the top of main.py, add these imports and logging statements 4 | import sys 5 | 6 | # Get the directory containing main.py 7 | current_dir = os.path.dirname(os.path.abspath(__file__)) 8 | # Add it to Python's module search path 9 | sys.path.append(current_dir) 10 | # Also add the parent directory if needed 11 | parent_dir = os.path.dirname(current_dir) 12 | sys.path.append(parent_dir) 13 | 14 | 15 | import inspect 16 | import logging 17 | from contextlib import asynccontextmanager 18 | from typing import Any, AsyncIterator, Dict 19 | 20 | # Import the GCP Clients and Context 21 | from mcp.server.fastmcp import FastMCP 22 | from services import client_instances 23 | 24 | # Set up logging 25 | logging.basicConfig(level=logging.INFO) 26 | logger = logging.getLogger(__name__) 27 | 28 | # After adding paths, print them to see what's happening 29 | print(f"Current working directory: {os.getcwd()}") 30 | print(f"Updated Python path: {sys.path}") 31 | 32 | 33 | @asynccontextmanager 34 | async def gcp_lifespan(server) -> AsyncIterator[Dict[str, Any]]: 35 | """Set up GCP context with credentials.""" 36 | logger.info("Initializing GCP MCP server...") 37 | try: 38 | # Initialize GCP credentials 39 | credentials = None 40 | # Check for service account key file path 41 | sa_path = os.environ.get("GCP_SERVICE_ACCOUNT_KEY_PATH") 42 | if sa_path and os.path.exists(sa_path): 43 | logger.info(f"Using service account key from: {sa_path}") 44 | 45 | # Initialize GCP clients using the client_instances module 46 | client_instances.initialize_clients(credentials) 47 | clients = client_instances.get_clients() 48 | project_id = client_instances.get_project_id() 49 | location = client_instances.get_location() 50 | 51 | logger.info(f"Server initialized with project: {project_id}") 52 | # Yield context with clients and credentials 53 | yield { 54 | "clients": clients, 55 | "project_id": project_id, 56 | "credentials": credentials, 57 | "location": location, 58 | } 59 | except Exception as e: 60 | logger.error(f"Failed to initialize GCP context: {str(e)}") 61 | raise 62 | finally: 63 | logger.info("Shutting down GCP MCP server...") 64 | 65 | 66 | # Create the global MCP instance 67 | mcp = FastMCP( 68 | "GCP Manager", 69 | description="Manage Google Cloud Platform Resources", 70 | lifespan=gcp_lifespan, 71 | dependencies=[ 72 | "mcp>=1.0.0", 73 | "google-cloud-bigquery", 74 | "google-cloud-storage", 75 | "google-cloud-run", 76 | "google-cloud-artifact-registry", 77 | "google-cloud-logging", 78 | "python-dotenv", 79 | "google-cloud-monitoring", 80 | "google-cloud-compute", 81 | "google-cloud-build", 82 | ], 83 | ) 84 | 85 | 86 | # Basic resources and tools 87 | @mcp.resource("test://hello") 88 | def hello_resource(): 89 | """A simple test resource""" 90 | return "Hello World" 91 | 92 | 93 | @mcp.tool() 94 | def test_gcp_auth() -> str: 95 | """Test GCP authentication""" 96 | try: 97 | # Get clients from client_instances module instead of context 98 | clients = client_instances.get_clients() 99 | 100 | # Test if we can list storage buckets 101 | if hasattr(clients, "storage"): 102 | try: 103 | buckets = list(clients.storage.list_buckets(max_results=5)) 104 | return f"Authentication successful. Found {len(buckets)} buckets. {buckets}" 105 | except Exception as e: 106 | return f"Storage authentication failed: {str(e)}" 107 | except Exception as e: 108 | return f"Authentication failed: {str(e)}" 109 | 110 | 111 | # # Function to register services 112 | # def register_services(): 113 | # """Register all service modules with the MCP instance.""" 114 | # services_dir = os.path.join(os.path.dirname(__file__), "services") 115 | # logger.info(f"Loading services from {services_dir}") 116 | # if not os.path.exists(services_dir): 117 | # logger.warning(f"Services directory not found: {services_dir}") 118 | # return 119 | # # Get all Python files in the services directory 120 | # for filename in os.listdir(services_dir): 121 | # if ( 122 | # filename.endswith(".py") 123 | # and filename != "__init__.py" 124 | # and filename != "client_instances.py" 125 | # ): 126 | # module_name = filename[:-3] # Remove .py extension 127 | # try: 128 | # # Load the module using importlib 129 | # module_path = os.path.join(services_dir, filename) 130 | # spec = importlib.util.spec_from_file_location( 131 | # f"services.{module_name}", module_path 132 | # ) 133 | # module = importlib.util.module_from_spec(spec) 134 | # # Inject mcp instance into the module 135 | # module.mcp = mcp 136 | # # Execute the module 137 | # spec.loader.exec_module(module) 138 | # logger.info(f"Loaded service module: {module_name}") 139 | # # If the module has a register function, call it 140 | # if hasattr(module, "register"): 141 | # # Pass the mcp instance and the server's request_context to register 142 | # module.register(mcp) 143 | # logger.info(f"Registered service: {module_name}") 144 | # except Exception as e: 145 | # logger.error(f"Error loading service {module_name}: {e}") 146 | 147 | 148 | def register_services(): 149 | """Register all service modules with the MCP instance.""" 150 | logger.info("Explicitly registering service modules") 151 | 152 | # Print diagnostic information 153 | logger.info(f"Python sys.path: {sys.path}") 154 | logger.info(f"Current working directory: {os.getcwd()}") 155 | logger.info(f"Script location: {os.path.abspath(__file__)}") 156 | logger.info(f"Parent directory: {os.path.dirname(os.path.abspath(__file__))}") 157 | 158 | try: 159 | # Try importing a service module to check if imports work 160 | logger.info("Attempting to import artifact_registry") 161 | import services.artifact_registry as artifact_registry 162 | 163 | logger.info(f"Module location: {inspect.getfile(artifact_registry)}") 164 | from services import ( 165 | artifact_registry, 166 | cloud_audit_logs, 167 | cloud_bigquery, 168 | cloud_build, 169 | cloud_compute_engine, 170 | cloud_monitoring, 171 | cloud_run, 172 | cloud_storage, 173 | ) 174 | 175 | # Register each service module 176 | artifact_registry.register(mcp) 177 | cloud_audit_logs.register(mcp) 178 | cloud_bigquery.register(mcp) 179 | cloud_build.register(mcp) 180 | cloud_compute_engine.register(mcp) 181 | cloud_monitoring.register(mcp) 182 | cloud_run.register(mcp) 183 | cloud_storage.register(mcp) 184 | 185 | logger.info("All service modules registered successfully") 186 | except Exception as e: 187 | logger.error(f"Error registering service modules: {e}") 188 | # Add detailed error logging 189 | import traceback 190 | 191 | logger.error(traceback.format_exc()) 192 | 193 | 194 | if __name__ == "__main__": 195 | logger.info("Starting GCP MCP server") 196 | # Register services 197 | register_services() 198 | # Run the server 199 | mcp.run() 200 | ``` -------------------------------------------------------------------------------- /src/gcp-mcp-server/core/context.py: -------------------------------------------------------------------------------- ```python 1 | # context.py 2 | import json 3 | import os 4 | from typing import Any, Optional, Type 5 | 6 | import google.auth 7 | 8 | # Import other services as needed 9 | from google.cloud import ( 10 | artifactregistry_v1, 11 | bigquery, 12 | compute_v1, 13 | monitoring_v3, 14 | storage, 15 | ) 16 | from google.oauth2 import service_account 17 | 18 | 19 | class GCPClients: 20 | """Client manager for GCP services""" 21 | 22 | def __init__(self, credentials=None): 23 | self.credentials = self._get_credentials(credentials) 24 | self.project_id = self._get_project_id() 25 | self.location = self._get_location() 26 | self._clients = {} 27 | 28 | # Initialize clients 29 | self._storage_client = None 30 | self._bigquery_client = None 31 | self._run_client = None 32 | self._logging_client = None 33 | self._monitoring_client = None 34 | self._compute_client = None 35 | self._sql_client = None 36 | self._cloudbuild_client = None 37 | self._artifactregistry_client = None 38 | 39 | def _get_credentials(self, credentials=None): 40 | """Get credentials from various sources""" 41 | # If credentials are directly provided 42 | if credentials: 43 | return credentials 44 | 45 | # Check for service account JSON in environment variable 46 | sa_json = os.environ.get("GCP_SERVICE_ACCOUNT_JSON") 47 | if sa_json: 48 | try: 49 | sa_info = json.loads(sa_json) 50 | return service_account.Credentials.from_service_account_info(sa_info) 51 | except Exception as e: 52 | print(f"Error loading service account JSON: {e}") 53 | 54 | # Check for service account key file path 55 | sa_path = os.environ.get("GCP_SERVICE_ACCOUNT_KEY_PATH") 56 | if sa_path: 57 | try: 58 | return service_account.Credentials.from_service_account_file(sa_path) 59 | except Exception as e: 60 | print(f"Error loading service account key file: {e}") 61 | 62 | # Fall back to application default credentials 63 | try: 64 | credentials, project = google.auth.default() 65 | return credentials 66 | except Exception as e: 67 | print(f"Error getting default credentials: {e}") 68 | raise RuntimeError("No valid GCP credentials found") 69 | 70 | def _get_project_id(self) -> str: 71 | """Get project ID from environment or credentials""" 72 | project_id = os.environ.get("GCP_PROJECT_ID") 73 | if project_id: 74 | return project_id 75 | 76 | if hasattr(self.credentials, "project_id"): 77 | return self.credentials.project_id 78 | 79 | try: 80 | _, project_id = google.auth.default() 81 | return project_id 82 | except Exception: 83 | raise RuntimeError("Unable to determine GCP project ID") 84 | 85 | def _get_location(self) -> str: 86 | """Get default location/region from environment""" 87 | return os.environ.get("GCP_LOCATION", "us-central1") 88 | 89 | # def _init_client( 90 | # self, client_class: Type, current_client: Optional[Any], **kwargs 91 | # ) -> Any: 92 | # """Helper method to initialize clients with error handling""" 93 | # if not current_client: 94 | # try: 95 | # return client_class( 96 | # project=self.project_id, # <-- This adds project automatically 97 | # credentials=self.credentials, # <-- This adds credentials automatically 98 | # **kwargs, # <-- This adds any extra params (like database) 99 | # ) 100 | # except Exception as e: 101 | # raise RuntimeError( 102 | # f"Failed to initialize {client_class.__name__}: {str(e)}" 103 | # ) 104 | # return current_client 105 | 106 | def _init_client( 107 | self, client_class: Type, current_client: Optional[Any], **kwargs 108 | ) -> Any: 109 | """Helper method to initialize clients with error handling""" 110 | if not current_client: 111 | try: 112 | # Check if this client accepts project parameter 113 | if client_class in [ 114 | storage.Client 115 | ]: # Add other clients that accept project 116 | kwargs["project"] = self.project_id 117 | 118 | return client_class(credentials=self.credentials, **kwargs) 119 | except Exception as e: 120 | raise RuntimeError( 121 | f"Failed to initialize {client_class.__name__}: {str(e)}" 122 | ) 123 | return current_client 124 | 125 | @property 126 | def storage(self) -> storage.Client: 127 | self._storage_client = self._init_client(storage.Client, self._storage_client) 128 | return self._storage_client 129 | 130 | @property 131 | def bigquery(self) -> bigquery.Client: 132 | self._bigquery_client = self._init_client( 133 | bigquery.Client, self._bigquery_client 134 | ) 135 | return self._bigquery_client 136 | 137 | @property 138 | def artifactregistry(self) -> artifactregistry_v1.ArtifactRegistryClient: 139 | """Get the Artifact Registry client.""" 140 | if not self._artifactregistry_client: 141 | try: 142 | # ArtifactRegistryClient doesn't accept project parameter 143 | self._artifactregistry_client = ( 144 | artifactregistry_v1.ArtifactRegistryClient( 145 | credentials=self.credentials 146 | ) 147 | ) 148 | except Exception as e: 149 | raise RuntimeError( 150 | f"Failed to initialize ArtifactRegistryClient: {str(e)}" 151 | ) 152 | return self._artifactregistry_client 153 | 154 | # Uncomment and implement other client properties as needed 155 | # @property 156 | # def storage(self) -> storage.Client: 157 | # self._storage_client = self._init_client(storage.Client, self._storage_client) 158 | # return self._storage_client 159 | 160 | def close_all(self): 161 | """Close all open clients""" 162 | for client in self._clients.values(): 163 | if hasattr(client, "transport") and hasattr(client.transport, "close"): 164 | client.transport.close() 165 | elif hasattr(client, "close"): 166 | client.close() 167 | self._clients.clear() 168 | 169 | 170 | # Update Context class to handle credentials properly 171 | class Context: 172 | def __init__(self, request_context): 173 | self.request_context = request_context 174 | # Get credentials from lifespan context 175 | credentials = request_context.lifespan_context.get("credentials") 176 | # Initialize GCPClients with those credentials 177 | self.clients = GCPClients(credentials) 178 | 179 | def close(self): 180 | """Clean up when request ends""" 181 | if hasattr(self, "clients"): 182 | self.clients.close_all() 183 | 184 | # @property 185 | # def run(self) -> run_v2.CloudRunClient: 186 | # self._run_client = self._init_client(run_v2.CloudRunClient, self._run_client) 187 | # return self._run_client 188 | 189 | # @property 190 | # def logging(self) -> logging_v2.LoggingServiceV2Client: 191 | # self._logging_client = self._init_client( 192 | # logging_v2.LoggingServiceV2Client, self._logging_client 193 | # ) 194 | # return self._logging_client 195 | 196 | @property 197 | def monitoring(self) -> monitoring_v3.MetricServiceClient: 198 | self._monitoring_client = self._init_client( 199 | monitoring_v3.MetricServiceClient, self._monitoring_client 200 | ) 201 | return self._monitoring_client 202 | 203 | @property 204 | def compute(self) -> compute_v1.InstancesClient: 205 | self._compute_client = self._init_client( 206 | compute_v1.InstancesClient, self._compute_client 207 | ) 208 | return self._compute_client 209 | 210 | # @property 211 | # def sql(self) -> sql_v1.InstancesClient: 212 | # self._sql_client = self._init_client(sql_v1.InstancesClient, self._sql_client) 213 | # return self._sql_client 214 | 215 | # @property 216 | # def cloudbuild(self) -> cloudbuild_v1.CloudBuildClient: 217 | # self._cloudbuild_client = self._init_client( 218 | # cloudbuild_v1.CloudBuildClient, self._cloudbuild_client 219 | # ) 220 | # return self._cloudbuild_client 221 | ``` -------------------------------------------------------------------------------- /src/gcp-mcp-server/services/cloud_build.py: -------------------------------------------------------------------------------- ```python 1 | import json 2 | from typing import Dict, Optional 3 | 4 | from services import client_instances 5 | 6 | 7 | def register(mcp_instance): 8 | """Register all Cloud Build resources and tools with the MCP instance.""" 9 | 10 | # Resources 11 | @mcp_instance.resource("gcp://cloudbuild/{project_id}/builds") 12 | def list_builds_resource(project_id: str = None) -> str: 13 | """List all Cloud Build executions for a project""" 14 | try: 15 | # Get client from client_instances 16 | client = client_instances.get_clients().cloudbuild 17 | project_id = project_id or client_instances.get_project_id() 18 | 19 | builds = client.list_builds(project_id=project_id) 20 | result = [] 21 | for build in builds: 22 | result.append( 23 | { 24 | "id": build.id, 25 | "status": build.status.name if build.status else None, 26 | "source": build.source.repo_source.commit_sha 27 | if build.source and build.source.repo_source 28 | else None, 29 | "create_time": build.create_time.isoformat() 30 | if build.create_time 31 | else None, 32 | "logs_url": build.logs_url, 33 | "substitutions": dict(build.substitutions) 34 | if build.substitutions 35 | else {}, 36 | } 37 | ) 38 | return json.dumps(result, indent=2) 39 | except Exception as e: 40 | return json.dumps({"error": str(e)}, indent=2) 41 | 42 | @mcp_instance.resource("gcp://cloudbuild/{project_id}/triggers") 43 | def list_triggers_resource(project_id: str = None) -> str: 44 | """List all Cloud Build triggers for a project""" 45 | try: 46 | # Get client from client_instances 47 | client = client_instances.get_clients().cloudbuild 48 | project_id = project_id or client_instances.get_project_id() 49 | 50 | triggers = client.list_triggers(project_id=project_id) 51 | result = [] 52 | for trigger in triggers: 53 | result.append( 54 | { 55 | "id": trigger.id, 56 | "name": trigger.name, 57 | "description": trigger.description, 58 | "trigger_template": { 59 | "repo_name": trigger.trigger_template.repo_name, 60 | "branch_name": trigger.trigger_template.branch_name, 61 | } 62 | if trigger.trigger_template 63 | else None, 64 | } 65 | ) 66 | return json.dumps(result, indent=2) 67 | except Exception as e: 68 | return json.dumps({"error": str(e)}, indent=2) 69 | 70 | # Tools 71 | @mcp_instance.tool() 72 | def trigger_build( 73 | build_config: Dict, 74 | project_id: str = None, 75 | ) -> str: 76 | """ 77 | Trigger a new Cloud Build execution 78 | 79 | Args: 80 | build_config: Dictionary containing the build configuration 81 | project_id: GCP project ID (defaults to configured project) 82 | """ 83 | try: 84 | # Get client from client_instances 85 | client = client_instances.get_clients().cloudbuild 86 | project_id = project_id or client_instances.get_project_id() 87 | 88 | # Convert config dict to Build object 89 | build = cloudbuild_v1.Build.from_json(json.dumps(build_config)) 90 | 91 | print(f"Triggering build in project {project_id}...") 92 | operation = client.create_build(project_id=project_id, build=build) 93 | response = operation.result() 94 | 95 | return json.dumps( 96 | { 97 | "status": "success", 98 | "build_id": response.id, 99 | "status": response.status.name if response.status else None, 100 | "logs_url": response.logs_url, 101 | "build_name": response.name, 102 | }, 103 | indent=2, 104 | ) 105 | except Exception as e: 106 | return json.dumps({"status": "error", "message": str(e)}, indent=2) 107 | 108 | @mcp_instance.tool() 109 | def list_builds( 110 | project_id: str = None, 111 | filter: Optional[str] = None, 112 | page_size: Optional[int] = 100, 113 | ) -> str: 114 | """ 115 | List Cloud Build executions with optional filtering 116 | 117 | Args: 118 | project_id: GCP project ID (defaults to configured project) 119 | filter: Filter expression for builds 120 | page_size: Maximum number of results to return 121 | """ 122 | try: 123 | # Get client from client_instances 124 | client = client_instances.get_clients().cloudbuild 125 | project_id = project_id or client_instances.get_project_id() 126 | 127 | request = cloudbuild_v1.ListBuildsRequest( 128 | project_id=project_id, filter=filter, page_size=page_size 129 | ) 130 | 131 | builds = [] 132 | for build in client.list_builds(request=request): 133 | builds.append( 134 | { 135 | "id": build.id, 136 | "status": build.status.name if build.status else None, 137 | "create_time": build.create_time.isoformat() 138 | if build.create_time 139 | else None, 140 | "source": build.source.repo_source.commit_sha 141 | if build.source and build.source.repo_source 142 | else None, 143 | "logs_url": build.logs_url, 144 | } 145 | ) 146 | 147 | return json.dumps( 148 | {"status": "success", "builds": builds, "count": len(builds)}, indent=2 149 | ) 150 | except Exception as e: 151 | return json.dumps({"status": "error", "message": str(e)}, indent=2) 152 | 153 | @mcp_instance.tool() 154 | def create_build_trigger( 155 | trigger_config: Dict, 156 | project_id: str = None, 157 | ) -> str: 158 | """ 159 | Create a new Cloud Build trigger 160 | 161 | Args: 162 | trigger_config: Dictionary containing the trigger configuration 163 | project_id: GCP project ID (defaults to configured project) 164 | """ 165 | try: 166 | # Get client from client_instances 167 | client = client_instances.get_clients().cloudbuild 168 | project_id = project_id or client_instances.get_project_id() 169 | 170 | # Convert config dict to Trigger object 171 | trigger = cloudbuild_v1.BuildTrigger.from_json(json.dumps(trigger_config)) 172 | 173 | print(f"Creating trigger in project {project_id}...") 174 | response = client.create_trigger(project_id=project_id, trigger=trigger) 175 | 176 | return json.dumps( 177 | { 178 | "status": "success", 179 | "trigger_id": response.id, 180 | "name": response.name, 181 | "description": response.description, 182 | "create_time": response.create_time.isoformat() 183 | if response.create_time 184 | else None, 185 | }, 186 | indent=2, 187 | ) 188 | except Exception as e: 189 | return json.dumps({"status": "error", "message": str(e)}, indent=2) 190 | 191 | # Prompts 192 | @mcp_instance.prompt() 193 | def build_config_prompt(service_type: str = "docker") -> str: 194 | """Prompt for creating a Cloud Build configuration""" 195 | return f""" 196 | I need to create a Cloud Build configuration for {service_type} deployments. Please help with: 197 | 198 | 1. Recommended build steps for {service_type} 199 | 2. Proper use of substitutions 200 | 3. Caching strategies 201 | 4. Security best practices 202 | 5. Integration with other GCP services 203 | """ 204 | 205 | @mcp_instance.prompt() 206 | def trigger_setup_prompt(repo_type: str = "github") -> str: 207 | """Prompt for configuring build triggers""" 208 | return f""" 209 | I want to set up a {repo_type} trigger for Cloud Build. Please explain: 210 | 211 | 1. Required permissions and connections 212 | 2. Event types (push, PR, etc.) 213 | 3. File pattern matching 214 | 4. Branch/tag filtering 215 | 5. Approval workflows 216 | """ 217 | ``` -------------------------------------------------------------------------------- /src/gcp-mcp-server/services/cloud_compute_engine.py: -------------------------------------------------------------------------------- ```python 1 | import json 2 | from typing import Optional 3 | 4 | from services import client_instances 5 | 6 | 7 | def register(mcp_instance): 8 | """Register all Compute Engine resources and tools with the MCP instance.""" 9 | 10 | # Resources 11 | @mcp_instance.resource("gcp://compute/{project_id}/{zone}/instances") 12 | def list_instances_resource(project_id: str = None, zone: str = None) -> str: 13 | """List all Compute Engine instances in a zone""" 14 | try: 15 | # Get client from client_instances 16 | client = client_instances.get_clients().compute 17 | project_id = project_id or client_instances.get_project_id() 18 | zone = zone or client_instances.get_location() 19 | 20 | instance_list = client.list(project=project_id, zone=zone) 21 | result = [] 22 | for instance in instance_list: 23 | result.append( 24 | { 25 | "name": instance.name, 26 | "status": instance.status, 27 | "machine_type": instance.machine_type.split("/")[-1] 28 | if instance.machine_type 29 | else None, 30 | "internal_ip": instance.network_interfaces[0].network_i_p 31 | if instance.network_interfaces 32 | and len(instance.network_interfaces) > 0 33 | else None, 34 | "creation_timestamp": instance.creation_timestamp, 35 | } 36 | ) 37 | return json.dumps(result, indent=2) 38 | except Exception as e: 39 | return json.dumps({"error": str(e)}, indent=2) 40 | 41 | # Tools 42 | @mcp_instance.tool() 43 | def create_instance( 44 | instance_name: str, 45 | machine_type: str, 46 | project_id: str = None, 47 | zone: str = None, 48 | image: str = "projects/debian-cloud/global/images/family/debian-11", 49 | network: str = "global/networks/default", 50 | disk_size_gb: int = 10, 51 | ) -> str: 52 | """ 53 | Create a new Compute Engine instance 54 | 55 | Args: 56 | instance_name: Name for the new instance 57 | machine_type: Machine type (e.g., n2-standard-2) 58 | project_id: GCP project ID (defaults to configured project) 59 | zone: GCP zone (defaults to configured location) 60 | image: Source image for boot disk 61 | network: Network to connect to 62 | disk_size_gb: Size of boot disk in GB 63 | """ 64 | try: 65 | # Get client from client_instances 66 | client = client_instances.get_clients().compute 67 | project_id = project_id or client_instances.get_project_id() 68 | zone = zone or client_instances.get_location() 69 | 70 | # Build instance configuration 71 | instance_config = { 72 | "name": instance_name, 73 | "machine_type": f"zones/{zone}/machineTypes/{machine_type}", 74 | "disks": [ 75 | { 76 | "boot": True, 77 | "auto_delete": True, 78 | "initialize_params": { 79 | "source_image": image, 80 | "disk_size_gb": disk_size_gb, 81 | }, 82 | } 83 | ], 84 | "network_interfaces": [ 85 | { 86 | "network": network, 87 | "access_configs": [ 88 | {"type": "ONE_TO_ONE_NAT", "name": "External NAT"} 89 | ], 90 | } 91 | ], 92 | } 93 | 94 | print(f"Creating instance {instance_name} in {zone}...") 95 | operation = client.insert( 96 | project=project_id, zone=zone, instance_resource=instance_config 97 | ) 98 | 99 | # Wait for operation to complete 100 | operation.result() 101 | return json.dumps( 102 | { 103 | "status": "success", 104 | "instance_name": instance_name, 105 | "zone": zone, 106 | "operation_type": "insert", 107 | "operation_status": "DONE", 108 | }, 109 | indent=2, 110 | ) 111 | except Exception as e: 112 | return json.dumps({"status": "error", "message": str(e)}, indent=2) 113 | 114 | @mcp_instance.tool() 115 | def list_instances( 116 | project_id: str = None, 117 | zone: str = None, 118 | filter: Optional[str] = None, 119 | ) -> str: 120 | """ 121 | List Compute Engine instances in a zone 122 | 123 | Args: 124 | project_id: GCP project ID (defaults to configured project) 125 | zone: GCP zone (defaults to configured location) 126 | filter: Optional filter expression (e.g., "status=RUNNING") 127 | """ 128 | try: 129 | # Get client from client_instances 130 | client = client_instances.get_clients().compute 131 | project_id = project_id or client_instances.get_project_id() 132 | zone = zone or client_instances.get_location() 133 | 134 | instance_list = client.list(project=project_id, zone=zone, filter=filter) 135 | 136 | instances = [] 137 | for instance in instance_list: 138 | instances.append( 139 | { 140 | "name": instance.name, 141 | "status": instance.status, 142 | "machine_type": instance.machine_type.split("/")[-1] 143 | if instance.machine_type 144 | else None, 145 | "internal_ip": instance.network_interfaces[0].network_i_p 146 | if instance.network_interfaces 147 | and len(instance.network_interfaces) > 0 148 | else None, 149 | "creation_timestamp": instance.creation_timestamp, 150 | } 151 | ) 152 | 153 | return json.dumps( 154 | {"status": "success", "instances": instances, "count": len(instances)}, 155 | indent=2, 156 | ) 157 | except Exception as e: 158 | return json.dumps({"status": "error", "message": str(e)}, indent=2) 159 | 160 | @mcp_instance.tool() 161 | def start_instance( 162 | instance_name: str, 163 | project_id: str = None, 164 | zone: str = None, 165 | ) -> str: 166 | """ 167 | Start a stopped Compute Engine instance 168 | 169 | Args: 170 | instance_name: Name of the instance to start 171 | project_id: GCP project ID (defaults to configured project) 172 | zone: GCP zone (defaults to configured location) 173 | """ 174 | try: 175 | # Get client from client_instances 176 | client = client_instances.get_clients().compute 177 | project_id = project_id or client_instances.get_project_id() 178 | zone = zone or client_instances.get_location() 179 | 180 | print(f"Starting instance {instance_name}...") 181 | operation = client.start( 182 | project=project_id, zone=zone, instance=instance_name 183 | ) 184 | operation.result() 185 | 186 | return json.dumps( 187 | { 188 | "status": "success", 189 | "instance_name": instance_name, 190 | "operation_status": "DONE", 191 | }, 192 | indent=2, 193 | ) 194 | except Exception as e: 195 | return json.dumps({"status": "error", "message": str(e)}, indent=2) 196 | 197 | @mcp_instance.tool() 198 | def stop_instance( 199 | instance_name: str, 200 | project_id: str = None, 201 | zone: str = None, 202 | ) -> str: 203 | """ 204 | Stop a running Compute Engine instance 205 | 206 | Args: 207 | instance_name: Name of the instance to stop 208 | project_id: GCP project ID (defaults to configured project) 209 | zone: GCP zone (defaults to configured location) 210 | """ 211 | try: 212 | # Get client from client_instances 213 | client = client_instances.get_clients().compute 214 | project_id = project_id or client_instances.get_project_id() 215 | zone = zone or client_instances.get_location() 216 | 217 | print(f"Stopping instance {instance_name}...") 218 | operation = client.stop( 219 | project=project_id, zone=zone, instance=instance_name 220 | ) 221 | operation.result() 222 | 223 | return json.dumps( 224 | { 225 | "status": "success", 226 | "instance_name": instance_name, 227 | "operation_status": "DONE", 228 | }, 229 | indent=2, 230 | ) 231 | except Exception as e: 232 | return json.dumps({"status": "error", "message": str(e)}, indent=2) 233 | 234 | @mcp_instance.tool() 235 | def delete_instance( 236 | instance_name: str, 237 | project_id: str = None, 238 | zone: str = None, 239 | ) -> str: 240 | """ 241 | Delete a Compute Engine instance 242 | 243 | Args: 244 | instance_name: Name of the instance to delete 245 | project_id: GCP project ID (defaults to configured project) 246 | zone: GCP zone (defaults to configured location) 247 | """ 248 | try: 249 | # Get client from client_instances 250 | client = client_instances.get_clients().compute 251 | project_id = project_id or client_instances.get_project_id() 252 | zone = zone or client_instances.get_location() 253 | 254 | print(f"Deleting instance {instance_name}...") 255 | operation = client.delete( 256 | project=project_id, zone=zone, instance=instance_name 257 | ) 258 | operation.result() 259 | 260 | return json.dumps( 261 | { 262 | "status": "success", 263 | "instance_name": instance_name, 264 | "operation_status": "DONE", 265 | }, 266 | indent=2, 267 | ) 268 | except Exception as e: 269 | return json.dumps({"status": "error", "message": str(e)}, indent=2) 270 | 271 | # Prompts 272 | @mcp_instance.prompt() 273 | def instance_config_prompt(workload_type: str = "web-server") -> str: 274 | """Prompt for creating Compute Engine configurations""" 275 | return f""" 276 | I need to configure a Compute Engine instance for {workload_type}. Please help with: 277 | 278 | 1. Recommended machine types for {workload_type} 279 | 2. Disk type and size recommendations 280 | 3. Network configuration best practices 281 | 4. Security considerations (service accounts, firewall rules) 282 | 5. Cost optimization strategies 283 | """ 284 | 285 | @mcp_instance.prompt() 286 | def troubleshooting_prompt(issue: str = "instance not responding") -> str: 287 | """Prompt for troubleshooting Compute Engine issues""" 288 | return f""" 289 | I'm experiencing {issue} with my Compute Engine instance. Please guide me through: 290 | 291 | 1. Common causes for this issue 292 | 2. Diagnostic steps using Cloud Console and CLI 293 | 3. Log analysis techniques 294 | 4. Recovery procedures 295 | 5. Prevention strategies 296 | """ 297 | ``` -------------------------------------------------------------------------------- /src/gcp-mcp-server/services/cloud_audit_logs.py: -------------------------------------------------------------------------------- ```python 1 | import json 2 | 3 | from google.cloud import logging_v2 4 | from services import client_instances 5 | 6 | 7 | def register(mcp_instance): 8 | """Register all Cloud Audit Logs resources and tools with the MCP instance.""" 9 | 10 | # Resources 11 | @mcp_instance.resource("gcp://audit-logs/{project_id}") 12 | def list_audit_logs_resource(project_id: str = None) -> str: 13 | """List recent audit logs from a specific project""" 14 | try: 15 | # Get client from client_instances 16 | client = client_instances.get_clients().logging 17 | project_id = project_id or client_instances.get_project_id() 18 | 19 | # Filter for audit logs only 20 | filter_str = 'logName:"cloudaudit.googleapis.com"' 21 | 22 | # List log entries 23 | entries = client.list_log_entries( 24 | request={ 25 | "resource_names": [f"projects/{project_id}"], 26 | "filter": filter_str, 27 | "page_size": 10, # Limiting to 10 for responsiveness 28 | } 29 | ) 30 | 31 | result = [] 32 | for entry in entries: 33 | log_data = { 34 | "timestamp": entry.timestamp.isoformat() 35 | if entry.timestamp 36 | else None, 37 | "severity": entry.severity.name if entry.severity else None, 38 | "log_name": entry.log_name, 39 | "resource": { 40 | "type": entry.resource.type, 41 | "labels": dict(entry.resource.labels) 42 | if entry.resource.labels 43 | else {}, 44 | } 45 | if entry.resource 46 | else {}, 47 | } 48 | 49 | # Handle payload based on type 50 | if hasattr(entry, "json_payload") and entry.json_payload: 51 | log_data["payload"] = dict(entry.json_payload) 52 | elif hasattr(entry, "proto_payload") and entry.proto_payload: 53 | log_data["payload"] = "Proto payload (details omitted)" 54 | elif hasattr(entry, "text_payload") and entry.text_payload: 55 | log_data["payload"] = entry.text_payload 56 | 57 | result.append(log_data) 58 | 59 | return json.dumps(result, indent=2) 60 | except Exception as e: 61 | return json.dumps({"error": str(e)}, indent=2) 62 | 63 | @mcp_instance.resource("gcp://audit-logs/{project_id}/sinks") 64 | def list_log_sinks_resource(project_id: str = None) -> str: 65 | """List log sinks configured for a project""" 66 | try: 67 | # Get client from client_instances 68 | client = client_instances.get_clients().logging 69 | project_id = project_id or client_instances.get_project_id() 70 | 71 | parent = f"projects/{project_id}" 72 | sinks = client.list_sinks(parent=parent) 73 | 74 | result = [] 75 | for sink in sinks: 76 | result.append( 77 | { 78 | "name": sink.name, 79 | "destination": sink.destination, 80 | "filter": sink.filter, 81 | "description": sink.description, 82 | "disabled": sink.disabled, 83 | } 84 | ) 85 | 86 | return json.dumps(result, indent=2) 87 | except Exception as e: 88 | return json.dumps({"error": str(e)}, indent=2) 89 | 90 | # Tools 91 | @mcp_instance.tool() 92 | def list_audit_logs( 93 | project_id: str = None, 94 | filter_str: str = 'logName:"cloudaudit.googleapis.com"', 95 | page_size: int = 20, 96 | ) -> str: 97 | """ 98 | List Google Cloud Audit logs from a project 99 | 100 | Args: 101 | project_id: GCP project ID (defaults to configured project) 102 | filter_str: Log filter expression (defaults to audit logs) 103 | page_size: Maximum number of entries to return 104 | """ 105 | try: 106 | # Get client from client_instances 107 | client = client_instances.get_clients().logging 108 | project_id = project_id or client_instances.get_project_id() 109 | 110 | print(f"Retrieving audit logs from project {project_id}...") 111 | 112 | # List log entries 113 | entries = client.list_log_entries( 114 | request={ 115 | "resource_names": [f"projects/{project_id}"], 116 | "filter": filter_str, 117 | "page_size": page_size, 118 | } 119 | ) 120 | 121 | result = [] 122 | for entry in entries: 123 | log_data = { 124 | "timestamp": entry.timestamp.isoformat() 125 | if entry.timestamp 126 | else None, 127 | "severity": entry.severity.name if entry.severity else None, 128 | "log_name": entry.log_name, 129 | "resource": { 130 | "type": entry.resource.type, 131 | "labels": dict(entry.resource.labels) 132 | if entry.resource.labels 133 | else {}, 134 | } 135 | if entry.resource 136 | else {}, 137 | } 138 | 139 | # Handle payload based on type 140 | if hasattr(entry, "json_payload") and entry.json_payload: 141 | log_data["payload"] = dict(entry.json_payload) 142 | elif hasattr(entry, "proto_payload") and entry.proto_payload: 143 | log_data["payload"] = "Proto payload (details omitted)" 144 | elif hasattr(entry, "text_payload") and entry.text_payload: 145 | log_data["payload"] = entry.text_payload 146 | 147 | result.append(log_data) 148 | 149 | return json.dumps( 150 | { 151 | "status": "success", 152 | "entries": result, 153 | "count": len(result), 154 | }, 155 | indent=2, 156 | ) 157 | except Exception as e: 158 | return json.dumps({"status": "error", "message": str(e)}, indent=2) 159 | 160 | @mcp_instance.tool() 161 | def filter_admin_activity_logs( 162 | project_id: str = None, 163 | service_name: str = "", 164 | resource_type: str = "", 165 | time_range: str = "1h", 166 | page_size: int = 20, 167 | ) -> str: 168 | """ 169 | Filter Admin Activity audit logs for specific services or resource types 170 | 171 | Args: 172 | project_id: GCP project ID (defaults to configured project) 173 | service_name: Optional service name to filter by (e.g., compute.googleapis.com) 174 | resource_type: Optional resource type to filter by (e.g., gce_instance) 175 | time_range: Time range for logs (e.g., 1h, 24h, 7d) 176 | page_size: Maximum number of entries to return 177 | """ 178 | try: 179 | # Get client from client_instances 180 | client = client_instances.get_clients().logging 181 | project_id = project_id or client_instances.get_project_id() 182 | 183 | # Build filter string 184 | filter_parts = ['logName:"cloudaudit.googleapis.com%2Factivity"'] 185 | 186 | if service_name: 187 | filter_parts.append(f'protoPayload.serviceName="{service_name}"') 188 | 189 | if resource_type: 190 | filter_parts.append(f'resource.type="{resource_type}"') 191 | 192 | if time_range: 193 | filter_parts.append(f'timestamp >= "-{time_range}"') 194 | 195 | filter_str = " AND ".join(filter_parts) 196 | 197 | print(f"Filtering Admin Activity logs with: {filter_str}") 198 | 199 | # List log entries 200 | entries = client.list_log_entries( 201 | request={ 202 | "resource_names": [f"projects/{project_id}"], 203 | "filter": filter_str, 204 | "page_size": page_size, 205 | } 206 | ) 207 | 208 | result = [] 209 | for entry in entries: 210 | # Extract relevant fields for admin activity logs 211 | log_data = { 212 | "timestamp": entry.timestamp.isoformat() 213 | if entry.timestamp 214 | else None, 215 | "method_name": None, 216 | "resource_name": None, 217 | "service_name": None, 218 | "user": None, 219 | } 220 | 221 | # Extract data from proto payload 222 | if hasattr(entry, "proto_payload") and entry.proto_payload: 223 | payload = entry.proto_payload 224 | if hasattr(payload, "method_name"): 225 | log_data["method_name"] = payload.method_name 226 | if hasattr(payload, "resource_name"): 227 | log_data["resource_name"] = payload.resource_name 228 | if hasattr(payload, "service_name"): 229 | log_data["service_name"] = payload.service_name 230 | 231 | # Extract authentication info 232 | if hasattr(payload, "authentication_info"): 233 | auth_info = payload.authentication_info 234 | if hasattr(auth_info, "principal_email"): 235 | log_data["user"] = auth_info.principal_email 236 | 237 | result.append(log_data) 238 | 239 | return json.dumps( 240 | { 241 | "status": "success", 242 | "entries": result, 243 | "count": len(result), 244 | "filter": filter_str, 245 | }, 246 | indent=2, 247 | ) 248 | except Exception as e: 249 | return json.dumps({"status": "error", "message": str(e)}, indent=2) 250 | 251 | @mcp_instance.tool() 252 | def create_log_sink( 253 | sink_name: str, 254 | destination: str, 255 | project_id: str = None, 256 | filter_str: str = 'logName:"cloudaudit.googleapis.com"', 257 | description: str = "", 258 | ) -> str: 259 | """ 260 | Create a log sink to export audit logs to a destination 261 | 262 | Args: 263 | sink_name: Name for the new log sink 264 | destination: Destination for logs (e.g., storage.googleapis.com/my-bucket) 265 | project_id: GCP project ID (defaults to configured project) 266 | filter_str: Log filter expression (defaults to audit logs) 267 | description: Optional description for the sink 268 | """ 269 | try: 270 | # Get client from client_instances 271 | client = client_instances.get_clients().logging 272 | project_id = project_id or client_instances.get_project_id() 273 | 274 | parent = f"projects/{project_id}" 275 | 276 | # Create sink configuration 277 | sink = logging_v2.LogSink( 278 | name=sink_name, 279 | destination=destination, 280 | filter=filter_str, 281 | description=description, 282 | ) 283 | 284 | print(f"Creating log sink '{sink_name}' to export to {destination}...") 285 | 286 | # Create the sink 287 | response = client.create_sink( 288 | request={ 289 | "parent": parent, 290 | "sink": sink, 291 | } 292 | ) 293 | 294 | # Important: Recommend setting up IAM permissions 295 | writer_identity = response.writer_identity 296 | 297 | return json.dumps( 298 | { 299 | "status": "success", 300 | "name": response.name, 301 | "destination": response.destination, 302 | "filter": response.filter, 303 | "writer_identity": writer_identity, 304 | "next_steps": f"Important: Grant {writer_identity} the appropriate permissions on the destination", 305 | }, 306 | indent=2, 307 | ) 308 | except Exception as e: 309 | return json.dumps({"status": "error", "message": str(e)}, indent=2) 310 | 311 | # Prompts 312 | @mcp_instance.prompt() 313 | def audit_log_investigation() -> str: 314 | """Prompt for investigating security incidents through audit logs""" 315 | return """ 316 | I need to investigate a potential security incident in my Google Cloud project. 317 | 318 | Please help me: 319 | 1. Determine what types of audit logs I should check (Admin Activity, Data Access, System Event) 320 | 2. Create an effective filter query to find relevant logs 321 | 3. Identify key fields to examine for signs of unusual activity 322 | 4. Suggest common indicators of potential security issues in audit logs 323 | """ 324 | 325 | @mcp_instance.prompt() 326 | def log_export_setup() -> str: 327 | """Prompt for setting up log exports for compliance""" 328 | return """ 329 | I need to set up log exports for compliance requirements. 330 | 331 | Please help me: 332 | 1. Understand the different destinations available for log exports 333 | 2. Design an effective filter to capture all required audit events 334 | 3. Implement a log sink with appropriate permissions 335 | 4. Verify my setup is correctly capturing and exporting logs 336 | """ 337 | ``` -------------------------------------------------------------------------------- /src/gcp-mcp-server/services/cloud_storage.py: -------------------------------------------------------------------------------- ```python 1 | import json 2 | import os 3 | from typing import Dict, Optional 4 | 5 | from services import client_instances 6 | 7 | 8 | def register(mcp_instance): 9 | """Register all Cloud Storage resources and tools with the MCP instance.""" 10 | 11 | # Resources 12 | @mcp_instance.resource("gcp://storage/{project_id}/buckets") 13 | def list_buckets_resource(project_id: str = None) -> str: 14 | """List all Cloud Storage buckets in a project""" 15 | try: 16 | # Get client from client_instances 17 | client = client_instances.get_clients().storage 18 | project_id = project_id or client_instances.get_project_id() 19 | 20 | buckets = client.list_buckets() 21 | 22 | result = [] 23 | for bucket in buckets: 24 | result.append( 25 | { 26 | "name": bucket.name, 27 | "location": bucket.location, 28 | "storage_class": bucket.storage_class, 29 | "time_created": bucket.time_created.isoformat() 30 | if bucket.time_created 31 | else None, 32 | "versioning_enabled": bucket.versioning_enabled, 33 | "labels": dict(bucket.labels) if bucket.labels else {}, 34 | } 35 | ) 36 | 37 | return json.dumps(result, indent=2) 38 | except Exception as e: 39 | return json.dumps({"error": str(e)}, indent=2) 40 | 41 | @mcp_instance.resource("gcp://storage/{project_id}/bucket/{bucket_name}") 42 | def get_bucket_resource(project_id: str = None, bucket_name: str = None) -> str: 43 | """Get details for a specific Cloud Storage bucket""" 44 | try: 45 | # Get client from client_instances 46 | client = client_instances.get_clients().storage 47 | project_id = project_id or client_instances.get_project_id() 48 | 49 | bucket = client.get_bucket(bucket_name) 50 | result = { 51 | "name": bucket.name, 52 | "location": bucket.location, 53 | "storage_class": bucket.storage_class, 54 | "time_created": bucket.time_created.isoformat() 55 | if bucket.time_created 56 | else None, 57 | "versioning_enabled": bucket.versioning_enabled, 58 | "requester_pays": bucket.requester_pays, 59 | "lifecycle_rules": bucket.lifecycle_rules, 60 | "cors": bucket.cors, 61 | "labels": dict(bucket.labels) if bucket.labels else {}, 62 | } 63 | return json.dumps(result, indent=2) 64 | except Exception as e: 65 | return json.dumps({"error": str(e)}, indent=2) 66 | 67 | @mcp_instance.resource("gcp://storage/{project_id}/bucket/{bucket_name}/objects") 68 | def list_objects_resource(project_id: str = None, bucket_name: str = None) -> str: 69 | """List objects in a specific Cloud Storage bucket""" 70 | prefix = "" # Move it inside the function with a default value 71 | try: 72 | # Get client from client_instances 73 | client = client_instances.get_clients().storage 74 | project_id = project_id or client_instances.get_project_id() 75 | 76 | bucket = client.get_bucket(bucket_name) 77 | blobs = bucket.list_blobs(prefix=prefix) 78 | 79 | result = [] 80 | for blob in blobs: 81 | result.append( 82 | { 83 | "name": blob.name, 84 | "size": blob.size, 85 | "updated": blob.updated.isoformat() if blob.updated else None, 86 | "content_type": blob.content_type, 87 | "md5_hash": blob.md5_hash, 88 | "generation": blob.generation, 89 | "metadata": blob.metadata, 90 | } 91 | ) 92 | 93 | return json.dumps(result, indent=2) 94 | except Exception as e: 95 | return json.dumps({"error": str(e)}, indent=2) 96 | 97 | # Tools 98 | @mcp_instance.tool() 99 | def create_bucket( 100 | bucket_name: str, 101 | project_id: str = None, 102 | location: str = "us-central1", 103 | storage_class: str = "STANDARD", 104 | labels: Optional[Dict[str, str]] = None, 105 | versioning_enabled: bool = False, 106 | ) -> str: 107 | """ 108 | Create a Cloud Storage bucket 109 | 110 | Args: 111 | bucket_name: Name for the new bucket 112 | project_id: GCP project ID (defaults to configured project) 113 | location: GCP region (e.g., us-central1) 114 | storage_class: Storage class (STANDARD, NEARLINE, COLDLINE, ARCHIVE) 115 | labels: Optional key-value pairs for bucket labels 116 | versioning_enabled: Whether to enable object versioning 117 | """ 118 | try: 119 | # Get client from client_instances 120 | client = client_instances.get_clients().storage 121 | project_id = project_id or client_instances.get_project_id() 122 | 123 | # Validate storage class 124 | valid_storage_classes = ["STANDARD", "NEARLINE", "COLDLINE", "ARCHIVE"] 125 | if storage_class not in valid_storage_classes: 126 | return json.dumps( 127 | { 128 | "status": "error", 129 | "message": f"Invalid storage class: {storage_class}. Valid classes are: {', '.join(valid_storage_classes)}", 130 | }, 131 | indent=2, 132 | ) 133 | 134 | # Log info (similar to ctx.info) 135 | print(f"Creating bucket {bucket_name} in {location}...") 136 | bucket = client.bucket(bucket_name) 137 | bucket.create(location=location, storage_class=storage_class, labels=labels) 138 | 139 | # Set versioning if enabled 140 | if versioning_enabled: 141 | bucket.versioning_enabled = True 142 | bucket.patch() 143 | 144 | return json.dumps( 145 | { 146 | "status": "success", 147 | "name": bucket.name, 148 | "location": bucket.location, 149 | "storage_class": bucket.storage_class, 150 | "time_created": bucket.time_created.isoformat() 151 | if bucket.time_created 152 | else None, 153 | "versioning_enabled": bucket.versioning_enabled, 154 | "url": f"gs://{bucket_name}/", 155 | }, 156 | indent=2, 157 | ) 158 | except Exception as e: 159 | return json.dumps({"status": "error", "message": str(e)}, indent=2) 160 | 161 | @mcp_instance.tool() 162 | def list_buckets(project_id: str = None, prefix: str = "") -> str: 163 | """ 164 | List Cloud Storage buckets in a project 165 | 166 | Args: 167 | project_id: GCP project ID (defaults to configured project) 168 | prefix: Optional prefix to filter bucket names 169 | """ 170 | try: 171 | # Get client from client_instances 172 | client = client_instances.get_clients().storage 173 | project_id = project_id or client_instances.get_project_id() 174 | 175 | # Log info (similar to ctx.info) 176 | print(f"Listing buckets in project {project_id}...") 177 | 178 | # List buckets with optional prefix filter 179 | if prefix: 180 | buckets = [ 181 | b for b in client.list_buckets() if b.name.startswith(prefix) 182 | ] 183 | else: 184 | buckets = list(client.list_buckets()) 185 | 186 | result = [] 187 | for bucket in buckets: 188 | result.append( 189 | { 190 | "name": bucket.name, 191 | "location": bucket.location, 192 | "storage_class": bucket.storage_class, 193 | "time_created": bucket.time_created.isoformat() 194 | if bucket.time_created 195 | else None, 196 | } 197 | ) 198 | 199 | return json.dumps( 200 | {"status": "success", "buckets": result, "count": len(result)}, indent=2 201 | ) 202 | except Exception as e: 203 | return json.dumps({"status": "error", "message": str(e)}, indent=2) 204 | 205 | @mcp_instance.tool() 206 | def upload_object( 207 | bucket_name: str, 208 | source_file_path: str, 209 | destination_blob_name: Optional[str] = None, 210 | project_id: str = None, 211 | content_type: Optional[str] = None, 212 | metadata: Optional[Dict[str, str]] = None, 213 | ) -> str: 214 | """ 215 | Upload an object to a Cloud Storage bucket 216 | 217 | Args: 218 | bucket_name: Name of the bucket 219 | source_file_path: Local path to the file to upload 220 | destination_blob_name: Name to assign to the blob (defaults to file basename) 221 | project_id: GCP project ID (defaults to configured project) 222 | content_type: Content type of the object (optional) 223 | metadata: Custom metadata dictionary (optional) 224 | """ 225 | try: 226 | # Get client from client_instances 227 | client = client_instances.get_clients().storage 228 | project_id = project_id or client_instances.get_project_id() 229 | 230 | # Check if file exists 231 | if not os.path.exists(source_file_path): 232 | return json.dumps( 233 | { 234 | "status": "error", 235 | "message": f"File not found: {source_file_path}", 236 | }, 237 | indent=2, 238 | ) 239 | 240 | # Get bucket 241 | bucket = client.bucket(bucket_name) 242 | 243 | # Use filename if destination_blob_name not provided 244 | if not destination_blob_name: 245 | destination_blob_name = os.path.basename(source_file_path) 246 | 247 | # Create blob 248 | blob = bucket.blob(destination_blob_name) 249 | 250 | # Set content type if provided 251 | if content_type: 252 | blob.content_type = content_type 253 | 254 | # Set metadata if provided 255 | if metadata: 256 | blob.metadata = metadata 257 | 258 | # Get file size for progress reporting 259 | file_size = os.path.getsize(source_file_path) 260 | print( 261 | f"Uploading {source_file_path} ({file_size} bytes) to gs://{bucket_name}/{destination_blob_name}..." 262 | ) 263 | 264 | # Upload file 265 | blob.upload_from_filename(source_file_path) 266 | 267 | return json.dumps( 268 | { 269 | "status": "success", 270 | "bucket": bucket_name, 271 | "name": blob.name, 272 | "size": blob.size, 273 | "content_type": blob.content_type, 274 | "md5_hash": blob.md5_hash, 275 | "generation": blob.generation, 276 | "public_url": blob.public_url, 277 | "gsutil_uri": f"gs://{bucket_name}/{destination_blob_name}", 278 | }, 279 | indent=2, 280 | ) 281 | except Exception as e: 282 | return json.dumps({"status": "error", "message": str(e)}, indent=2) 283 | 284 | @mcp_instance.tool() 285 | def download_object( 286 | bucket_name: str, 287 | source_blob_name: str, 288 | destination_file_path: str, 289 | project_id: str = None, 290 | ) -> str: 291 | """ 292 | Download an object from a Cloud Storage bucket 293 | 294 | Args: 295 | bucket_name: Name of the bucket 296 | source_blob_name: Name of the blob to download 297 | destination_file_path: Local path where the file should be saved 298 | project_id: GCP project ID (defaults to configured project) 299 | """ 300 | try: 301 | # Get client from client_instances 302 | client = client_instances.get_clients().storage 303 | project_id = project_id or client_instances.get_project_id() 304 | 305 | # Get bucket and blob 306 | bucket = client.bucket(bucket_name) 307 | blob = bucket.blob(source_blob_name) 308 | 309 | # Check if blob exists 310 | if not blob.exists(): 311 | return json.dumps( 312 | { 313 | "status": "error", 314 | "message": f"Object not found: gs://{bucket_name}/{source_blob_name}", 315 | }, 316 | indent=2, 317 | ) 318 | 319 | # Create directory if doesn't exist 320 | os.makedirs( 321 | os.path.dirname(os.path.abspath(destination_file_path)), exist_ok=True 322 | ) 323 | 324 | print( 325 | f"Downloading gs://{bucket_name}/{source_blob_name} to {destination_file_path}..." 326 | ) 327 | 328 | # Download file 329 | blob.download_to_filename(destination_file_path) 330 | 331 | return json.dumps( 332 | { 333 | "status": "success", 334 | "bucket": bucket_name, 335 | "blob_name": source_blob_name, 336 | "size": blob.size, 337 | "content_type": blob.content_type, 338 | "downloaded_to": destination_file_path, 339 | }, 340 | indent=2, 341 | ) 342 | except Exception as e: 343 | return json.dumps({"status": "error", "message": str(e)}, indent=2) 344 | 345 | @mcp_instance.tool() 346 | def delete_object(bucket_name: str, blob_name: str, project_id: str = None) -> str: 347 | """ 348 | Delete an object from a Cloud Storage bucket 349 | 350 | Args: 351 | bucket_name: Name of the bucket 352 | blob_name: Name of the blob to delete 353 | project_id: GCP project ID (defaults to configured project) 354 | """ 355 | try: 356 | # Get client from client_instances 357 | client = client_instances.get_clients().storage 358 | project_id = project_id or client_instances.get_project_id() 359 | 360 | # Get bucket and blob 361 | bucket = client.bucket(bucket_name) 362 | blob = bucket.blob(blob_name) 363 | 364 | print(f"Deleting gs://{bucket_name}/{blob_name}...") 365 | 366 | # Delete the blob 367 | blob.delete() 368 | 369 | return json.dumps( 370 | { 371 | "status": "success", 372 | "message": f"Successfully deleted gs://{bucket_name}/{blob_name}", 373 | }, 374 | indent=2, 375 | ) 376 | except Exception as e: 377 | return json.dumps({"status": "error", "message": str(e)}, indent=2) 378 | 379 | # Prompts 380 | @mcp_instance.prompt() 381 | def create_bucket_prompt(location: str = "us-central1") -> str: 382 | """Prompt for creating a new Cloud Storage bucket""" 383 | return f""" 384 | I need to create a new Cloud Storage bucket in {location}. 385 | 386 | Please help me with: 387 | 1. Understanding storage classes (STANDARD, NEARLINE, COLDLINE, ARCHIVE) 388 | 2. Best practices for bucket naming 389 | 3. When to enable versioning 390 | 4. Recommendations for bucket security settings 391 | 5. Steps to create the bucket 392 | """ 393 | 394 | @mcp_instance.prompt() 395 | def upload_file_prompt() -> str: 396 | """Prompt for help with uploading files to Cloud Storage""" 397 | return """ 398 | I need to upload files to a Cloud Storage bucket. 399 | 400 | Please help me understand: 401 | 1. The best way to organize files in Cloud Storage 402 | 2. When to use folders/prefixes 403 | 3. How to set appropriate permissions on uploaded files 404 | 4. How to make files publicly accessible (if needed) 405 | 5. The steps to perform the upload 406 | """ 407 | ``` -------------------------------------------------------------------------------- /src/gcp-mcp-server/services/artifact_registry.py: -------------------------------------------------------------------------------- ```python 1 | import json 2 | 3 | from services import client_instances 4 | 5 | 6 | def register(mcp_instance): 7 | """Register all resources and tools with the MCP instance.""" 8 | 9 | @mcp_instance.resource( 10 | "gcp://artifactregistry/{project_id}/{location}/repositories" 11 | ) 12 | def list_repositories_resource(project_id: str = None, location: str = None) -> str: 13 | """List all Artifact Registry repositories in a specific location""" 14 | try: 15 | # define the client 16 | client = client_instances.get_clients().artifactregistry 17 | project_id = project_id or client_instances.get_project_id() 18 | location = location or client_instances.get_location() 19 | 20 | # Use parameters directly from URL path 21 | parent = f"projects/{project_id}/locations/{location}" 22 | 23 | repositories = client.list_repositories(parent=parent) 24 | result = [] 25 | for repo in repositories: 26 | result.append( 27 | { 28 | "name": repo.name.split("/")[-1], 29 | "format": repo.format.name 30 | if hasattr(repo.format, "name") 31 | else str(repo.format), 32 | "description": repo.description, 33 | "create_time": repo.create_time.isoformat() 34 | if repo.create_time 35 | else None, 36 | "update_time": repo.update_time.isoformat() 37 | if repo.update_time 38 | else None, 39 | "kms_key_name": repo.kms_key_name, 40 | "labels": dict(repo.labels) if repo.labels else {}, 41 | } 42 | ) 43 | return json.dumps(result, indent=2) 44 | except Exception as e: 45 | return json.dumps({"error": str(e)}, indent=2) 46 | 47 | # Add a tool for creating repositories 48 | @mcp_instance.tool() 49 | def create_artifact_repository( 50 | name: str, format: str, description: str = "" 51 | ) -> str: 52 | """Create a new Artifact Registry repository""" 53 | try: 54 | # define the client 55 | client = client_instances.get_clients().artifactregistry 56 | project_id = client_instances.get_project_id() 57 | location = client_instances.get_location() 58 | 59 | parent = f"projects/{project_id}/locations/{location}" 60 | 61 | # Create repository request 62 | from google.cloud.artifactregistry_v1 import ( 63 | CreateRepositoryRequest, 64 | Repository, 65 | ) 66 | 67 | # Create the repository format enum value 68 | if format.upper() not in ["DOCKER", "MAVEN", "NPM", "PYTHON", "APT", "YUM"]: 69 | return json.dumps( 70 | { 71 | "error": f"Invalid format: {format}. Must be one of: DOCKER, MAVEN, NPM, PYTHON, APT, YUM" 72 | }, 73 | indent=2, 74 | ) 75 | 76 | repo = Repository( 77 | name=f"{parent}/repositories/{name}", 78 | format=getattr(Repository.Format, format.upper()), 79 | description=description, 80 | ) 81 | 82 | request = CreateRepositoryRequest( 83 | parent=parent, repository_id=name, repository=repo 84 | ) 85 | 86 | operation = client.create_repository(request=request) 87 | result = operation.result() # Wait for operation to complete 88 | 89 | return json.dumps( 90 | { 91 | "name": result.name.split("/")[-1], 92 | "format": result.format.name 93 | if hasattr(result.format, "name") 94 | else str(result.format), 95 | "description": result.description, 96 | "create_time": result.create_time.isoformat() 97 | if result.create_time 98 | else None, 99 | "update_time": result.update_time.isoformat() 100 | if result.update_time 101 | else None, 102 | }, 103 | indent=2, 104 | ) 105 | 106 | except Exception as e: 107 | return json.dumps({"error": str(e)}, indent=2) 108 | 109 | 110 | # @mcp_instance.tool() 111 | # def get_artifact_repository( 112 | # project_id: str = None, 113 | # location: str = None, 114 | # repository_id: str = None, 115 | # ) -> str: 116 | # """Get details about a specific Artifact Registry repository""" 117 | # try: 118 | # # Get the client from the context 119 | # clients = ctx.lifespan_context["clients"] 120 | # client = clients.artifactregistry 121 | 122 | # # Use context values if parameters not provided 123 | # project_id = project_id or ctx.lifespan_context["project_id"] 124 | # location = location or ctx.lifespan_context["location"] 125 | 126 | # if not repository_id: 127 | # return json.dumps({"error": "Repository ID is required"}, indent=2) 128 | 129 | # name = f"projects/{project_id}/locations/{location}/repositories/{repository_id}" 130 | 131 | # repo = client.get_repository(name=name) 132 | # result = { 133 | # "name": repo.name.split("/")[-1], 134 | # "format": repo.format.name 135 | # if hasattr(repo.format, "name") 136 | # else str(repo.format), 137 | # "description": repo.description, 138 | # "create_time": repo.create_time.isoformat() 139 | # if repo.create_time 140 | # else None, 141 | # "update_time": repo.update_time.isoformat() 142 | # if repo.update_time 143 | # else None, 144 | # "kms_key_name": repo.kms_key_name, 145 | # "labels": dict(repo.labels) if repo.labels else {}, 146 | # } 147 | # return json.dumps(result, indent=2) 148 | # except Exception as e: 149 | # return json.dumps({"error": str(e)}, indent=2) 150 | 151 | # @mcp_instance.tool() 152 | # def upload_artifact( 153 | # repo_name: str, 154 | # artifact_path: str, 155 | # package: str = "", 156 | # version: str = "", 157 | # project_id: str = None, 158 | # location: str = None, 159 | # ) -> str: 160 | # """ 161 | # Upload an artifact to Artifact Registry 162 | 163 | # Args: 164 | # project_id: GCP project ID 165 | # location: GCP region (e.g., us-central1) 166 | # repo_name: Name of the repository 167 | # artifact_path: Local path to the artifact file 168 | # package: Package name (optional) 169 | # version: Version string (optional) 170 | # """ 171 | # import os 172 | 173 | # try: 174 | # # Get the client from the context 175 | # clients = ctx.lifespan_context["clients"] 176 | # client = clients.artifactregistry 177 | 178 | # # Use context values if parameters not provided 179 | # project_id = project_id or ctx.lifespan_context["project_id"] 180 | # location = location or ctx.lifespan_context["location"] 181 | 182 | # if not os.path.exists(artifact_path): 183 | # return json.dumps( 184 | # {"status": "error", "message": f"File not found: {artifact_path}"} 185 | # ) 186 | 187 | # filename = os.path.basename(artifact_path) 188 | # file_size = os.path.getsize(artifact_path) 189 | 190 | # parent = ( 191 | # f"projects/{project_id}/locations/{location}/repositories/{repo_name}" 192 | # ) 193 | 194 | # with open(artifact_path, "rb") as f: 195 | # file_contents = f.read() 196 | # # Use the standard client for upload 197 | # result = client.upload_artifact( 198 | # parent=parent, contents=file_contents, artifact_path=filename 199 | # ) 200 | 201 | # return json.dumps( 202 | # { 203 | # "status": "success", 204 | # "uri": result.uri if hasattr(result, "uri") else None, 205 | # "message": f"Successfully uploaded {filename}", 206 | # }, 207 | # indent=2, 208 | # ) 209 | # except Exception as e: 210 | # return json.dumps({"status": "error", "message": str(e)}, indent=2) 211 | 212 | # # Prompts 213 | # @mcp_instance.prompt() 214 | # def create_repository_prompt(location: str = "us-central1") -> str: 215 | # """Prompt for creating a new Artifact Registry repository""" 216 | # return f""" 217 | # I need to create a new Artifact Registry repository in {location}. 218 | 219 | # Please help me with: 220 | # 1. What formats are available (Docker, NPM, Maven, etc.) 221 | # 2. Best practices for naming repositories 222 | # 3. Recommendations for labels I should apply 223 | # 4. Steps to create the repository 224 | # """ 225 | 226 | # @mcp_instance.prompt() 227 | # def upload_artifact_prompt(repo_format: str = "DOCKER") -> str: 228 | # """Prompt for help with uploading artifacts""" 229 | # return f""" 230 | # I need to upload a {repo_format.lower()} artifact to my Artifact Registry repository. 231 | 232 | # Please help me understand: 233 | # 1. The recommended way to upload {repo_format.lower()} artifacts 234 | # 2. Any naming conventions I should follow 235 | # 3. How to ensure proper versioning 236 | # 4. The steps to perform the upload 237 | # """ 238 | 239 | 240 | # import json 241 | # import os 242 | # from typing import Optional 243 | 244 | # from google.cloud import artifactregistry_v1 245 | # from mcp.server.fastmcp import Context 246 | 247 | # # Instead of importing from core.server 248 | 249 | 250 | # # Resources 251 | # @mcp.resource("gcp://artifactregistry/{project_id}/{location}/repositories") 252 | # def list_repositories_resource( 253 | # project_id: str = None, location: str = None, ctx: Context = None 254 | # ) -> str: 255 | # """List all Artifact Registry repositories in a specific location""" 256 | # client = ctx.clients.artifactregistry 257 | # project_id = project_id or ctx.clients.project_id 258 | # location = location or ctx.clients.location 259 | 260 | # parent = f"projects/{project_id}/locations/{location}" 261 | # repositories = client.list_repositories(parent=parent) 262 | 263 | # result = [] 264 | # for repo in repositories: 265 | # result.append( 266 | # { 267 | # "name": repo.name.split("/")[-1], 268 | # "format": repo.format.name, 269 | # "description": repo.description, 270 | # "create_time": repo.create_time.isoformat() 271 | # if repo.create_time 272 | # else None, 273 | # "update_time": repo.update_time.isoformat() 274 | # if repo.update_time 275 | # else None, 276 | # "kms_key_name": repo.kms_key_name, 277 | # "labels": dict(repo.labels) if repo.labels else {}, 278 | # } 279 | # ) 280 | 281 | # return json.dumps(result, indent=2) 282 | 283 | 284 | # @mcp.resource("gcp://artifactregistry/{project_id}/{location}/repository/{repo_name}") 285 | # def get_repository_resource( 286 | # repo_name: str, project_id: str = None, location: str = None, ctx: Context = None 287 | # ) -> str: 288 | # """Get details for a specific Artifact Registry repository""" 289 | # client = ctx.clients.artifactregistry 290 | # project_id = project_id or ctx.clients.project_id 291 | # location = location or ctx.clients.location 292 | 293 | # name = f"projects/{project_id}/locations/{location}/repositories/{repo_name}" 294 | 295 | # try: 296 | # repo = client.get_repository(name=name) 297 | # result = { 298 | # "name": repo.name.split("/")[-1], 299 | # "format": repo.format.name, 300 | # "description": repo.description, 301 | # "create_time": repo.create_time.isoformat() if repo.create_time else None, 302 | # "update_time": repo.update_time.isoformat() if repo.update_time else None, 303 | # "kms_key_name": repo.kms_key_name, 304 | # "labels": dict(repo.labels) if repo.labels else {}, 305 | # } 306 | # return json.dumps(result, indent=2) 307 | # except Exception as e: 308 | # return json.dumps({"error": str(e)}) 309 | 310 | 311 | # @mcp.resource( 312 | # "gcp://artifactregistry/{project_id}/{location}/repository/{repo_name}/packages" 313 | # ) 314 | # def list_packages_resource( 315 | # repo_name: str, project_id: str = None, location: str = None, ctx: Context = None 316 | # ) -> str: 317 | # """List packages in a specific Artifact Registry repository""" 318 | # client = ctx.clients.artifactregistry 319 | # project_id = project_id or ctx.clients.project_id 320 | # location = location or ctx.clients.location 321 | 322 | # parent = f"projects/{project_id}/locations/{location}/repositories/{repo_name}" 323 | # packages = client.list_packages(parent=parent) 324 | 325 | # result = [] 326 | # for package in packages: 327 | # result.append( 328 | # { 329 | # "name": package.name.split("/")[-1], 330 | # "display_name": package.display_name, 331 | # "create_time": package.create_time.isoformat() 332 | # if package.create_time 333 | # else None, 334 | # "update_time": package.update_time.isoformat() 335 | # if package.update_time 336 | # else None, 337 | # } 338 | # ) 339 | 340 | # return json.dumps(result, indent=2) 341 | 342 | 343 | # # Tools 344 | # @mcp.tool() 345 | # async def create_repository( 346 | # repo_name: str, 347 | # format: str = "DOCKER", 348 | # description: str = "", 349 | # labels: Optional[dict] = None, 350 | # project_id: str = None, 351 | # location: str = None, 352 | # ctx: Context = None, 353 | # ) -> str: 354 | # """ 355 | # Create an Artifact Registry repository 356 | 357 | # Args: 358 | # project_id: GCP project ID 359 | # location: GCP region (e.g., us-central1) 360 | # repo_name: Name for the new repository 361 | # format: Repository format (DOCKER, NPM, PYTHON, MAVEN, APT, YUM, GO) 362 | # description: Optional description for the repository 363 | # labels: Optional key-value pairs for repository labels 364 | # """ 365 | # client = ctx.clients.artifactregistry 366 | # project_id = project_id or ctx.clients.project_id 367 | # location = location or ctx.clients.location 368 | 369 | # # Validate format 370 | # try: 371 | # format_enum = artifactregistry_v1.Repository.Format[format] 372 | # except KeyError: 373 | # return json.dumps( 374 | # { 375 | # "status": "error", 376 | # "message": f"Invalid format: {format}. Valid formats are: {', '.join(artifactregistry_v1.Repository.Format._member_names_)}", 377 | # } 378 | # ) 379 | 380 | # # Create repository 381 | # parent = f"projects/{project_id}/locations/{location}" 382 | # repository = artifactregistry_v1.Repository( 383 | # format=format_enum, 384 | # description=description, 385 | # ) 386 | 387 | # if labels: 388 | # repository.labels = labels 389 | 390 | # request = artifactregistry_v1.CreateRepositoryRequest( 391 | # parent=parent, repository_id=repo_name, repository=repository 392 | # ) 393 | 394 | # try: 395 | # ctx.info(f"Creating repository {repo_name} in {location}...") 396 | # response = client.create_repository(request=request) 397 | 398 | # return json.dumps( 399 | # { 400 | # "status": "success", 401 | # "name": response.name, 402 | # "format": response.format.name, 403 | # "description": response.description, 404 | # "create_time": response.create_time.isoformat() 405 | # if response.create_time 406 | # else None, 407 | # }, 408 | # indent=2, 409 | # ) 410 | # except Exception as e: 411 | # return json.dumps({"status": "error", "message": str(e)}) 412 | 413 | 414 | # @mcp.tool() 415 | # async def list_repositories( 416 | # project_id: str = None, location: str = None, ctx: Context = None 417 | # ) -> str: 418 | # """ 419 | # List Artifact Registry repositories in a specific location 420 | 421 | # Args: 422 | # project_id: GCP project ID 423 | # location: GCP region (e.g., us-central1) 424 | # """ 425 | # client = ctx.clients.artifactregistry 426 | # project_id = project_id or ctx.clients.project_id 427 | # location = location or ctx.clients.location 428 | 429 | # parent = f"projects/{project_id}/locations/{location}" 430 | 431 | # try: 432 | # ctx.info(f"Listing repositories in {location}...") 433 | # repositories = client.list_repositories(parent=parent) 434 | 435 | # result = [] 436 | # for repo in repositories: 437 | # result.append( 438 | # { 439 | # "name": repo.name.split("/")[-1], 440 | # "format": repo.format.name, 441 | # "description": repo.description, 442 | # "create_time": repo.create_time.isoformat() 443 | # if repo.create_time 444 | # else None, 445 | # } 446 | # ) 447 | 448 | # return json.dumps( 449 | # {"status": "success", "repositories": result, "count": len(result)}, 450 | # indent=2, 451 | # ) 452 | # except Exception as e: 453 | # return json.dumps({"status": "error", "message": str(e)}) 454 | ``` -------------------------------------------------------------------------------- /src/gcp-mcp-server/services/cloud_bigquery.py: -------------------------------------------------------------------------------- ```python 1 | import json 2 | from typing import Any, Dict, List, Optional 3 | 4 | from google.cloud import bigquery 5 | from google.cloud.exceptions import NotFound 6 | from services import client_instances 7 | 8 | 9 | def register(mcp_instance): 10 | """Register all BigQuery resources and tools with the MCP instance.""" 11 | 12 | # Resources 13 | @mcp_instance.resource("gcp://bigquery/{project_id}/datasets") 14 | def list_datasets_resource(project_id: str = None) -> str: 15 | """List all BigQuery datasets in a project""" 16 | try: 17 | # Get client from client_instances 18 | client = client_instances.get_clients().bigquery 19 | project_id = project_id or client_instances.get_project_id() 20 | 21 | datasets = list(client.list_datasets()) 22 | 23 | result = [] 24 | for dataset in datasets: 25 | result.append( 26 | { 27 | "id": dataset.dataset_id, 28 | "full_id": dataset.full_dataset_id, 29 | "friendly_name": dataset.friendly_name, 30 | "location": dataset.location, 31 | "labels": dict(dataset.labels) if dataset.labels else {}, 32 | "created": dataset.created.isoformat() 33 | if dataset.created 34 | else None, 35 | } 36 | ) 37 | 38 | return json.dumps(result, indent=2) 39 | except Exception as e: 40 | return json.dumps({"error": str(e)}, indent=2) 41 | 42 | @mcp_instance.resource("gcp://bigquery/{project_id}/dataset/{dataset_id}") 43 | def get_dataset_resource(project_id: str = None, dataset_id: str = None) -> str: 44 | """Get details for a specific BigQuery dataset""" 45 | try: 46 | # Get client from client_instances 47 | client = client_instances.get_clients().bigquery 48 | project_id = project_id or client_instances.get_project_id() 49 | 50 | dataset_ref = client.dataset(dataset_id) 51 | dataset = client.get_dataset(dataset_ref) 52 | 53 | result = { 54 | "id": dataset.dataset_id, 55 | "full_id": dataset.full_dataset_id, 56 | "friendly_name": dataset.friendly_name, 57 | "description": dataset.description, 58 | "location": dataset.location, 59 | "labels": dict(dataset.labels) if dataset.labels else {}, 60 | "created": dataset.created.isoformat() if dataset.created else None, 61 | "modified": dataset.modified.isoformat() if dataset.modified else None, 62 | "default_table_expiration_ms": dataset.default_table_expiration_ms, 63 | "default_partition_expiration_ms": dataset.default_partition_expiration_ms, 64 | } 65 | return json.dumps(result, indent=2) 66 | except NotFound: 67 | return json.dumps({"error": f"Dataset {dataset_id} not found"}, indent=2) 68 | except Exception as e: 69 | return json.dumps({"error": str(e)}, indent=2) 70 | 71 | @mcp_instance.resource("gcp://bigquery/{project_id}/dataset/{dataset_id}/tables") 72 | def list_tables_resource(project_id: str = None, dataset_id: str = None) -> str: 73 | """List all tables in a BigQuery dataset""" 74 | try: 75 | # Get client from client_instances 76 | client = client_instances.get_clients().bigquery 77 | project_id = project_id or client_instances.get_project_id() 78 | 79 | tables = list(client.list_tables(dataset_id)) 80 | 81 | result = [] 82 | for table in tables: 83 | result.append( 84 | { 85 | "id": table.table_id, 86 | "full_id": f"{table.project}.{table.dataset_id}.{table.table_id}", 87 | "table_type": table.table_type, 88 | } 89 | ) 90 | 91 | return json.dumps(result, indent=2) 92 | except NotFound: 93 | return json.dumps({"error": f"Dataset {dataset_id} not found"}, indent=2) 94 | except Exception as e: 95 | return json.dumps({"error": str(e)}, indent=2) 96 | 97 | @mcp_instance.resource( 98 | "gcp://bigquery/{project_id}/dataset/{dataset_id}/table/{table_id}" 99 | ) 100 | def get_table_resource( 101 | project_id: str = None, dataset_id: str = None, table_id: str = None 102 | ) -> str: 103 | """Get details for a specific BigQuery table""" 104 | try: 105 | # Get client from client_instances 106 | client = client_instances.get_clients().bigquery 107 | project_id = project_id or client_instances.get_project_id() 108 | 109 | table_ref = client.dataset(dataset_id).table(table_id) 110 | table = client.get_table(table_ref) 111 | 112 | # Extract schema information 113 | schema_fields = [] 114 | for field in table.schema: 115 | schema_fields.append( 116 | { 117 | "name": field.name, 118 | "type": field.field_type, 119 | "mode": field.mode, 120 | "description": field.description, 121 | } 122 | ) 123 | 124 | result = { 125 | "id": table.table_id, 126 | "full_id": f"{table.project}.{table.dataset_id}.{table.table_id}", 127 | "friendly_name": table.friendly_name, 128 | "description": table.description, 129 | "num_rows": table.num_rows, 130 | "num_bytes": table.num_bytes, 131 | "table_type": table.table_type, 132 | "created": table.created.isoformat() if table.created else None, 133 | "modified": table.modified.isoformat() if table.modified else None, 134 | "expires": table.expires.isoformat() if table.expires else None, 135 | "schema": schema_fields, 136 | "labels": dict(table.labels) if table.labels else {}, 137 | } 138 | return json.dumps(result, indent=2) 139 | except NotFound: 140 | return json.dumps( 141 | {"error": f"Table {dataset_id}.{table_id} not found"}, indent=2 142 | ) 143 | except Exception as e: 144 | return json.dumps({"error": str(e)}, indent=2) 145 | 146 | # Tools 147 | @mcp_instance.tool() 148 | def run_query( 149 | query: str, 150 | project_id: str = None, 151 | location: str = None, 152 | max_results: int = 100, 153 | use_legacy_sql: bool = False, 154 | timeout_ms: int = 30000, 155 | ) -> str: 156 | """ 157 | Run a BigQuery query and return the results 158 | 159 | Args: 160 | query: SQL query to execute 161 | project_id: GCP project ID (defaults to configured project) 162 | location: Optional BigQuery location (us, eu, etc.) 163 | max_results: Maximum number of rows to return 164 | use_legacy_sql: Whether to use legacy SQL syntax 165 | timeout_ms: Query timeout in milliseconds 166 | """ 167 | try: 168 | # Get client from client_instances 169 | client = client_instances.get_clients().bigquery 170 | project_id = project_id or client_instances.get_project_id() 171 | location = location or client_instances.get_location() 172 | 173 | job_config = bigquery.QueryJobConfig( 174 | use_legacy_sql=use_legacy_sql, 175 | ) 176 | 177 | # Log info (similar to ctx.info) 178 | print(f"Running query: {query[:100]}...") 179 | 180 | query_job = client.query( 181 | query, 182 | job_config=job_config, 183 | location=location, 184 | timeout=timeout_ms / 1000.0, 185 | ) 186 | 187 | # Wait for the query to complete 188 | results = query_job.result(max_results=max_results) 189 | 190 | # Get the schema 191 | schema = [field.name for field in results.schema] 192 | 193 | # Convert rows to a list of dictionaries 194 | rows = [] 195 | for row in results: 196 | row_dict = {} 197 | for key in schema: 198 | value = row[key] 199 | if hasattr(value, "isoformat"): # Handle datetime objects 200 | row_dict[key] = value.isoformat() 201 | else: 202 | row_dict[key] = value 203 | rows.append(row_dict) 204 | 205 | # Create summary statistics 206 | stats = { 207 | "total_rows": query_job.total_rows, 208 | "total_bytes_processed": query_job.total_bytes_processed, 209 | "total_bytes_billed": query_job.total_bytes_billed, 210 | "billing_tier": query_job.billing_tier, 211 | "created": query_job.created.isoformat() if query_job.created else None, 212 | "started": query_job.started.isoformat() if query_job.started else None, 213 | "ended": query_job.ended.isoformat() if query_job.ended else None, 214 | "duration_ms": (query_job.ended - query_job.started).total_seconds() 215 | * 1000 216 | if query_job.started and query_job.ended 217 | else None, 218 | } 219 | 220 | return json.dumps( 221 | { 222 | "status": "success", 223 | "schema": schema, 224 | "rows": rows, 225 | "returned_rows": len(rows), 226 | "stats": stats, 227 | }, 228 | indent=2, 229 | ) 230 | except Exception as e: 231 | return json.dumps({"status": "error", "message": str(e)}, indent=2) 232 | 233 | @mcp_instance.tool() 234 | def create_dataset( 235 | dataset_id: str, 236 | project_id: str = None, 237 | location: str = None, 238 | description: str = "", 239 | friendly_name: str = None, 240 | labels: Optional[Dict[str, str]] = None, 241 | default_table_expiration_ms: Optional[int] = None, 242 | ) -> str: 243 | """ 244 | Create a new BigQuery dataset 245 | 246 | Args: 247 | dataset_id: ID for the new dataset 248 | project_id: GCP project ID (defaults to configured project) 249 | location: Dataset location (US, EU, asia-northeast1, etc.) 250 | description: Optional dataset description 251 | friendly_name: Optional user-friendly name 252 | labels: Optional key-value pairs for dataset labels 253 | default_table_expiration_ms: Default expiration time for tables in milliseconds 254 | """ 255 | try: 256 | # Get client from client_instances 257 | client = client_instances.get_clients().bigquery 258 | project_id = project_id or client_instances.get_project_id() 259 | location = location or client_instances.get_location() 260 | 261 | dataset = bigquery.Dataset(f"{project_id}.{dataset_id}") 262 | dataset.location = location 263 | 264 | if description: 265 | dataset.description = description 266 | if friendly_name: 267 | dataset.friendly_name = friendly_name 268 | if labels: 269 | dataset.labels = labels 270 | if default_table_expiration_ms: 271 | dataset.default_table_expiration_ms = default_table_expiration_ms 272 | 273 | # Log info (similar to ctx.info) 274 | print(f"Creating dataset {dataset_id} in {location}...") 275 | dataset = client.create_dataset(dataset) 276 | 277 | return json.dumps( 278 | { 279 | "status": "success", 280 | "dataset_id": dataset.dataset_id, 281 | "full_dataset_id": dataset.full_dataset_id, 282 | "location": dataset.location, 283 | "created": dataset.created.isoformat() if dataset.created else None, 284 | }, 285 | indent=2, 286 | ) 287 | except Exception as e: 288 | return json.dumps({"status": "error", "message": str(e)}, indent=2) 289 | 290 | @mcp_instance.tool() 291 | def create_table( 292 | dataset_id: str, 293 | table_id: str, 294 | schema_fields: List[Dict[str, Any]], 295 | project_id: str = None, 296 | description: str = "", 297 | friendly_name: str = None, 298 | expiration_ms: Optional[int] = None, 299 | labels: Optional[Dict[str, str]] = None, 300 | clustering_fields: Optional[List[str]] = None, 301 | time_partitioning_field: Optional[str] = None, 302 | time_partitioning_type: str = "DAY", 303 | ) -> str: 304 | """ 305 | Create a new BigQuery table 306 | 307 | Args: 308 | dataset_id: Dataset ID where the table will be created 309 | table_id: ID for the new table 310 | schema_fields: List of field definitions, each with name, type, mode, and description 311 | project_id: GCP project ID (defaults to configured project) 312 | description: Optional table description 313 | friendly_name: Optional user-friendly name 314 | expiration_ms: Optional table expiration time in milliseconds 315 | labels: Optional key-value pairs for table labels 316 | clustering_fields: Optional list of fields to cluster by 317 | time_partitioning_field: Optional field to use for time-based partitioning 318 | time_partitioning_type: Partitioning type (DAY, HOUR, MONTH, YEAR) 319 | 320 | Example schema_fields: 321 | [ 322 | {"name": "name", "type": "STRING", "mode": "REQUIRED", "description": "Name field"}, 323 | {"name": "age", "type": "INTEGER", "mode": "NULLABLE", "description": "Age field"} 324 | ] 325 | """ 326 | try: 327 | # Get client from client_instances 328 | client = client_instances.get_clients().bigquery 329 | project_id = project_id or client_instances.get_project_id() 330 | 331 | # Convert schema_fields to SchemaField objects 332 | schema = [] 333 | for field in schema_fields: 334 | schema.append( 335 | bigquery.SchemaField( 336 | name=field["name"], 337 | field_type=field["type"], 338 | mode=field.get("mode", "NULLABLE"), 339 | description=field.get("description", ""), 340 | ) 341 | ) 342 | 343 | # Create table reference 344 | table_ref = client.dataset(dataset_id).table(table_id) 345 | table = bigquery.Table(table_ref, schema=schema) 346 | 347 | # Set table properties 348 | if description: 349 | table.description = description 350 | if friendly_name: 351 | table.friendly_name = friendly_name 352 | if expiration_ms: 353 | table.expires = expiration_ms 354 | if labels: 355 | table.labels = labels 356 | 357 | # Set clustering if specified 358 | if clustering_fields: 359 | table.clustering_fields = clustering_fields 360 | 361 | # Set time partitioning if specified 362 | if time_partitioning_field: 363 | if time_partitioning_type == "DAY": 364 | table.time_partitioning = bigquery.TimePartitioning( 365 | type_=bigquery.TimePartitioningType.DAY, 366 | field=time_partitioning_field, 367 | ) 368 | elif time_partitioning_type == "HOUR": 369 | table.time_partitioning = bigquery.TimePartitioning( 370 | type_=bigquery.TimePartitioningType.HOUR, 371 | field=time_partitioning_field, 372 | ) 373 | elif time_partitioning_type == "MONTH": 374 | table.time_partitioning = bigquery.TimePartitioning( 375 | type_=bigquery.TimePartitioningType.MONTH, 376 | field=time_partitioning_field, 377 | ) 378 | elif time_partitioning_type == "YEAR": 379 | table.time_partitioning = bigquery.TimePartitioning( 380 | type_=bigquery.TimePartitioningType.YEAR, 381 | field=time_partitioning_field, 382 | ) 383 | 384 | # Log info (similar to ctx.info) 385 | print(f"Creating table {dataset_id}.{table_id}...") 386 | table = client.create_table(table) 387 | 388 | return json.dumps( 389 | { 390 | "status": "success", 391 | "table_id": table.table_id, 392 | "full_table_id": f"{table.project}.{table.dataset_id}.{table.table_id}", 393 | "created": table.created.isoformat() if table.created else None, 394 | }, 395 | indent=2, 396 | ) 397 | except Exception as e: 398 | return json.dumps({"status": "error", "message": str(e)}, indent=2) 399 | 400 | @mcp_instance.tool() 401 | def delete_table(dataset_id: str, table_id: str, project_id: str = None) -> str: 402 | """ 403 | Delete a BigQuery table 404 | 405 | Args: 406 | dataset_id: Dataset ID containing the table 407 | table_id: ID of the table to delete 408 | project_id: GCP project ID (defaults to configured project) 409 | """ 410 | try: 411 | # Get client from client_instances 412 | client = client_instances.get_clients().bigquery 413 | project_id = project_id or client_instances.get_project_id() 414 | 415 | table_ref = client.dataset(dataset_id).table(table_id) 416 | 417 | # Log info (similar to ctx.info) 418 | print(f"Deleting table {dataset_id}.{table_id}...") 419 | client.delete_table(table_ref) 420 | 421 | return json.dumps( 422 | { 423 | "status": "success", 424 | "message": f"Table {project_id}.{dataset_id}.{table_id} successfully deleted", 425 | }, 426 | indent=2, 427 | ) 428 | except NotFound: 429 | return json.dumps( 430 | { 431 | "status": "error", 432 | "message": f"Table {project_id}.{dataset_id}.{table_id} not found", 433 | }, 434 | indent=2, 435 | ) 436 | except Exception as e: 437 | return json.dumps({"status": "error", "message": str(e)}, indent=2) 438 | 439 | @mcp_instance.tool() 440 | def load_table_from_json( 441 | dataset_id: str, 442 | table_id: str, 443 | json_data: List[Dict[str, Any]], 444 | project_id: str = None, 445 | schema_fields: Optional[List[Dict[str, Any]]] = None, 446 | write_disposition: str = "WRITE_APPEND", 447 | ) -> str: 448 | """ 449 | Load data into a BigQuery table from JSON data 450 | 451 | Args: 452 | dataset_id: Dataset ID containing the table 453 | table_id: ID of the table to load data into 454 | json_data: List of dictionaries representing rows to insert 455 | project_id: GCP project ID (defaults to configured project) 456 | schema_fields: Optional schema definition (if not using existing table schema) 457 | write_disposition: How to handle existing data (WRITE_TRUNCATE, WRITE_APPEND, WRITE_EMPTY) 458 | """ 459 | try: 460 | # Get client from client_instances 461 | client = client_instances.get_clients().bigquery 462 | project_id = project_id or client_instances.get_project_id() 463 | 464 | table_ref = client.dataset(dataset_id).table(table_id) 465 | 466 | # Convert write_disposition to the appropriate enum 467 | if write_disposition == "WRITE_TRUNCATE": 468 | disposition = bigquery.WriteDisposition.WRITE_TRUNCATE 469 | elif write_disposition == "WRITE_APPEND": 470 | disposition = bigquery.WriteDisposition.WRITE_APPEND 471 | elif write_disposition == "WRITE_EMPTY": 472 | disposition = bigquery.WriteDisposition.WRITE_EMPTY 473 | else: 474 | return json.dumps( 475 | { 476 | "status": "error", 477 | "message": f"Invalid write_disposition: {write_disposition}. Use WRITE_TRUNCATE, WRITE_APPEND, or WRITE_EMPTY.", 478 | }, 479 | indent=2, 480 | ) 481 | 482 | # Create job config 483 | job_config = bigquery.LoadJobConfig( 484 | write_disposition=disposition, 485 | source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON, 486 | ) 487 | 488 | # Set schema if provided 489 | if schema_fields: 490 | schema = [] 491 | for field in schema_fields: 492 | schema.append( 493 | bigquery.SchemaField( 494 | name=field["name"], 495 | field_type=field["type"], 496 | mode=field.get("mode", "NULLABLE"), 497 | description=field.get("description", ""), 498 | ) 499 | ) 500 | job_config.schema = schema 501 | 502 | # Convert JSON data to newline-delimited JSON (not needed but keeping the log) 503 | print(f"Loading {len(json_data)} rows into {dataset_id}.{table_id}...") 504 | 505 | # Create and run the load job 506 | load_job = client.load_table_from_json( 507 | json_data, table_ref, job_config=job_config 508 | ) 509 | 510 | # Wait for the job to complete 511 | load_job.result() 512 | 513 | # Get updated table info 514 | table = client.get_table(table_ref) 515 | 516 | return json.dumps( 517 | { 518 | "status": "success", 519 | "rows_loaded": len(json_data), 520 | "total_rows": table.num_rows, 521 | "message": f"Successfully loaded data into {project_id}.{dataset_id}.{table_id}", 522 | }, 523 | indent=2, 524 | ) 525 | except Exception as e: 526 | return json.dumps({"status": "error", "message": str(e)}, indent=2) 527 | 528 | @mcp_instance.tool() 529 | def export_table_to_csv( 530 | dataset_id: str, 531 | table_id: str, 532 | destination_uri: str, 533 | project_id: str = None, 534 | print_header: bool = True, 535 | field_delimiter: str = ",", 536 | ) -> str: 537 | """ 538 | Export a BigQuery table to Cloud Storage as CSV 539 | 540 | Args: 541 | dataset_id: Dataset ID containing the table 542 | table_id: ID of the table to export 543 | destination_uri: GCS URI (gs://bucket/path) 544 | project_id: GCP project ID (defaults to configured project) 545 | print_header: Whether to include column headers 546 | field_delimiter: Delimiter character for fields 547 | """ 548 | try: 549 | # Get client from client_instances 550 | client = client_instances.get_clients().bigquery 551 | project_id = project_id or client_instances.get_project_id() 552 | 553 | table_ref = client.dataset(dataset_id).table(table_id) 554 | 555 | # Validate destination URI 556 | if not destination_uri.startswith("gs://"): 557 | return json.dumps( 558 | { 559 | "status": "error", 560 | "message": "destination_uri must start with gs://", 561 | }, 562 | indent=2, 563 | ) 564 | 565 | job_config = bigquery.ExtractJobConfig() 566 | job_config.destination_format = bigquery.DestinationFormat.CSV 567 | job_config.print_header = print_header 568 | job_config.field_delimiter = field_delimiter 569 | 570 | # Log info (similar to ctx.info) 571 | print(f"Exporting {dataset_id}.{table_id} to {destination_uri}...") 572 | 573 | # Create and run the extract job 574 | extract_job = client.extract_table( 575 | table_ref, destination_uri, job_config=job_config 576 | ) 577 | 578 | # Wait for the job to complete 579 | extract_job.result() 580 | 581 | return json.dumps( 582 | { 583 | "status": "success", 584 | "destination": destination_uri, 585 | "message": f"Successfully exported {project_id}.{dataset_id}.{table_id} to {destination_uri}", 586 | }, 587 | indent=2, 588 | ) 589 | except Exception as e: 590 | return json.dumps({"status": "error", "message": str(e)}, indent=2) 591 | 592 | # Prompts 593 | @mcp_instance.prompt() 594 | def create_dataset_prompt() -> str: 595 | """Prompt for creating a new BigQuery dataset""" 596 | return """ 597 | I need to create a new BigQuery dataset. 598 | 599 | Please help me with: 600 | 1. Choosing an appropriate location for my dataset 601 | 2. Understanding dataset naming conventions 602 | 3. Best practices for dataset configuration (expiration, labels, etc.) 603 | 4. The process to create the dataset 604 | """ 605 | 606 | @mcp_instance.prompt() 607 | def query_optimization_prompt() -> str: 608 | """Prompt for BigQuery query optimization help""" 609 | return """ 610 | I have a BigQuery query that's slow or expensive to run. 611 | 612 | Please help me optimize it by: 613 | 1. Analyzing key factors that affect BigQuery performance 614 | 2. Identifying common patterns that lead to inefficient queries 615 | 3. Suggesting specific optimization techniques 616 | 4. Helping me understand how to use EXPLAIN plan analysis 617 | """ 618 | ``` -------------------------------------------------------------------------------- /src/gcp-mcp-server/services/cloud_run.py: -------------------------------------------------------------------------------- ```python 1 | import json 2 | from typing import Dict, Optional 3 | 4 | from google.cloud import run_v2 5 | from google.protobuf import field_mask_pb2 6 | from services import client_instances 7 | 8 | 9 | def register(mcp_instance): 10 | """Register all Cloud Run resources and tools with the MCP instance.""" 11 | 12 | # Resources 13 | @mcp_instance.resource("gcp://cloudrun/{project_id}/{region}/services") 14 | def list_services_resource(project_id: str = None, region: str = None) -> str: 15 | """List all Cloud Run services in a specific region""" 16 | try: 17 | # Get client from client_instances 18 | client = client_instances.get_clients().run 19 | project_id = project_id or client_instances.get_project_id() 20 | region = region or client_instances.get_location() 21 | 22 | parent = f"projects/{project_id}/locations/{region}" 23 | services = client.list_services(parent=parent) 24 | 25 | result = [] 26 | for service in services: 27 | result.append( 28 | { 29 | "name": service.name.split("/")[-1], 30 | "uid": service.uid, 31 | "generation": service.generation, 32 | "labels": dict(service.labels) if service.labels else {}, 33 | "annotations": dict(service.annotations) 34 | if service.annotations 35 | else {}, 36 | "create_time": service.create_time.isoformat() 37 | if service.create_time 38 | else None, 39 | "update_time": service.update_time.isoformat() 40 | if service.update_time 41 | else None, 42 | "uri": service.uri, 43 | } 44 | ) 45 | 46 | return json.dumps(result, indent=2) 47 | except Exception as e: 48 | return json.dumps({"error": str(e)}, indent=2) 49 | 50 | @mcp_instance.resource("gcp://run/{project_id}/{location}/service/{service_name}") 51 | def get_service_resource( 52 | service_name: str, project_id: str = None, location: str = None 53 | ) -> str: 54 | """Get details for a specific Cloud Run service""" 55 | try: 56 | # Get client from client_instances 57 | client = client_instances.get_clients().run 58 | project_id = project_id or client_instances.get_project_id() 59 | location = location or client_instances.get_location() 60 | 61 | name = f"projects/{project_id}/locations/{location}/services/{service_name}" 62 | service = client.get_service(name=name) 63 | 64 | # Extract container details 65 | containers = [] 66 | if service.template and service.template.containers: 67 | for container in service.template.containers: 68 | container_info = { 69 | "image": container.image, 70 | "command": list(container.command) if container.command else [], 71 | "args": list(container.args) if container.args else [], 72 | "env": [ 73 | {"name": env.name, "value": env.value} 74 | for env in container.env 75 | ] 76 | if container.env 77 | else [], 78 | "resources": { 79 | "limits": dict(container.resources.limits) 80 | if container.resources and container.resources.limits 81 | else {}, 82 | "cpu_idle": container.resources.cpu_idle 83 | if container.resources 84 | else None, 85 | } 86 | if container.resources 87 | else {}, 88 | "ports": [ 89 | {"name": port.name, "container_port": port.container_port} 90 | for port in container.ports 91 | ] 92 | if container.ports 93 | else [], 94 | } 95 | containers.append(container_info) 96 | 97 | # Extract scaling details 98 | scaling = None 99 | if service.template and service.template.scaling: 100 | scaling = { 101 | "min_instance_count": service.template.scaling.min_instance_count, 102 | "max_instance_count": service.template.scaling.max_instance_count, 103 | } 104 | 105 | result = { 106 | "name": service.name.split("/")[-1], 107 | "uid": service.uid, 108 | "generation": service.generation, 109 | "labels": dict(service.labels) if service.labels else {}, 110 | "annotations": dict(service.annotations) if service.annotations else {}, 111 | "create_time": service.create_time.isoformat() 112 | if service.create_time 113 | else None, 114 | "update_time": service.update_time.isoformat() 115 | if service.update_time 116 | else None, 117 | "creator": service.creator, 118 | "last_modifier": service.last_modifier, 119 | "client": service.client, 120 | "client_version": service.client_version, 121 | "ingress": service.ingress.name if service.ingress else None, 122 | "launch_stage": service.launch_stage.name 123 | if service.launch_stage 124 | else None, 125 | "traffic": [ 126 | { 127 | "type": traffic.type_.name if traffic.type_ else None, 128 | "revision": traffic.revision.split("/")[-1] 129 | if traffic.revision 130 | else None, 131 | "percent": traffic.percent, 132 | "tag": traffic.tag, 133 | } 134 | for traffic in service.traffic 135 | ] 136 | if service.traffic 137 | else [], 138 | "uri": service.uri, 139 | "template": { 140 | "containers": containers, 141 | "execution_environment": service.template.execution_environment.name 142 | if service.template and service.template.execution_environment 143 | else None, 144 | "vpc_access": { 145 | "connector": service.template.vpc_access.connector, 146 | "egress": service.template.vpc_access.egress.name 147 | if service.template.vpc_access.egress 148 | else None, 149 | } 150 | if service.template and service.template.vpc_access 151 | else None, 152 | "scaling": scaling, 153 | "timeout": f"{service.template.timeout.seconds}s {service.template.timeout.nanos}ns" 154 | if service.template and service.template.timeout 155 | else None, 156 | "service_account": service.template.service_account 157 | if service.template 158 | else None, 159 | } 160 | if service.template 161 | else {}, 162 | } 163 | 164 | return json.dumps(result, indent=2) 165 | except Exception as e: 166 | return json.dumps({"error": str(e)}, indent=2) 167 | 168 | @mcp_instance.resource( 169 | "gcp://run/{project_id}/{location}/service/{service_name}/revisions" 170 | ) 171 | def list_revisions_resource( 172 | service_name: str, project_id: str = None, location: str = None 173 | ) -> str: 174 | """List revisions for a specific Cloud Run service""" 175 | try: 176 | # Get client from client_instances 177 | client = client_instances.get_clients().run 178 | project_id = project_id or client_instances.get_project_id() 179 | location = location or client_instances.get_location() 180 | 181 | parent = f"projects/{project_id}/locations/{location}" 182 | 183 | # List all revisions 184 | revisions = client.list_revisions(parent=parent) 185 | 186 | # Filter revisions for the specified service 187 | service_revisions = [] 188 | for revision in revisions: 189 | # Check if this revision belongs to the specified service 190 | if service_name in revision.name: 191 | service_revisions.append( 192 | { 193 | "name": revision.name.split("/")[-1], 194 | "uid": revision.uid, 195 | "generation": revision.generation, 196 | "service": revision.service.split("/")[-1] 197 | if revision.service 198 | else None, 199 | "create_time": revision.create_time.isoformat() 200 | if revision.create_time 201 | else None, 202 | "update_time": revision.update_time.isoformat() 203 | if revision.update_time 204 | else None, 205 | "scaling": { 206 | "min_instance_count": revision.scaling.min_instance_count 207 | if revision.scaling 208 | else None, 209 | "max_instance_count": revision.scaling.max_instance_count 210 | if revision.scaling 211 | else None, 212 | } 213 | if revision.scaling 214 | else None, 215 | "conditions": [ 216 | { 217 | "type": condition.type, 218 | "state": condition.state.name 219 | if condition.state 220 | else None, 221 | "message": condition.message, 222 | "last_transition_time": condition.last_transition_time.isoformat() 223 | if condition.last_transition_time 224 | else None, 225 | "severity": condition.severity.name 226 | if condition.severity 227 | else None, 228 | } 229 | for condition in revision.conditions 230 | ] 231 | if revision.conditions 232 | else [], 233 | } 234 | ) 235 | 236 | return json.dumps(service_revisions, indent=2) 237 | except Exception as e: 238 | return json.dumps({"error": str(e)}, indent=2) 239 | 240 | # Tools 241 | @mcp_instance.tool() 242 | def create_service( 243 | service_name: str, 244 | image: str, 245 | project_id: str = None, 246 | location: str = None, 247 | env_vars: Optional[Dict[str, str]] = None, 248 | memory_limit: str = "512Mi", 249 | cpu_limit: str = "1", 250 | min_instances: int = 0, 251 | max_instances: int = 100, 252 | port: int = 8080, 253 | allow_unauthenticated: bool = True, 254 | service_account: str = None, 255 | vpc_connector: str = None, 256 | timeout_seconds: int = 300, 257 | ) -> str: 258 | """ 259 | Create a new Cloud Run service 260 | 261 | Args: 262 | service_name: Name for the new service 263 | image: Container image to deploy (e.g., gcr.io/project/image:tag) 264 | project_id: GCP project ID (defaults to configured project) 265 | location: GCP region (e.g., us-central1) (defaults to configured location) 266 | env_vars: Environment variables as key-value pairs 267 | memory_limit: Memory limit (e.g., 512Mi, 1Gi) 268 | cpu_limit: CPU limit (e.g., 1, 2) 269 | min_instances: Minimum number of instances 270 | max_instances: Maximum number of instances 271 | port: Container port 272 | allow_unauthenticated: Whether to allow unauthenticated access 273 | service_account: Service account email 274 | vpc_connector: VPC connector name 275 | timeout_seconds: Request timeout in seconds 276 | """ 277 | try: 278 | # Get client from client_instances 279 | client = client_instances.get_clients().run 280 | project_id = project_id or client_instances.get_project_id() 281 | location = location or client_instances.get_location() 282 | 283 | print(f"Creating Cloud Run service {service_name} in {location}...") 284 | 285 | # Create container 286 | container = run_v2.Container( 287 | image=image, 288 | resources=run_v2.ResourceRequirements( 289 | limits={ 290 | "memory": memory_limit, 291 | "cpu": cpu_limit, 292 | } 293 | ), 294 | ) 295 | 296 | # Add environment variables if provided 297 | if env_vars: 298 | container.env = [ 299 | run_v2.EnvVar(name=key, value=value) 300 | for key, value in env_vars.items() 301 | ] 302 | 303 | # Add port 304 | container.ports = [run_v2.ContainerPort(container_port=port)] 305 | 306 | # Create template 307 | template = run_v2.RevisionTemplate( 308 | containers=[container], 309 | scaling=run_v2.RevisionScaling( 310 | min_instance_count=min_instances, 311 | max_instance_count=max_instances, 312 | ), 313 | timeout=run_v2.Duration(seconds=timeout_seconds), 314 | ) 315 | 316 | # Add service account if provided 317 | if service_account: 318 | template.service_account = service_account 319 | 320 | # Add VPC connector if provided 321 | if vpc_connector: 322 | template.vpc_access = run_v2.VpcAccess( 323 | connector=vpc_connector, 324 | egress=run_v2.VpcAccess.VpcEgress.ALL_TRAFFIC, 325 | ) 326 | 327 | # Create service 328 | service = run_v2.Service( 329 | template=template, 330 | ingress=run_v2.IngressTraffic.INGRESS_TRAFFIC_ALL 331 | if allow_unauthenticated 332 | else run_v2.IngressTraffic.INGRESS_TRAFFIC_INTERNAL_ONLY, 333 | ) 334 | 335 | # Create the service 336 | parent = f"projects/{project_id}/locations/{location}" 337 | operation = client.create_service( 338 | parent=parent, 339 | service_id=service_name, 340 | service=service, 341 | ) 342 | 343 | # Wait for the operation to complete 344 | print(f"Waiting for service {service_name} to be created...") 345 | result = operation.result() 346 | 347 | return json.dumps( 348 | { 349 | "status": "success", 350 | "name": result.name.split("/")[-1], 351 | "uri": result.uri, 352 | "create_time": result.create_time.isoformat() 353 | if result.create_time 354 | else None, 355 | "update_time": result.update_time.isoformat() 356 | if result.update_time 357 | else None, 358 | }, 359 | indent=2, 360 | ) 361 | except Exception as e: 362 | return json.dumps({"status": "error", "message": str(e)}, indent=2) 363 | 364 | @mcp_instance.tool() 365 | def list_services(project_id: str = None, region: str = None) -> str: 366 | """ 367 | List Cloud Run services in a specific region 368 | 369 | Args: 370 | project_id: GCP project ID (defaults to configured project) 371 | region: GCP region (e.g., us-central1) (defaults to configured location) 372 | """ 373 | try: 374 | # Get client from client_instances 375 | client = client_instances.get_clients().run 376 | project_id = project_id or client_instances.get_project_id() 377 | region = region or client_instances.get_location() 378 | 379 | parent = f"projects/{project_id}/locations/{region}" 380 | 381 | print(f"Listing Cloud Run services in {region}...") 382 | services = client.list_services(parent=parent) 383 | 384 | result = [] 385 | for service in services: 386 | result.append( 387 | { 388 | "name": service.name.split("/")[-1], 389 | "uri": service.uri, 390 | "create_time": service.create_time.isoformat() 391 | if service.create_time 392 | else None, 393 | "update_time": service.update_time.isoformat() 394 | if service.update_time 395 | else None, 396 | "labels": dict(service.labels) if service.labels else {}, 397 | } 398 | ) 399 | 400 | return json.dumps( 401 | {"status": "success", "services": result, "count": len(result)}, 402 | indent=2, 403 | ) 404 | except Exception as e: 405 | return json.dumps({"status": "error", "message": str(e)}, indent=2) 406 | 407 | @mcp_instance.tool() 408 | def update_service( 409 | service_name: str, 410 | project_id: str = None, 411 | region: str = None, 412 | image: Optional[str] = None, 413 | memory: Optional[str] = None, 414 | cpu: Optional[str] = None, 415 | max_instances: Optional[int] = None, 416 | min_instances: Optional[int] = None, 417 | env_vars: Optional[Dict[str, str]] = None, 418 | concurrency: Optional[int] = None, 419 | timeout_seconds: Optional[int] = None, 420 | service_account: Optional[str] = None, 421 | labels: Optional[Dict[str, str]] = None, 422 | ) -> str: 423 | """ 424 | Update an existing Cloud Run service 425 | 426 | Args: 427 | service_name: Name of the service to update 428 | project_id: GCP project ID (defaults to configured project) 429 | region: GCP region (e.g., us-central1) (defaults to configured location) 430 | image: New container image URL (optional) 431 | memory: New memory allocation (optional) 432 | cpu: New CPU allocation (optional) 433 | max_instances: New maximum number of instances (optional) 434 | min_instances: New minimum number of instances (optional) 435 | env_vars: New environment variables (optional) 436 | concurrency: New maximum concurrent requests per instance (optional) 437 | timeout_seconds: New request timeout in seconds (optional) 438 | service_account: New service account email (optional) 439 | labels: New key-value labels to apply to the service (optional) 440 | """ 441 | try: 442 | # Get client from client_instances 443 | client = client_instances.get_clients().run 444 | project_id = project_id or client_instances.get_project_id() 445 | region = region or client_instances.get_location() 446 | 447 | # Get the existing service 448 | name = f"projects/{project_id}/locations/{region}/services/{service_name}" 449 | 450 | print(f"Getting current service configuration for {service_name}...") 451 | service = client.get_service(name=name) 452 | 453 | # Track which fields are being updated 454 | update_mask_fields = [] 455 | 456 | # Update the container image if specified 457 | if image and service.template and service.template.containers: 458 | service.template.containers[0].image = image 459 | update_mask_fields.append("template.containers[0].image") 460 | 461 | # Update resources if specified 462 | if (memory or cpu) and service.template and service.template.containers: 463 | if not service.template.containers[0].resources: 464 | service.template.containers[ 465 | 0 466 | ].resources = run_v2.ResourceRequirements(limits={}) 467 | 468 | if not service.template.containers[0].resources.limits: 469 | service.template.containers[0].resources.limits = {} 470 | 471 | if memory: 472 | service.template.containers[0].resources.limits["memory"] = memory 473 | update_mask_fields.append( 474 | "template.containers[0].resources.limits.memory" 475 | ) 476 | 477 | if cpu: 478 | service.template.containers[0].resources.limits["cpu"] = cpu 479 | update_mask_fields.append( 480 | "template.containers[0].resources.limits.cpu" 481 | ) 482 | 483 | # Update scaling parameters 484 | if max_instances is not None and service.template: 485 | service.template.max_instance_count = max_instances 486 | update_mask_fields.append("template.max_instance_count") 487 | 488 | if min_instances is not None and service.template: 489 | service.template.min_instance_count = min_instances 490 | update_mask_fields.append("template.min_instance_count") 491 | 492 | # Update concurrency 493 | if concurrency is not None and service.template: 494 | service.template.max_instance_request_concurrency = concurrency 495 | update_mask_fields.append("template.max_instance_request_concurrency") 496 | 497 | # Update timeout 498 | if timeout_seconds is not None and service.template: 499 | service.template.timeout = {"seconds": timeout_seconds} 500 | update_mask_fields.append("template.timeout") 501 | 502 | # Update service account 503 | if service_account is not None and service.template: 504 | service.template.service_account = service_account 505 | update_mask_fields.append("template.service_account") 506 | 507 | # Update environment variables 508 | if ( 509 | env_vars is not None 510 | and service.template 511 | and service.template.containers 512 | ): 513 | # Create new env var list 514 | new_env_vars = [ 515 | run_v2.EnvVar(name=key, value=value) 516 | for key, value in env_vars.items() 517 | ] 518 | service.template.containers[0].env = new_env_vars 519 | update_mask_fields.append("template.containers[0].env") 520 | 521 | # Update labels 522 | if labels is not None: 523 | service.labels = labels 524 | update_mask_fields.append("labels") 525 | 526 | # Only proceed if there are fields to update 527 | if not update_mask_fields: 528 | return json.dumps( 529 | {"status": "info", "message": "No updates specified"}, indent=2 530 | ) 531 | 532 | # Create update mask 533 | update_mask = field_mask_pb2.FieldMask(paths=update_mask_fields) 534 | 535 | # Create the request 536 | request = run_v2.UpdateServiceRequest( 537 | service=service, update_mask=update_mask 538 | ) 539 | 540 | print(f"Updating Cloud Run service {service_name}...") 541 | operation = client.update_service(request=request) 542 | 543 | print("Waiting for service update to complete...") 544 | response = operation.result() 545 | 546 | return json.dumps( 547 | { 548 | "status": "success", 549 | "name": response.name, 550 | "uri": response.uri, 551 | "updated_fields": update_mask_fields, 552 | "conditions": [ 553 | { 554 | "type": condition.type_, 555 | "state": condition.state.name 556 | if hasattr(condition.state, "name") 557 | else str(condition.state), 558 | "message": condition.message, 559 | } 560 | for condition in response.conditions 561 | ] 562 | if response.conditions 563 | else [], 564 | }, 565 | indent=2, 566 | ) 567 | except Exception as e: 568 | return json.dumps({"status": "error", "message": str(e)}, indent=2) 569 | 570 | @mcp_instance.tool() 571 | def delete_service( 572 | service_name: str, project_id: str = None, region: str = None 573 | ) -> str: 574 | """ 575 | Delete a Cloud Run service 576 | 577 | Args: 578 | service_name: Name of the service to delete 579 | project_id: GCP project ID (defaults to configured project) 580 | region: GCP region (e.g., us-central1) (defaults to configured location) 581 | """ 582 | try: 583 | # Get client from client_instances 584 | client = client_instances.get_clients().run 585 | project_id = project_id or client_instances.get_project_id() 586 | region = region or client_instances.get_location() 587 | 588 | name = f"projects/{project_id}/locations/{region}/services/{service_name}" 589 | 590 | print(f"Deleting Cloud Run service {service_name} in {region}...") 591 | operation = client.delete_service(name=name) 592 | 593 | print("Waiting for service deletion to complete...") 594 | operation.result() 595 | 596 | return json.dumps( 597 | { 598 | "status": "success", 599 | "message": f"Service {service_name} successfully deleted", 600 | }, 601 | indent=2, 602 | ) 603 | except Exception as e: 604 | return json.dumps({"status": "error", "message": str(e)}, indent=2) 605 | 606 | @mcp_instance.tool() 607 | def get_service( 608 | service_name: str, project_id: str = None, region: str = None 609 | ) -> str: 610 | """ 611 | Get details of a specific Cloud Run service 612 | 613 | Args: 614 | service_name: Name of the service 615 | project_id: GCP project ID (defaults to configured project) 616 | region: GCP region (e.g., us-central1) (defaults to configured location) 617 | """ 618 | try: 619 | # Get client from client_instances 620 | client = client_instances.get_clients().run 621 | project_id = project_id or client_instances.get_project_id() 622 | region = region or client_instances.get_location() 623 | 624 | name = f"projects/{project_id}/locations/{region}/services/{service_name}" 625 | 626 | print(f"Getting details for Cloud Run service {service_name}...") 627 | service = client.get_service(name=name) 628 | 629 | # Parse container details 630 | containers = [] 631 | if service.template and service.template.containers: 632 | for container in service.template.containers: 633 | container_info = { 634 | "image": container.image, 635 | "resources": { 636 | "limits": dict(container.resources.limits) 637 | if container.resources and container.resources.limits 638 | else {} 639 | } 640 | if container.resources 641 | else {}, 642 | "env_vars": {env.name: env.value for env in container.env} 643 | if container.env 644 | else {}, 645 | } 646 | containers.append(container_info) 647 | 648 | # Parse traffic 649 | traffic_result = [] 650 | if service.traffic: 651 | for traffic in service.traffic: 652 | traffic_info = { 653 | "type": traffic.type_.name 654 | if hasattr(traffic.type_, "name") 655 | else str(traffic.type_), 656 | "revision": traffic.revision.split("/")[-1] 657 | if traffic.revision 658 | else None, 659 | "percent": traffic.percent, 660 | "tag": traffic.tag, 661 | } 662 | traffic_result.append(traffic_info) 663 | 664 | return json.dumps( 665 | { 666 | "status": "success", 667 | "name": service.name.split("/")[-1], 668 | "uri": service.uri, 669 | "containers": containers, 670 | "traffic": traffic_result, 671 | "update_time": service.update_time.isoformat() 672 | if service.update_time 673 | else None, 674 | }, 675 | indent=2, 676 | ) 677 | except Exception as e: 678 | return json.dumps({"status": "error", "message": str(e)}, indent=2) 679 | 680 | # Prompts 681 | @mcp_instance.prompt() 682 | def deploy_service_prompt( 683 | service_name: str, 684 | image: str, 685 | min_instances: str = "0", 686 | max_instances: str = "100", 687 | env_vars: str = "{}", 688 | ) -> str: 689 | """Prompt to deploy a Cloud Run service with the given configuration""" 690 | return f""" 691 | I need to deploy a Cloud Run service with the following configuration: 692 | 693 | - Service name: {service_name} 694 | - Container image: {image} 695 | - Min instances: {min_instances} 696 | - Max instances: {max_instances} 697 | - Environment variables: {env_vars} 698 | 699 | Please help me set up this service and explain the deployment process and any best practices I should follow. 700 | """ 701 | 702 | @mcp_instance.prompt() 703 | def check_service_status_prompt(service_name: str) -> str: 704 | """Prompt to check the status of a deployed Cloud Run service""" 705 | return f""" 706 | I need to check the current status of my Cloud Run service named "{service_name}". 707 | 708 | Please provide me with: 709 | 1. Is the service currently running? 710 | 2. What is the URL to access it? 711 | 3. What revision is currently serving traffic? 712 | 4. Are there any issues with the service? 713 | """ 714 | 715 | @mcp_instance.prompt() 716 | def create_service_prompt(region: str = "us-central1") -> str: 717 | """Prompt for creating a new Cloud Run service""" 718 | return f""" 719 | I need to create a new Cloud Run service in {region}. 720 | 721 | Please help me with: 722 | 1. What container image should I use? 723 | 2. How much CPU and memory should I allocate? 724 | 3. Should I set min and max instances for scaling? 725 | 4. Do I need to set any environment variables? 726 | 5. Should I allow unauthenticated access? 727 | 6. What's the best way to deploy my service? 728 | """ 729 | 730 | @mcp_instance.prompt() 731 | def update_service_prompt() -> str: 732 | """Prompt for updating a Cloud Run service""" 733 | return """ 734 | I need to update my Cloud Run service. 735 | 736 | Please help me understand: 737 | 1. How to update the container image 738 | 2. How to change resource allocations 739 | 3. How to add or modify environment variables 740 | 4. How to update scaling settings 741 | 5. Best practices for zero-downtime updates 742 | """ 743 | ```