# Directory Structure ``` ├── .gitignore ├── .python-version ├── Dockerfile ├── LICENSE ├── pyproject.toml ├── README.md ├── requirements.txt ├── smithery.yaml ├── src │ └── gcp-mcp-server │ ├── __init__.py │ ├── core │ │ ├── __init__.py │ │ ├── auth.py │ │ ├── context.py │ │ ├── logging_handler.py │ │ ├── security.py │ │ └── server.py │ ├── main.py │ ├── prompts │ │ ├── __init__.py │ │ └── common.py │ └── services │ ├── __init__.py │ ├── artifact_registry.py │ ├── client_instances.py │ ├── cloud_audit_logs.py │ ├── cloud_bigquery.py │ ├── cloud_build.py │ ├── cloud_compute_engine.py │ ├── cloud_monitoring.py │ ├── cloud_run.py │ ├── cloud_storage.py │ └── README.md ├── utils │ ├── __init__.py │ └── helpers.py └── uv.lock ``` # Files -------------------------------------------------------------------------------- /.python-version: -------------------------------------------------------------------------------- ``` 3.13 ``` -------------------------------------------------------------------------------- /.gitignore: -------------------------------------------------------------------------------- ``` # Python-generated files __pycache__/ *.py[oc] build/ dist/ wheels/ *.egg-info # Virtual environments .venv ``` -------------------------------------------------------------------------------- /src/gcp-mcp-server/services/README.md: -------------------------------------------------------------------------------- ```markdown # GCP Services MCP Implementation This repository contains Model Context Protocol (MCP) implementations for various Google Cloud Platform (GCP) services. These modules expose GCP resources, tools, and prompts in a standardized way for AI assistants to interact with GCP infrastructure. ## Overview The MCP framework provides a consistent way to expose cloud resources to AI assistants. Each service module follows a common pattern: 1. A `register` function that takes an MCP instance as a parameter 2. Resources for retrieving information about GCP resources 3. Tools for performing operations on GCP resources 4. Prompts for guiding users through common tasks ## Implementation Pattern All service modules follow a consistent implementation pattern: ```python import json from services import client_instances from google.cloud import service_client def register(mcp_instance): """Register all resources and tools with the MCP instance.""" # Resources @mcp_instance.resource("gcp://service/{project_id}/resources") def list_resources(project_id: str = None) -> str: # Implement resource listing pass # Tools @mcp_instance.tool() def create_resource(resource_name: str, project_id: str = None) -> str: # Implement resource creation pass # Prompts @mcp_instance.prompt() def configuration_help() -> str: # Return helpful prompt text return """Help text here""" ``` ## Key Design Principles ### 1. Client Management Clients are accessed through a central `client_instances` module: ```python client = client_instances.get_clients().service_name ``` This approach: - Centralizes client creation and authentication - Avoids duplicating client instantiation logic - Enables consistent credential management ### 2. Parameter Defaults Required parameters come first, followed by optional parameters with sensible defaults: ```python def create_resource( resource_name: str, # Required parameter first project_id: str = None, # Optional parameters with defaults location: str = None ) -> str: # Use defaults from client_instances when not provided project_id = project_id or client_instances.get_project_id() location = location or client_instances.get_location() ``` ### 3. Error Handling All functions include consistent error handling: ```python try: # Implementation code return json.dumps(result, indent=2) except Exception as e: return json.dumps({"error": str(e)}, indent=2) ``` ### 4. JSON Responses All responses are consistently formatted JSON strings with proper indentation: ```python return json.dumps( { "status": "success", "resources": result, "count": len(result) }, indent=2 ) ``` ### 5. Defensive Coding All implementations use defensive coding practices to handle potentially missing or null values: ```python "labels": dict(resource.labels) if resource.labels else {} ``` ## Available Services The following GCP services are implemented: 1. **BigQuery** - Data warehouse operations 2. **Cloud Storage** - Object storage operations 3. **Cloud Run** - Serverless container management 4. **Cloud Build** - CI/CD pipeline management 5. **Artifact Registry** - Container and package registry 6. **Compute Engine** - VM instance management 7. **Cloud Monitoring** - Metrics and alerting 8. **Cloud Audit Logs** - Logging and auditing ## Service-Specific Features ### BigQuery - Dataset and table management - Query execution - Data import/export ### Cloud Storage - Bucket management - Object operations (upload, download, delete) - Bucket configuration ### Cloud Run - Service deployment - Service configuration - Traffic management ### Cloud Build - Trigger management - Build execution - Build monitoring ### Artifact Registry - Repository management - Package operations ### Compute Engine - Instance lifecycle management - Instance configuration ### Cloud Monitoring - Metric exploration - Alert policy management - Notification channel configuration ### Cloud Audit Logs - Log querying - Log sink management ## Usage Example To register all services with an MCP instance: ```python from mcp.server.fastmcp import FastMCP from services import ( bigquery, storage, cloudrun, cloudbuild, artifactregistry, compute, monitoring, auditlogs ) # Create MCP instance mcp = FastMCP("GCP Services") # Register all services bigquery.register(mcp) storage.register(mcp) cloudrun.register(mcp) cloudbuild.register(mcp) artifactregistry.register(mcp) compute.register(mcp) monitoring.register(mcp) auditlogs.register(mcp) ``` ## Important Notes 1. **Default Project and Location**: - Services default to the project and location configured in `client_instances` - Always allow these to be overridden by parameters 2. **Authentication**: - Authentication is handled by the `client_instances` module - No credentials should be hardcoded in service implementations 3. **Error Handling**: - All operations should include robust error handling - Error responses should be informative but not expose sensitive information 4. **Response Formatting**: - All responses should be valid JSON with proper indentation - Success responses should include a "status": "success" field - Error responses should include a "status": "error" field with a "message" ## Best Practices for MCP Integration 1. **Resource Naming**: - Use consistent URI patterns for resources (gcp://service/{project_id}/resource) - Group related resources logically 2. **Tool Design**: - Design tools around specific user intents - Order parameters with required ones first, followed by optional ones - Provide sensible defaults for optional parameters 3. **Prompt Design**: - Create prompts for common user scenarios - Include enough context for the AI to provide helpful responses - Structure prompts as guiding questions or templates 4. **Documentation**: - Include descriptive docstrings for all resources, tools, and prompts - Document all parameters and return values - Include usage examples where appropriate ## Extending the Services To add a new GCP service: 1. Create a new module following the pattern above 2. Implement resources for retrieving information 3. Implement tools for operations 4. Create prompts for common user tasks 5. Register the new service in your MCP initialization ## Troubleshooting If you encounter issues: 1. Check client initialization in `client_instances` 2. Verify required permissions for the service operations 3. Look for additional error details in the returned JSON 4. Add print statements for debugging complex operations ``` -------------------------------------------------------------------------------- /README.md: -------------------------------------------------------------------------------- ```markdown # This is not a Ready MCP Server # GCP MCP Server A comprehensive Model Context Protocol (MCP) server implementation for Google Cloud Platform (GCP) services, enabling AI assistants to interact with and manage GCP resources through a standardized interface. ## Overview GCP MCP Server provides AI assistants with capabilities to: - **Query GCP Resources**: Get information about your cloud infrastructure - **Manage Cloud Resources**: Create, configure, and manage GCP services - **Receive Assistance**: Get AI-guided help with GCP configurations and best practices The implementation follows the MCP specification to enable AI systems to interact with GCP services in a secure, controlled manner. ## Supported GCP Services This implementation includes support for the following GCP services: - **Artifact Registry**: Container and package management - **BigQuery**: Data warehousing and analytics - **Cloud Audit Logs**: Logging and audit trail analysis - **Cloud Build**: CI/CD pipeline management - **Cloud Compute Engine**: Virtual machine instances - **Cloud Monitoring**: Metrics, alerting, and dashboards - **Cloud Run**: Serverless container deployments - **Cloud Storage**: Object storage management ## Architecture The project is structured as follows: ``` gcp-mcp-server/ ├── core/ # Core MCP server functionality auth context logging_handler security ├── prompts/ # AI assistant prompts for GCP operations ├── services/ # GCP service implementations │ ├── README.md # Service implementation details │ └── ... # Individual service modules ├── main.py # Main server entry point └── ... ``` Key components: - **Service Modules**: Each GCP service has its own module with resources, tools, and prompts - **Client Instances**: Centralized client management for authentication and resource access - **Core Components**: Base functionality for the MCP server implementation ## Getting Started ### Prerequisites - Python 3.10+ - GCP project with enabled APIs for the services you want to use - Authenticated GCP credentials (Application Default Credentials recommended) ### Installation 1. Clone the repository: ```bash git clone https://github.com/yourusername/gcp-mcp-server.git cd gcp-mcp-server ``` 2. Set up a virtual environment: ```bash python -m venv venv source venv/bin/activate # On Windows: venv\Scripts\activate ``` 3. Install dependencies: ```bash pip install -r requirements.txt ``` 4. Configure your GCP credentials: ```bash # Using gcloud gcloud auth application-default login # Or set GOOGLE_APPLICATION_CREDENTIALS export GOOGLE_APPLICATION_CREDENTIALS="/path/to/service-account-key.json" ``` 5. Set up environment variables: ```bash cp .env.example .env # Edit .env with your configuration ``` ### Running the Server Start the MCP server: ```bash python main.py ``` For development and testing: ```bash # Development mode with auto-reload python main.py --dev # Run with specific configuration python main.py --config config.yaml ``` ## Docker Deployment Build and run with Docker: ```bash # Build the image docker build -t gcp-mcp-server . # Run the container docker run -p 8080:8080 -v ~/.config/gcloud:/root/.config/gcloud gcp-mcp-server ``` ## Configuration The server can be configured through environment variables or a configuration file: | Environment Variable | Description | Default | |----------------------|-------------|---------| | `GCP_PROJECT_ID` | Default GCP project ID | None (required) | | `GCP_DEFAULT_LOCATION` | Default region/zone | `us-central1` | | `MCP_SERVER_PORT` | Server port | `8080` | | `LOG_LEVEL` | Logging level | `INFO` | See `.env.example` for a complete list of configuration options. ## Development ### Adding a New GCP Service 1. Create a new file in the `services/` directory 2. Implement the service following the pattern in existing services 3. Register the service in `main.py` See the [services README](services/README.md) for detailed implementation guidance. ## Security Considerations - The server uses Application Default Credentials for authentication - Authorization is determined by the permissions of the authenticated identity - No credentials are hardcoded in the service implementations - Consider running with a service account with appropriate permissions ## Contributing Contributions are welcome! Please feel free to submit a Pull Request. 1. Fork the repository 2. Create your feature branch (`git checkout -b feature/amazing-feature`) 3. Commit your changes (`git commit -m 'Add some amazing feature'`) 4. Push to the branch (`git push origin feature/amazing-feature`) 5. Open a Pull Request ## License This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details. ## Acknowledgments - Google Cloud Platform team for their comprehensive APIs - Model Context Protocol for providing a standardized way for AI to interact with services ### Using the Server To use this server: 1. Place your GCP service account key file as `service-account.json` in the same directory 2. Install the MCP package: `pip install "mcp[cli]"` 3. Install the required GCP package: `pip install google-cloud-run` 4. Run: `mcp dev gcp_cloudrun_server.py` Or install it in Claude Desktop: ``` mcp install gcp_cloudrun_server.py --name "GCP Cloud Run Manager" ``` ## MCP Server Configuration The following configuration can be added to your configuration file for GCP Cloud Tools: ```json "mcpServers": { "GCP Cloud Tools": { "command": "uv", "args": [ "run", "--with", "google-cloud-artifact-registry>=1.10.0", "--with", "google-cloud-bigquery>=3.27.0", "--with", "google-cloud-build>=3.0.0", "--with", "google-cloud-compute>=1.0.0", "--with", "google-cloud-logging>=3.5.0", "--with", "google-cloud-monitoring>=2.0.0", "--with", "google-cloud-run>=0.9.0", "--with", "google-cloud-storage>=2.10.0", "--with", "mcp[cli]", "--with", "python-dotenv>=1.0.0", "mcp", "run", "C:\\Users\\enes_\\Desktop\\mcp-repo-final\\gcp-mcp\\src\\gcp-mcp-server\\main.py" ], "env": { "GOOGLE_APPLICATION_CREDENTIALS": "C:/Users/enes_/Desktop/mcp-repo-final/gcp-mcp/service-account.json", "GCP_PROJECT_ID": "gcp-mcp-cloud-project", "GCP_LOCATION": "us-east1" } } } ``` ### Configuration Details This configuration sets up an MCP server for Google Cloud Platform tools with the following: - **Command**: Uses `uv` package manager to run the server - **Dependencies**: Includes various Google Cloud libraries (Artifact Registry, BigQuery, Cloud Build, etc.) - **Environment Variables**: - `GOOGLE_APPLICATION_CREDENTIALS`: Path to your GCP service account credentials - `GCP_PROJECT_ID`: Your Google Cloud project ID - `GCP_LOCATION`: GCP region (us-east1) ### Usage Add this configuration to your MCP configuration file to enable GCP Cloud Tools functionality. ``` -------------------------------------------------------------------------------- /src/gcp-mcp-server/prompts/common.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /src/gcp-mcp-server/core/__init__.py: -------------------------------------------------------------------------------- ```python """Core package for MCP server components.""" ``` -------------------------------------------------------------------------------- /src/gcp-mcp-server/__init__.py: -------------------------------------------------------------------------------- ```python """GCP MCP server package.""" __version__ = "0.1.0" ``` -------------------------------------------------------------------------------- /requirements.txt: -------------------------------------------------------------------------------- ``` mcp>=1.0.0 google-cloud-bigquery google-cloud-storage google-cloud-run google-cloud-artifact-registry google-cloud-logging python-dotenv google-cloud-monitoring google-cloud-compute google-cloud-build ``` -------------------------------------------------------------------------------- /utils/helpers.py: -------------------------------------------------------------------------------- ```python """Common utility functions.""" def format_resource_name(project_id: str, resource_name: str) -> str: """Format a resource name with project ID.""" return f"projects/{project_id}/resources/{resource_name}" ``` -------------------------------------------------------------------------------- /src/gcp-mcp-server/core/security.py: -------------------------------------------------------------------------------- ```python import re class DataSanitizer: PATTERNS = [ r"(?i)(token|key|secret|password)", r"\b\d{4}-\d{4}-\d{4}-\d{4}\b", # CC-like numbers ] @classmethod def sanitize(cls, text: str) -> str: for pattern in cls.PATTERNS: text = re.sub(pattern, "[REDACTED]", text) return text ``` -------------------------------------------------------------------------------- /src/gcp-mcp-server/services/__init__.py: -------------------------------------------------------------------------------- ```python """ GCP MCP Server services package. This package contains service modules that register tools and resources with the MCP server. """ # The following allows these modules to be imported directly from the services package from . import ( artifact_registry, client_instances, cloud_audit_logs, cloud_bigquery, cloud_build, cloud_compute_engine, cloud_monitoring, cloud_run, cloud_storage, ) ``` -------------------------------------------------------------------------------- /utils/__init__.py: -------------------------------------------------------------------------------- ```python """Utility functions for MCP server.""" import json from typing import Any def format_json_response(data: Any) -> str: """Format data as a JSON string with consistent styling.""" return json.dumps({"status": "success", "data": data}, indent=2) def format_error_response(message: str) -> str: """Format error message as a JSON string.""" return json.dumps({"status": "error", "message": message}, indent=2) ``` -------------------------------------------------------------------------------- /src/gcp-mcp-server/core/logging_handler.py: -------------------------------------------------------------------------------- ```python import logging class MCPLogger: """Simple logger for MCP server""" def __init__(self, name): self.logger = logging.getLogger(f"mcp.{name}") def info(self, message): self.logger.info(message) def warning(self, message): self.logger.warning(message) def error(self, message): self.logger.error(message) def critical(self, message): self.logger.critical(message) def debug(self, message): self.logger.debug(message) def audit_log(self, action, resource, details=None): """Simple audit logging""" self.logger.info(f"AUDIT: {action} on {resource}") ``` -------------------------------------------------------------------------------- /src/gcp-mcp-server/prompts/__init__.py: -------------------------------------------------------------------------------- ```python """Common prompts for GCP operations.""" from ..core import mcp @mcp.prompt() def gcp_service_help(service_name: str) -> str: """Get help for using a GCP service.""" return f""" I need help with using {service_name} in Google Cloud Platform. Please help me understand: 1. Common operations and best practices 2. Required parameters and configuration 3. Security considerations 4. Recommended patterns for {service_name} """ @mcp.prompt() def error_analysis(error_message: str) -> str: """Analyze a GCP error message.""" return f""" I received this error from GCP: {error_message} Please help me: 1. Understand what caused this error 2. Find potential solutions 3. Prevent similar errors in the future """ ``` -------------------------------------------------------------------------------- /pyproject.toml: -------------------------------------------------------------------------------- ```toml [project] name = "gcp-mcp-server" version = "0.1.0" description = "A Model Context Protocol server that provides access to Google Cloud Platform services, enabling LLMs to manage and interact with GCP resources." readme = "README.md" requires-python = ">=3.13" dependencies = [ "mcp[cli]>=1.0.0", "google-cloud-bigquery", "google-cloud-storage", "google-cloud-run", "google-cloud-artifact-registry", "google-cloud-logging", "python-dotenv", "google-cloud-monitoring", "google-cloud-compute", "google-cloud-build", ] [[project.authors]] name = "Enes Bol" email = "[email protected]" [build-system] requires = ["hatchling"] build-backend = "hatchling.build" [tool.hatch.build.targets.wheel] packages = ["src/gcp_mcp"] [project.scripts] gcp-mcp-server = "gcp_mcp.main:main" ``` -------------------------------------------------------------------------------- /src/gcp-mcp-server/services/client_instances.py: -------------------------------------------------------------------------------- ```python import logging from core.context import GCPClients # Set up logging logger = logging.getLogger(__name__) # Global client instance _gcp_clients = None _project_id = None _location = None def initialize_clients(credentials=None): """Initialize GCP clients with credentials.""" global _gcp_clients, _project_id, _location try: # Initialize GCP clients _gcp_clients = GCPClients(credentials) _project_id = _gcp_clients.project_id _location = _gcp_clients.location logger.info(f"GCP clients initialized with project: {_project_id}") return True except Exception as e: logger.error(f"Failed to initialize GCP clients: {str(e)}") return False def get_clients(): """Get the initialized GCP clients.""" if _gcp_clients is None: logger.warning("GCP clients not initialized. Attempting to initialize...") initialize_clients() return _gcp_clients def get_project_id(): """Get the GCP project ID.""" return _project_id def get_location(): """Get the GCP location.""" return _location ``` -------------------------------------------------------------------------------- /smithery.yaml: -------------------------------------------------------------------------------- ```yaml # Smithery.ai configuration startCommand: type: stdio configSchema: type: object properties: project_id: type: string description: "Google Cloud Project ID" region: type: string description: "Default GCP region" default: "us-central1" timeout: type: integer description: "Default timeout for operations in seconds" default: 300 service_account_json: type: string description: "GCP Service Account key JSON (as string)" service_account_path: type: string description: "Path to GCP Service Account key file" required: - project_id commandFunction: |- (config) => ({ "command": "python", "args": [ "main.py" ], "env": { "GCP_PROJECT_ID": config.project_id, "GCP_REGION": config.region || "us-central1", "GCP_TIMEOUT": String(config.timeout || 300), "GCP_SERVICE_ACCOUNT_JSON": config.service_account_json || "", "GCP_SERVICE_ACCOUNT_KEY_PATH": config.service_account_path || "" } }) ``` -------------------------------------------------------------------------------- /Dockerfile: -------------------------------------------------------------------------------- ```dockerfile # Generated by https://smithery.ai. See: https://smithery.ai/docs/config#dockerfile # Use a Python image with uv pre-installed FROM ghcr.io/astral-sh/uv:python3.13-bookworm-slim AS uv # Set working directory WORKDIR /app # Enable bytecode compilation ENV UV_COMPILE_BYTECODE=1 # Copy pyproject.toml and lock file for dependencies COPY pyproject.toml uv.lock ./ # Install the project's dependencies RUN --mount=type=cache,target=/root/.cache/uv \ uv sync --frozen --no-install-project --no-dev --no-editable # Add the rest of the project source code and install it ADD src /app/src # Sync and install the project RUN --mount=type=cache,target=/root/.cache/uv \ uv sync --frozen --no-dev --no-editable FROM python:3.13-slim-bookworm # Set working directory WORKDIR /app # Copy virtual environment from the builder COPY --from=uv /root/.local /root/.local COPY --from=uv --chown=app:app /app/.venv /app/.venv # Place executables in the environment at the front of the path ENV PATH="/app/.venv/bin:$PATH" # Define the entry point ENTRYPOINT ["gcp-mcp-server"] # Example command # CMD ["--project", "your-gcp-project-id", "--location", "your-gcp-location"] ``` -------------------------------------------------------------------------------- /src/gcp-mcp-server/core/server.py: -------------------------------------------------------------------------------- ```python # import logging # import os # from contextlib import asynccontextmanager # from typing import Any, AsyncIterator, Dict # from auth import get_credentials # from mcp.server.fastmcp import FastMCP # from ..services import bigquery, compute, iam, storage # logging.basicConfig(level=logging.INFO) # logger = logging.getLogger(__name__) # @asynccontextmanager # async def gcp_lifespan(server) -> AsyncIterator[Dict[str, Any]]: # """Set up GCP context with credentials.""" # logger.info("Initializing GCP MCP server...") # try: # # Get GCP credentials # credentials = get_credentials() # project_id = os.environ.get("GCP_PROJECT_ID") # if not project_id: # logger.warning( # "GCP_PROJECT_ID not set in environment. Some features may not work correctly." # ) # logger.info(f"Server initialized with project: {project_id or 'Not set'}") # # Yield context to be used by handlers # yield {"credentials": credentials, "project_id": project_id} # except Exception as e: # logger.error(f"Failed to initialize GCP context: {str(e)}") # raise # finally: # logger.info("Shutting down GCP MCP server...") # # Create main server FIRST # mcp = FastMCP( # "GCP Manager", # description="Manage Google Cloud Platform Resources", # lifespan=gcp_lifespan, # ) # # Register all services # compute.register(mcp) # storage.register(mcp) # bigquery.register(mcp) # iam.register(mcp) # # THEN define resources and tools # @mcp.resource("gcp://project") # def get_project_info(): # """Get information about the current GCP project""" # project_id = os.environ.get("GCP_PROJECT_ID") # return f"Project ID: {project_id}" # @mcp.resource("gcp://storage/buckets") # def list_buckets(): # """List GCP storage buckets""" # from google.cloud import storage # client = storage.Client() # buckets = list(client.list_buckets(max_results=10)) # return "\n".join([f"- {bucket.name}" for bucket in buckets]) # @mcp.resource("test://hello") # def hello_resource(): # """A simple test resource""" # return "Hello World" # @mcp.tool() # def list_gcp_instances(region: str = "us-central1") -> str: # """List GCP compute instances in a region""" # # Use your credentials to list instances # return f"Instances in {region}: [instance list would go here]" # @mcp.tool() # def test_gcp_auth() -> str: # """Test GCP authentication""" # try: # from google.cloud import storage # client = storage.Client() # buckets = list(client.list_buckets(max_results=5)) # return f"Authentication successful. Found {len(buckets)} buckets." # except Exception as e: # return f"Authentication failed: {str(e)}" # if __name__ == "__main__": # mcp.run() ``` -------------------------------------------------------------------------------- /src/gcp-mcp-server/core/auth.py: -------------------------------------------------------------------------------- ```python import json import os import google.auth from google.auth.exceptions import DefaultCredentialsError from google.auth.transport.requests import Request from google.oauth2 import service_account from .logging_handler import MCPLogger # Define required scopes for GCP APIs REQUIRED_SCOPES = [ "https://www.googleapis.com/auth/cloud-platform", ] logger = MCPLogger("auth") def get_credentials(): """ Get Google Cloud credentials from environment. Attempts to load credentials in the following order: 1. From GOOGLE_APPLICATION_CREDENTIALS environment variable 2. From GCP_SERVICE_ACCOUNT_JSON environment variable containing JSON 3. Default application credentials """ try: # Check for credentials file path creds_file = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS") if creds_file and os.path.exists(creds_file): logger.audit_log( action="credential_load", resource="service_account", details={"method": "file", "file": creds_file}, ) logger.info(f"Loading credentials from file: {creds_file}") credentials = service_account.Credentials.from_service_account_file( creds_file, scopes=REQUIRED_SCOPES ) return validate_credentials(credentials) # Check for service account JSON in environment sa_json = os.environ.get("GCP_SERVICE_ACCOUNT_JSON") if sa_json: try: service_account_info = json.loads(sa_json) logger.info( "Loading credentials from GCP_SERVICE_ACCOUNT_JSON environment variable" ) credentials = service_account.Credentials.from_service_account_info( service_account_info, scopes=REQUIRED_SCOPES ) logger.audit_log( action="credential_load", resource="service_account", details={"method": "environment_json"}, ) return validate_credentials(credentials) except json.JSONDecodeError as e: logger.error(f"Failed to parse GCP_SERVICE_ACCOUNT_JSON: {str(e)}") # Fall back to default credentials try: logger.audit_log( action="credential_load", resource="application_default", details={"method": "default"}, ) logger.info("Loading default application credentials") credentials, project_id = google.auth.default(scopes=REQUIRED_SCOPES) if project_id and not os.environ.get("GCP_PROJECT_ID"): # Set project ID from default credentials if not already set os.environ["GCP_PROJECT_ID"] = project_id logger.info(f"Using project ID from default credentials: {project_id}") return validate_credentials(credentials) except DefaultCredentialsError as e: logger.error(f"Failed to load GCP credentials: {str(e)}") raise AuthenticationError("Failed to obtain valid credentials") except Exception as e: logger.critical(f"Authentication failure: {str(e)}") raise AuthenticationError(f"Failed to obtain valid credentials: {str(e)}") def validate_credentials(credentials): """Validate credential permissions and expiration""" # Some credentials don't have a valid attribute or may not need refresh if hasattr(credentials, "valid") and not credentials.valid: credentials.refresh(Request()) # Check expiration if hasattr(credentials, "expiry"): if hasattr(credentials, "expired") and credentials.expired: try: credentials.refresh(Request()) except Exception as e: raise AuthenticationError( f"Failed to refresh expired credentials: {str(e)}" ) return credentials class AuthenticationError(Exception): """Custom exception for authentication failures""" pass ``` -------------------------------------------------------------------------------- /src/gcp-mcp-server/main.py: -------------------------------------------------------------------------------- ```python import os # At the top of main.py, add these imports and logging statements import sys # Get the directory containing main.py current_dir = os.path.dirname(os.path.abspath(__file__)) # Add it to Python's module search path sys.path.append(current_dir) # Also add the parent directory if needed parent_dir = os.path.dirname(current_dir) sys.path.append(parent_dir) import inspect import logging from contextlib import asynccontextmanager from typing import Any, AsyncIterator, Dict # Import the GCP Clients and Context from mcp.server.fastmcp import FastMCP from services import client_instances # Set up logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # After adding paths, print them to see what's happening print(f"Current working directory: {os.getcwd()}") print(f"Updated Python path: {sys.path}") @asynccontextmanager async def gcp_lifespan(server) -> AsyncIterator[Dict[str, Any]]: """Set up GCP context with credentials.""" logger.info("Initializing GCP MCP server...") try: # Initialize GCP credentials credentials = None # Check for service account key file path sa_path = os.environ.get("GCP_SERVICE_ACCOUNT_KEY_PATH") if sa_path and os.path.exists(sa_path): logger.info(f"Using service account key from: {sa_path}") # Initialize GCP clients using the client_instances module client_instances.initialize_clients(credentials) clients = client_instances.get_clients() project_id = client_instances.get_project_id() location = client_instances.get_location() logger.info(f"Server initialized with project: {project_id}") # Yield context with clients and credentials yield { "clients": clients, "project_id": project_id, "credentials": credentials, "location": location, } except Exception as e: logger.error(f"Failed to initialize GCP context: {str(e)}") raise finally: logger.info("Shutting down GCP MCP server...") # Create the global MCP instance mcp = FastMCP( "GCP Manager", description="Manage Google Cloud Platform Resources", lifespan=gcp_lifespan, dependencies=[ "mcp>=1.0.0", "google-cloud-bigquery", "google-cloud-storage", "google-cloud-run", "google-cloud-artifact-registry", "google-cloud-logging", "python-dotenv", "google-cloud-monitoring", "google-cloud-compute", "google-cloud-build", ], ) # Basic resources and tools @mcp.resource("test://hello") def hello_resource(): """A simple test resource""" return "Hello World" @mcp.tool() def test_gcp_auth() -> str: """Test GCP authentication""" try: # Get clients from client_instances module instead of context clients = client_instances.get_clients() # Test if we can list storage buckets if hasattr(clients, "storage"): try: buckets = list(clients.storage.list_buckets(max_results=5)) return f"Authentication successful. Found {len(buckets)} buckets. {buckets}" except Exception as e: return f"Storage authentication failed: {str(e)}" except Exception as e: return f"Authentication failed: {str(e)}" # # Function to register services # def register_services(): # """Register all service modules with the MCP instance.""" # services_dir = os.path.join(os.path.dirname(__file__), "services") # logger.info(f"Loading services from {services_dir}") # if not os.path.exists(services_dir): # logger.warning(f"Services directory not found: {services_dir}") # return # # Get all Python files in the services directory # for filename in os.listdir(services_dir): # if ( # filename.endswith(".py") # and filename != "__init__.py" # and filename != "client_instances.py" # ): # module_name = filename[:-3] # Remove .py extension # try: # # Load the module using importlib # module_path = os.path.join(services_dir, filename) # spec = importlib.util.spec_from_file_location( # f"services.{module_name}", module_path # ) # module = importlib.util.module_from_spec(spec) # # Inject mcp instance into the module # module.mcp = mcp # # Execute the module # spec.loader.exec_module(module) # logger.info(f"Loaded service module: {module_name}") # # If the module has a register function, call it # if hasattr(module, "register"): # # Pass the mcp instance and the server's request_context to register # module.register(mcp) # logger.info(f"Registered service: {module_name}") # except Exception as e: # logger.error(f"Error loading service {module_name}: {e}") def register_services(): """Register all service modules with the MCP instance.""" logger.info("Explicitly registering service modules") # Print diagnostic information logger.info(f"Python sys.path: {sys.path}") logger.info(f"Current working directory: {os.getcwd()}") logger.info(f"Script location: {os.path.abspath(__file__)}") logger.info(f"Parent directory: {os.path.dirname(os.path.abspath(__file__))}") try: # Try importing a service module to check if imports work logger.info("Attempting to import artifact_registry") import services.artifact_registry as artifact_registry logger.info(f"Module location: {inspect.getfile(artifact_registry)}") from services import ( artifact_registry, cloud_audit_logs, cloud_bigquery, cloud_build, cloud_compute_engine, cloud_monitoring, cloud_run, cloud_storage, ) # Register each service module artifact_registry.register(mcp) cloud_audit_logs.register(mcp) cloud_bigquery.register(mcp) cloud_build.register(mcp) cloud_compute_engine.register(mcp) cloud_monitoring.register(mcp) cloud_run.register(mcp) cloud_storage.register(mcp) logger.info("All service modules registered successfully") except Exception as e: logger.error(f"Error registering service modules: {e}") # Add detailed error logging import traceback logger.error(traceback.format_exc()) if __name__ == "__main__": logger.info("Starting GCP MCP server") # Register services register_services() # Run the server mcp.run() ``` -------------------------------------------------------------------------------- /src/gcp-mcp-server/core/context.py: -------------------------------------------------------------------------------- ```python # context.py import json import os from typing import Any, Optional, Type import google.auth # Import other services as needed from google.cloud import ( artifactregistry_v1, bigquery, compute_v1, monitoring_v3, storage, ) from google.oauth2 import service_account class GCPClients: """Client manager for GCP services""" def __init__(self, credentials=None): self.credentials = self._get_credentials(credentials) self.project_id = self._get_project_id() self.location = self._get_location() self._clients = {} # Initialize clients self._storage_client = None self._bigquery_client = None self._run_client = None self._logging_client = None self._monitoring_client = None self._compute_client = None self._sql_client = None self._cloudbuild_client = None self._artifactregistry_client = None def _get_credentials(self, credentials=None): """Get credentials from various sources""" # If credentials are directly provided if credentials: return credentials # Check for service account JSON in environment variable sa_json = os.environ.get("GCP_SERVICE_ACCOUNT_JSON") if sa_json: try: sa_info = json.loads(sa_json) return service_account.Credentials.from_service_account_info(sa_info) except Exception as e: print(f"Error loading service account JSON: {e}") # Check for service account key file path sa_path = os.environ.get("GCP_SERVICE_ACCOUNT_KEY_PATH") if sa_path: try: return service_account.Credentials.from_service_account_file(sa_path) except Exception as e: print(f"Error loading service account key file: {e}") # Fall back to application default credentials try: credentials, project = google.auth.default() return credentials except Exception as e: print(f"Error getting default credentials: {e}") raise RuntimeError("No valid GCP credentials found") def _get_project_id(self) -> str: """Get project ID from environment or credentials""" project_id = os.environ.get("GCP_PROJECT_ID") if project_id: return project_id if hasattr(self.credentials, "project_id"): return self.credentials.project_id try: _, project_id = google.auth.default() return project_id except Exception: raise RuntimeError("Unable to determine GCP project ID") def _get_location(self) -> str: """Get default location/region from environment""" return os.environ.get("GCP_LOCATION", "us-central1") # def _init_client( # self, client_class: Type, current_client: Optional[Any], **kwargs # ) -> Any: # """Helper method to initialize clients with error handling""" # if not current_client: # try: # return client_class( # project=self.project_id, # <-- This adds project automatically # credentials=self.credentials, # <-- This adds credentials automatically # **kwargs, # <-- This adds any extra params (like database) # ) # except Exception as e: # raise RuntimeError( # f"Failed to initialize {client_class.__name__}: {str(e)}" # ) # return current_client def _init_client( self, client_class: Type, current_client: Optional[Any], **kwargs ) -> Any: """Helper method to initialize clients with error handling""" if not current_client: try: # Check if this client accepts project parameter if client_class in [ storage.Client ]: # Add other clients that accept project kwargs["project"] = self.project_id return client_class(credentials=self.credentials, **kwargs) except Exception as e: raise RuntimeError( f"Failed to initialize {client_class.__name__}: {str(e)}" ) return current_client @property def storage(self) -> storage.Client: self._storage_client = self._init_client(storage.Client, self._storage_client) return self._storage_client @property def bigquery(self) -> bigquery.Client: self._bigquery_client = self._init_client( bigquery.Client, self._bigquery_client ) return self._bigquery_client @property def artifactregistry(self) -> artifactregistry_v1.ArtifactRegistryClient: """Get the Artifact Registry client.""" if not self._artifactregistry_client: try: # ArtifactRegistryClient doesn't accept project parameter self._artifactregistry_client = ( artifactregistry_v1.ArtifactRegistryClient( credentials=self.credentials ) ) except Exception as e: raise RuntimeError( f"Failed to initialize ArtifactRegistryClient: {str(e)}" ) return self._artifactregistry_client # Uncomment and implement other client properties as needed # @property # def storage(self) -> storage.Client: # self._storage_client = self._init_client(storage.Client, self._storage_client) # return self._storage_client def close_all(self): """Close all open clients""" for client in self._clients.values(): if hasattr(client, "transport") and hasattr(client.transport, "close"): client.transport.close() elif hasattr(client, "close"): client.close() self._clients.clear() # Update Context class to handle credentials properly class Context: def __init__(self, request_context): self.request_context = request_context # Get credentials from lifespan context credentials = request_context.lifespan_context.get("credentials") # Initialize GCPClients with those credentials self.clients = GCPClients(credentials) def close(self): """Clean up when request ends""" if hasattr(self, "clients"): self.clients.close_all() # @property # def run(self) -> run_v2.CloudRunClient: # self._run_client = self._init_client(run_v2.CloudRunClient, self._run_client) # return self._run_client # @property # def logging(self) -> logging_v2.LoggingServiceV2Client: # self._logging_client = self._init_client( # logging_v2.LoggingServiceV2Client, self._logging_client # ) # return self._logging_client @property def monitoring(self) -> monitoring_v3.MetricServiceClient: self._monitoring_client = self._init_client( monitoring_v3.MetricServiceClient, self._monitoring_client ) return self._monitoring_client @property def compute(self) -> compute_v1.InstancesClient: self._compute_client = self._init_client( compute_v1.InstancesClient, self._compute_client ) return self._compute_client # @property # def sql(self) -> sql_v1.InstancesClient: # self._sql_client = self._init_client(sql_v1.InstancesClient, self._sql_client) # return self._sql_client # @property # def cloudbuild(self) -> cloudbuild_v1.CloudBuildClient: # self._cloudbuild_client = self._init_client( # cloudbuild_v1.CloudBuildClient, self._cloudbuild_client # ) # return self._cloudbuild_client ``` -------------------------------------------------------------------------------- /src/gcp-mcp-server/services/cloud_build.py: -------------------------------------------------------------------------------- ```python import json from typing import Dict, Optional from services import client_instances def register(mcp_instance): """Register all Cloud Build resources and tools with the MCP instance.""" # Resources @mcp_instance.resource("gcp://cloudbuild/{project_id}/builds") def list_builds_resource(project_id: str = None) -> str: """List all Cloud Build executions for a project""" try: # Get client from client_instances client = client_instances.get_clients().cloudbuild project_id = project_id or client_instances.get_project_id() builds = client.list_builds(project_id=project_id) result = [] for build in builds: result.append( { "id": build.id, "status": build.status.name if build.status else None, "source": build.source.repo_source.commit_sha if build.source and build.source.repo_source else None, "create_time": build.create_time.isoformat() if build.create_time else None, "logs_url": build.logs_url, "substitutions": dict(build.substitutions) if build.substitutions else {}, } ) return json.dumps(result, indent=2) except Exception as e: return json.dumps({"error": str(e)}, indent=2) @mcp_instance.resource("gcp://cloudbuild/{project_id}/triggers") def list_triggers_resource(project_id: str = None) -> str: """List all Cloud Build triggers for a project""" try: # Get client from client_instances client = client_instances.get_clients().cloudbuild project_id = project_id or client_instances.get_project_id() triggers = client.list_triggers(project_id=project_id) result = [] for trigger in triggers: result.append( { "id": trigger.id, "name": trigger.name, "description": trigger.description, "trigger_template": { "repo_name": trigger.trigger_template.repo_name, "branch_name": trigger.trigger_template.branch_name, } if trigger.trigger_template else None, } ) return json.dumps(result, indent=2) except Exception as e: return json.dumps({"error": str(e)}, indent=2) # Tools @mcp_instance.tool() def trigger_build( build_config: Dict, project_id: str = None, ) -> str: """ Trigger a new Cloud Build execution Args: build_config: Dictionary containing the build configuration project_id: GCP project ID (defaults to configured project) """ try: # Get client from client_instances client = client_instances.get_clients().cloudbuild project_id = project_id or client_instances.get_project_id() # Convert config dict to Build object build = cloudbuild_v1.Build.from_json(json.dumps(build_config)) print(f"Triggering build in project {project_id}...") operation = client.create_build(project_id=project_id, build=build) response = operation.result() return json.dumps( { "status": "success", "build_id": response.id, "status": response.status.name if response.status else None, "logs_url": response.logs_url, "build_name": response.name, }, indent=2, ) except Exception as e: return json.dumps({"status": "error", "message": str(e)}, indent=2) @mcp_instance.tool() def list_builds( project_id: str = None, filter: Optional[str] = None, page_size: Optional[int] = 100, ) -> str: """ List Cloud Build executions with optional filtering Args: project_id: GCP project ID (defaults to configured project) filter: Filter expression for builds page_size: Maximum number of results to return """ try: # Get client from client_instances client = client_instances.get_clients().cloudbuild project_id = project_id or client_instances.get_project_id() request = cloudbuild_v1.ListBuildsRequest( project_id=project_id, filter=filter, page_size=page_size ) builds = [] for build in client.list_builds(request=request): builds.append( { "id": build.id, "status": build.status.name if build.status else None, "create_time": build.create_time.isoformat() if build.create_time else None, "source": build.source.repo_source.commit_sha if build.source and build.source.repo_source else None, "logs_url": build.logs_url, } ) return json.dumps( {"status": "success", "builds": builds, "count": len(builds)}, indent=2 ) except Exception as e: return json.dumps({"status": "error", "message": str(e)}, indent=2) @mcp_instance.tool() def create_build_trigger( trigger_config: Dict, project_id: str = None, ) -> str: """ Create a new Cloud Build trigger Args: trigger_config: Dictionary containing the trigger configuration project_id: GCP project ID (defaults to configured project) """ try: # Get client from client_instances client = client_instances.get_clients().cloudbuild project_id = project_id or client_instances.get_project_id() # Convert config dict to Trigger object trigger = cloudbuild_v1.BuildTrigger.from_json(json.dumps(trigger_config)) print(f"Creating trigger in project {project_id}...") response = client.create_trigger(project_id=project_id, trigger=trigger) return json.dumps( { "status": "success", "trigger_id": response.id, "name": response.name, "description": response.description, "create_time": response.create_time.isoformat() if response.create_time else None, }, indent=2, ) except Exception as e: return json.dumps({"status": "error", "message": str(e)}, indent=2) # Prompts @mcp_instance.prompt() def build_config_prompt(service_type: str = "docker") -> str: """Prompt for creating a Cloud Build configuration""" return f""" I need to create a Cloud Build configuration for {service_type} deployments. Please help with: 1. Recommended build steps for {service_type} 2. Proper use of substitutions 3. Caching strategies 4. Security best practices 5. Integration with other GCP services """ @mcp_instance.prompt() def trigger_setup_prompt(repo_type: str = "github") -> str: """Prompt for configuring build triggers""" return f""" I want to set up a {repo_type} trigger for Cloud Build. Please explain: 1. Required permissions and connections 2. Event types (push, PR, etc.) 3. File pattern matching 4. Branch/tag filtering 5. Approval workflows """ ``` -------------------------------------------------------------------------------- /src/gcp-mcp-server/services/cloud_compute_engine.py: -------------------------------------------------------------------------------- ```python import json from typing import Optional from services import client_instances def register(mcp_instance): """Register all Compute Engine resources and tools with the MCP instance.""" # Resources @mcp_instance.resource("gcp://compute/{project_id}/{zone}/instances") def list_instances_resource(project_id: str = None, zone: str = None) -> str: """List all Compute Engine instances in a zone""" try: # Get client from client_instances client = client_instances.get_clients().compute project_id = project_id or client_instances.get_project_id() zone = zone or client_instances.get_location() instance_list = client.list(project=project_id, zone=zone) result = [] for instance in instance_list: result.append( { "name": instance.name, "status": instance.status, "machine_type": instance.machine_type.split("/")[-1] if instance.machine_type else None, "internal_ip": instance.network_interfaces[0].network_i_p if instance.network_interfaces and len(instance.network_interfaces) > 0 else None, "creation_timestamp": instance.creation_timestamp, } ) return json.dumps(result, indent=2) except Exception as e: return json.dumps({"error": str(e)}, indent=2) # Tools @mcp_instance.tool() def create_instance( instance_name: str, machine_type: str, project_id: str = None, zone: str = None, image: str = "projects/debian-cloud/global/images/family/debian-11", network: str = "global/networks/default", disk_size_gb: int = 10, ) -> str: """ Create a new Compute Engine instance Args: instance_name: Name for the new instance machine_type: Machine type (e.g., n2-standard-2) project_id: GCP project ID (defaults to configured project) zone: GCP zone (defaults to configured location) image: Source image for boot disk network: Network to connect to disk_size_gb: Size of boot disk in GB """ try: # Get client from client_instances client = client_instances.get_clients().compute project_id = project_id or client_instances.get_project_id() zone = zone or client_instances.get_location() # Build instance configuration instance_config = { "name": instance_name, "machine_type": f"zones/{zone}/machineTypes/{machine_type}", "disks": [ { "boot": True, "auto_delete": True, "initialize_params": { "source_image": image, "disk_size_gb": disk_size_gb, }, } ], "network_interfaces": [ { "network": network, "access_configs": [ {"type": "ONE_TO_ONE_NAT", "name": "External NAT"} ], } ], } print(f"Creating instance {instance_name} in {zone}...") operation = client.insert( project=project_id, zone=zone, instance_resource=instance_config ) # Wait for operation to complete operation.result() return json.dumps( { "status": "success", "instance_name": instance_name, "zone": zone, "operation_type": "insert", "operation_status": "DONE", }, indent=2, ) except Exception as e: return json.dumps({"status": "error", "message": str(e)}, indent=2) @mcp_instance.tool() def list_instances( project_id: str = None, zone: str = None, filter: Optional[str] = None, ) -> str: """ List Compute Engine instances in a zone Args: project_id: GCP project ID (defaults to configured project) zone: GCP zone (defaults to configured location) filter: Optional filter expression (e.g., "status=RUNNING") """ try: # Get client from client_instances client = client_instances.get_clients().compute project_id = project_id or client_instances.get_project_id() zone = zone or client_instances.get_location() instance_list = client.list(project=project_id, zone=zone, filter=filter) instances = [] for instance in instance_list: instances.append( { "name": instance.name, "status": instance.status, "machine_type": instance.machine_type.split("/")[-1] if instance.machine_type else None, "internal_ip": instance.network_interfaces[0].network_i_p if instance.network_interfaces and len(instance.network_interfaces) > 0 else None, "creation_timestamp": instance.creation_timestamp, } ) return json.dumps( {"status": "success", "instances": instances, "count": len(instances)}, indent=2, ) except Exception as e: return json.dumps({"status": "error", "message": str(e)}, indent=2) @mcp_instance.tool() def start_instance( instance_name: str, project_id: str = None, zone: str = None, ) -> str: """ Start a stopped Compute Engine instance Args: instance_name: Name of the instance to start project_id: GCP project ID (defaults to configured project) zone: GCP zone (defaults to configured location) """ try: # Get client from client_instances client = client_instances.get_clients().compute project_id = project_id or client_instances.get_project_id() zone = zone or client_instances.get_location() print(f"Starting instance {instance_name}...") operation = client.start( project=project_id, zone=zone, instance=instance_name ) operation.result() return json.dumps( { "status": "success", "instance_name": instance_name, "operation_status": "DONE", }, indent=2, ) except Exception as e: return json.dumps({"status": "error", "message": str(e)}, indent=2) @mcp_instance.tool() def stop_instance( instance_name: str, project_id: str = None, zone: str = None, ) -> str: """ Stop a running Compute Engine instance Args: instance_name: Name of the instance to stop project_id: GCP project ID (defaults to configured project) zone: GCP zone (defaults to configured location) """ try: # Get client from client_instances client = client_instances.get_clients().compute project_id = project_id or client_instances.get_project_id() zone = zone or client_instances.get_location() print(f"Stopping instance {instance_name}...") operation = client.stop( project=project_id, zone=zone, instance=instance_name ) operation.result() return json.dumps( { "status": "success", "instance_name": instance_name, "operation_status": "DONE", }, indent=2, ) except Exception as e: return json.dumps({"status": "error", "message": str(e)}, indent=2) @mcp_instance.tool() def delete_instance( instance_name: str, project_id: str = None, zone: str = None, ) -> str: """ Delete a Compute Engine instance Args: instance_name: Name of the instance to delete project_id: GCP project ID (defaults to configured project) zone: GCP zone (defaults to configured location) """ try: # Get client from client_instances client = client_instances.get_clients().compute project_id = project_id or client_instances.get_project_id() zone = zone or client_instances.get_location() print(f"Deleting instance {instance_name}...") operation = client.delete( project=project_id, zone=zone, instance=instance_name ) operation.result() return json.dumps( { "status": "success", "instance_name": instance_name, "operation_status": "DONE", }, indent=2, ) except Exception as e: return json.dumps({"status": "error", "message": str(e)}, indent=2) # Prompts @mcp_instance.prompt() def instance_config_prompt(workload_type: str = "web-server") -> str: """Prompt for creating Compute Engine configurations""" return f""" I need to configure a Compute Engine instance for {workload_type}. Please help with: 1. Recommended machine types for {workload_type} 2. Disk type and size recommendations 3. Network configuration best practices 4. Security considerations (service accounts, firewall rules) 5. Cost optimization strategies """ @mcp_instance.prompt() def troubleshooting_prompt(issue: str = "instance not responding") -> str: """Prompt for troubleshooting Compute Engine issues""" return f""" I'm experiencing {issue} with my Compute Engine instance. Please guide me through: 1. Common causes for this issue 2. Diagnostic steps using Cloud Console and CLI 3. Log analysis techniques 4. Recovery procedures 5. Prevention strategies """ ``` -------------------------------------------------------------------------------- /src/gcp-mcp-server/services/cloud_audit_logs.py: -------------------------------------------------------------------------------- ```python import json from google.cloud import logging_v2 from services import client_instances def register(mcp_instance): """Register all Cloud Audit Logs resources and tools with the MCP instance.""" # Resources @mcp_instance.resource("gcp://audit-logs/{project_id}") def list_audit_logs_resource(project_id: str = None) -> str: """List recent audit logs from a specific project""" try: # Get client from client_instances client = client_instances.get_clients().logging project_id = project_id or client_instances.get_project_id() # Filter for audit logs only filter_str = 'logName:"cloudaudit.googleapis.com"' # List log entries entries = client.list_log_entries( request={ "resource_names": [f"projects/{project_id}"], "filter": filter_str, "page_size": 10, # Limiting to 10 for responsiveness } ) result = [] for entry in entries: log_data = { "timestamp": entry.timestamp.isoformat() if entry.timestamp else None, "severity": entry.severity.name if entry.severity else None, "log_name": entry.log_name, "resource": { "type": entry.resource.type, "labels": dict(entry.resource.labels) if entry.resource.labels else {}, } if entry.resource else {}, } # Handle payload based on type if hasattr(entry, "json_payload") and entry.json_payload: log_data["payload"] = dict(entry.json_payload) elif hasattr(entry, "proto_payload") and entry.proto_payload: log_data["payload"] = "Proto payload (details omitted)" elif hasattr(entry, "text_payload") and entry.text_payload: log_data["payload"] = entry.text_payload result.append(log_data) return json.dumps(result, indent=2) except Exception as e: return json.dumps({"error": str(e)}, indent=2) @mcp_instance.resource("gcp://audit-logs/{project_id}/sinks") def list_log_sinks_resource(project_id: str = None) -> str: """List log sinks configured for a project""" try: # Get client from client_instances client = client_instances.get_clients().logging project_id = project_id or client_instances.get_project_id() parent = f"projects/{project_id}" sinks = client.list_sinks(parent=parent) result = [] for sink in sinks: result.append( { "name": sink.name, "destination": sink.destination, "filter": sink.filter, "description": sink.description, "disabled": sink.disabled, } ) return json.dumps(result, indent=2) except Exception as e: return json.dumps({"error": str(e)}, indent=2) # Tools @mcp_instance.tool() def list_audit_logs( project_id: str = None, filter_str: str = 'logName:"cloudaudit.googleapis.com"', page_size: int = 20, ) -> str: """ List Google Cloud Audit logs from a project Args: project_id: GCP project ID (defaults to configured project) filter_str: Log filter expression (defaults to audit logs) page_size: Maximum number of entries to return """ try: # Get client from client_instances client = client_instances.get_clients().logging project_id = project_id or client_instances.get_project_id() print(f"Retrieving audit logs from project {project_id}...") # List log entries entries = client.list_log_entries( request={ "resource_names": [f"projects/{project_id}"], "filter": filter_str, "page_size": page_size, } ) result = [] for entry in entries: log_data = { "timestamp": entry.timestamp.isoformat() if entry.timestamp else None, "severity": entry.severity.name if entry.severity else None, "log_name": entry.log_name, "resource": { "type": entry.resource.type, "labels": dict(entry.resource.labels) if entry.resource.labels else {}, } if entry.resource else {}, } # Handle payload based on type if hasattr(entry, "json_payload") and entry.json_payload: log_data["payload"] = dict(entry.json_payload) elif hasattr(entry, "proto_payload") and entry.proto_payload: log_data["payload"] = "Proto payload (details omitted)" elif hasattr(entry, "text_payload") and entry.text_payload: log_data["payload"] = entry.text_payload result.append(log_data) return json.dumps( { "status": "success", "entries": result, "count": len(result), }, indent=2, ) except Exception as e: return json.dumps({"status": "error", "message": str(e)}, indent=2) @mcp_instance.tool() def filter_admin_activity_logs( project_id: str = None, service_name: str = "", resource_type: str = "", time_range: str = "1h", page_size: int = 20, ) -> str: """ Filter Admin Activity audit logs for specific services or resource types Args: project_id: GCP project ID (defaults to configured project) service_name: Optional service name to filter by (e.g., compute.googleapis.com) resource_type: Optional resource type to filter by (e.g., gce_instance) time_range: Time range for logs (e.g., 1h, 24h, 7d) page_size: Maximum number of entries to return """ try: # Get client from client_instances client = client_instances.get_clients().logging project_id = project_id or client_instances.get_project_id() # Build filter string filter_parts = ['logName:"cloudaudit.googleapis.com%2Factivity"'] if service_name: filter_parts.append(f'protoPayload.serviceName="{service_name}"') if resource_type: filter_parts.append(f'resource.type="{resource_type}"') if time_range: filter_parts.append(f'timestamp >= "-{time_range}"') filter_str = " AND ".join(filter_parts) print(f"Filtering Admin Activity logs with: {filter_str}") # List log entries entries = client.list_log_entries( request={ "resource_names": [f"projects/{project_id}"], "filter": filter_str, "page_size": page_size, } ) result = [] for entry in entries: # Extract relevant fields for admin activity logs log_data = { "timestamp": entry.timestamp.isoformat() if entry.timestamp else None, "method_name": None, "resource_name": None, "service_name": None, "user": None, } # Extract data from proto payload if hasattr(entry, "proto_payload") and entry.proto_payload: payload = entry.proto_payload if hasattr(payload, "method_name"): log_data["method_name"] = payload.method_name if hasattr(payload, "resource_name"): log_data["resource_name"] = payload.resource_name if hasattr(payload, "service_name"): log_data["service_name"] = payload.service_name # Extract authentication info if hasattr(payload, "authentication_info"): auth_info = payload.authentication_info if hasattr(auth_info, "principal_email"): log_data["user"] = auth_info.principal_email result.append(log_data) return json.dumps( { "status": "success", "entries": result, "count": len(result), "filter": filter_str, }, indent=2, ) except Exception as e: return json.dumps({"status": "error", "message": str(e)}, indent=2) @mcp_instance.tool() def create_log_sink( sink_name: str, destination: str, project_id: str = None, filter_str: str = 'logName:"cloudaudit.googleapis.com"', description: str = "", ) -> str: """ Create a log sink to export audit logs to a destination Args: sink_name: Name for the new log sink destination: Destination for logs (e.g., storage.googleapis.com/my-bucket) project_id: GCP project ID (defaults to configured project) filter_str: Log filter expression (defaults to audit logs) description: Optional description for the sink """ try: # Get client from client_instances client = client_instances.get_clients().logging project_id = project_id or client_instances.get_project_id() parent = f"projects/{project_id}" # Create sink configuration sink = logging_v2.LogSink( name=sink_name, destination=destination, filter=filter_str, description=description, ) print(f"Creating log sink '{sink_name}' to export to {destination}...") # Create the sink response = client.create_sink( request={ "parent": parent, "sink": sink, } ) # Important: Recommend setting up IAM permissions writer_identity = response.writer_identity return json.dumps( { "status": "success", "name": response.name, "destination": response.destination, "filter": response.filter, "writer_identity": writer_identity, "next_steps": f"Important: Grant {writer_identity} the appropriate permissions on the destination", }, indent=2, ) except Exception as e: return json.dumps({"status": "error", "message": str(e)}, indent=2) # Prompts @mcp_instance.prompt() def audit_log_investigation() -> str: """Prompt for investigating security incidents through audit logs""" return """ I need to investigate a potential security incident in my Google Cloud project. Please help me: 1. Determine what types of audit logs I should check (Admin Activity, Data Access, System Event) 2. Create an effective filter query to find relevant logs 3. Identify key fields to examine for signs of unusual activity 4. Suggest common indicators of potential security issues in audit logs """ @mcp_instance.prompt() def log_export_setup() -> str: """Prompt for setting up log exports for compliance""" return """ I need to set up log exports for compliance requirements. Please help me: 1. Understand the different destinations available for log exports 2. Design an effective filter to capture all required audit events 3. Implement a log sink with appropriate permissions 4. Verify my setup is correctly capturing and exporting logs """ ``` -------------------------------------------------------------------------------- /src/gcp-mcp-server/services/cloud_storage.py: -------------------------------------------------------------------------------- ```python import json import os from typing import Dict, Optional from services import client_instances def register(mcp_instance): """Register all Cloud Storage resources and tools with the MCP instance.""" # Resources @mcp_instance.resource("gcp://storage/{project_id}/buckets") def list_buckets_resource(project_id: str = None) -> str: """List all Cloud Storage buckets in a project""" try: # Get client from client_instances client = client_instances.get_clients().storage project_id = project_id or client_instances.get_project_id() buckets = client.list_buckets() result = [] for bucket in buckets: result.append( { "name": bucket.name, "location": bucket.location, "storage_class": bucket.storage_class, "time_created": bucket.time_created.isoformat() if bucket.time_created else None, "versioning_enabled": bucket.versioning_enabled, "labels": dict(bucket.labels) if bucket.labels else {}, } ) return json.dumps(result, indent=2) except Exception as e: return json.dumps({"error": str(e)}, indent=2) @mcp_instance.resource("gcp://storage/{project_id}/bucket/{bucket_name}") def get_bucket_resource(project_id: str = None, bucket_name: str = None) -> str: """Get details for a specific Cloud Storage bucket""" try: # Get client from client_instances client = client_instances.get_clients().storage project_id = project_id or client_instances.get_project_id() bucket = client.get_bucket(bucket_name) result = { "name": bucket.name, "location": bucket.location, "storage_class": bucket.storage_class, "time_created": bucket.time_created.isoformat() if bucket.time_created else None, "versioning_enabled": bucket.versioning_enabled, "requester_pays": bucket.requester_pays, "lifecycle_rules": bucket.lifecycle_rules, "cors": bucket.cors, "labels": dict(bucket.labels) if bucket.labels else {}, } return json.dumps(result, indent=2) except Exception as e: return json.dumps({"error": str(e)}, indent=2) @mcp_instance.resource("gcp://storage/{project_id}/bucket/{bucket_name}/objects") def list_objects_resource(project_id: str = None, bucket_name: str = None) -> str: """List objects in a specific Cloud Storage bucket""" prefix = "" # Move it inside the function with a default value try: # Get client from client_instances client = client_instances.get_clients().storage project_id = project_id or client_instances.get_project_id() bucket = client.get_bucket(bucket_name) blobs = bucket.list_blobs(prefix=prefix) result = [] for blob in blobs: result.append( { "name": blob.name, "size": blob.size, "updated": blob.updated.isoformat() if blob.updated else None, "content_type": blob.content_type, "md5_hash": blob.md5_hash, "generation": blob.generation, "metadata": blob.metadata, } ) return json.dumps(result, indent=2) except Exception as e: return json.dumps({"error": str(e)}, indent=2) # Tools @mcp_instance.tool() def create_bucket( bucket_name: str, project_id: str = None, location: str = "us-central1", storage_class: str = "STANDARD", labels: Optional[Dict[str, str]] = None, versioning_enabled: bool = False, ) -> str: """ Create a Cloud Storage bucket Args: bucket_name: Name for the new bucket project_id: GCP project ID (defaults to configured project) location: GCP region (e.g., us-central1) storage_class: Storage class (STANDARD, NEARLINE, COLDLINE, ARCHIVE) labels: Optional key-value pairs for bucket labels versioning_enabled: Whether to enable object versioning """ try: # Get client from client_instances client = client_instances.get_clients().storage project_id = project_id or client_instances.get_project_id() # Validate storage class valid_storage_classes = ["STANDARD", "NEARLINE", "COLDLINE", "ARCHIVE"] if storage_class not in valid_storage_classes: return json.dumps( { "status": "error", "message": f"Invalid storage class: {storage_class}. Valid classes are: {', '.join(valid_storage_classes)}", }, indent=2, ) # Log info (similar to ctx.info) print(f"Creating bucket {bucket_name} in {location}...") bucket = client.bucket(bucket_name) bucket.create(location=location, storage_class=storage_class, labels=labels) # Set versioning if enabled if versioning_enabled: bucket.versioning_enabled = True bucket.patch() return json.dumps( { "status": "success", "name": bucket.name, "location": bucket.location, "storage_class": bucket.storage_class, "time_created": bucket.time_created.isoformat() if bucket.time_created else None, "versioning_enabled": bucket.versioning_enabled, "url": f"gs://{bucket_name}/", }, indent=2, ) except Exception as e: return json.dumps({"status": "error", "message": str(e)}, indent=2) @mcp_instance.tool() def list_buckets(project_id: str = None, prefix: str = "") -> str: """ List Cloud Storage buckets in a project Args: project_id: GCP project ID (defaults to configured project) prefix: Optional prefix to filter bucket names """ try: # Get client from client_instances client = client_instances.get_clients().storage project_id = project_id or client_instances.get_project_id() # Log info (similar to ctx.info) print(f"Listing buckets in project {project_id}...") # List buckets with optional prefix filter if prefix: buckets = [ b for b in client.list_buckets() if b.name.startswith(prefix) ] else: buckets = list(client.list_buckets()) result = [] for bucket in buckets: result.append( { "name": bucket.name, "location": bucket.location, "storage_class": bucket.storage_class, "time_created": bucket.time_created.isoformat() if bucket.time_created else None, } ) return json.dumps( {"status": "success", "buckets": result, "count": len(result)}, indent=2 ) except Exception as e: return json.dumps({"status": "error", "message": str(e)}, indent=2) @mcp_instance.tool() def upload_object( bucket_name: str, source_file_path: str, destination_blob_name: Optional[str] = None, project_id: str = None, content_type: Optional[str] = None, metadata: Optional[Dict[str, str]] = None, ) -> str: """ Upload an object to a Cloud Storage bucket Args: bucket_name: Name of the bucket source_file_path: Local path to the file to upload destination_blob_name: Name to assign to the blob (defaults to file basename) project_id: GCP project ID (defaults to configured project) content_type: Content type of the object (optional) metadata: Custom metadata dictionary (optional) """ try: # Get client from client_instances client = client_instances.get_clients().storage project_id = project_id or client_instances.get_project_id() # Check if file exists if not os.path.exists(source_file_path): return json.dumps( { "status": "error", "message": f"File not found: {source_file_path}", }, indent=2, ) # Get bucket bucket = client.bucket(bucket_name) # Use filename if destination_blob_name not provided if not destination_blob_name: destination_blob_name = os.path.basename(source_file_path) # Create blob blob = bucket.blob(destination_blob_name) # Set content type if provided if content_type: blob.content_type = content_type # Set metadata if provided if metadata: blob.metadata = metadata # Get file size for progress reporting file_size = os.path.getsize(source_file_path) print( f"Uploading {source_file_path} ({file_size} bytes) to gs://{bucket_name}/{destination_blob_name}..." ) # Upload file blob.upload_from_filename(source_file_path) return json.dumps( { "status": "success", "bucket": bucket_name, "name": blob.name, "size": blob.size, "content_type": blob.content_type, "md5_hash": blob.md5_hash, "generation": blob.generation, "public_url": blob.public_url, "gsutil_uri": f"gs://{bucket_name}/{destination_blob_name}", }, indent=2, ) except Exception as e: return json.dumps({"status": "error", "message": str(e)}, indent=2) @mcp_instance.tool() def download_object( bucket_name: str, source_blob_name: str, destination_file_path: str, project_id: str = None, ) -> str: """ Download an object from a Cloud Storage bucket Args: bucket_name: Name of the bucket source_blob_name: Name of the blob to download destination_file_path: Local path where the file should be saved project_id: GCP project ID (defaults to configured project) """ try: # Get client from client_instances client = client_instances.get_clients().storage project_id = project_id or client_instances.get_project_id() # Get bucket and blob bucket = client.bucket(bucket_name) blob = bucket.blob(source_blob_name) # Check if blob exists if not blob.exists(): return json.dumps( { "status": "error", "message": f"Object not found: gs://{bucket_name}/{source_blob_name}", }, indent=2, ) # Create directory if doesn't exist os.makedirs( os.path.dirname(os.path.abspath(destination_file_path)), exist_ok=True ) print( f"Downloading gs://{bucket_name}/{source_blob_name} to {destination_file_path}..." ) # Download file blob.download_to_filename(destination_file_path) return json.dumps( { "status": "success", "bucket": bucket_name, "blob_name": source_blob_name, "size": blob.size, "content_type": blob.content_type, "downloaded_to": destination_file_path, }, indent=2, ) except Exception as e: return json.dumps({"status": "error", "message": str(e)}, indent=2) @mcp_instance.tool() def delete_object(bucket_name: str, blob_name: str, project_id: str = None) -> str: """ Delete an object from a Cloud Storage bucket Args: bucket_name: Name of the bucket blob_name: Name of the blob to delete project_id: GCP project ID (defaults to configured project) """ try: # Get client from client_instances client = client_instances.get_clients().storage project_id = project_id or client_instances.get_project_id() # Get bucket and blob bucket = client.bucket(bucket_name) blob = bucket.blob(blob_name) print(f"Deleting gs://{bucket_name}/{blob_name}...") # Delete the blob blob.delete() return json.dumps( { "status": "success", "message": f"Successfully deleted gs://{bucket_name}/{blob_name}", }, indent=2, ) except Exception as e: return json.dumps({"status": "error", "message": str(e)}, indent=2) # Prompts @mcp_instance.prompt() def create_bucket_prompt(location: str = "us-central1") -> str: """Prompt for creating a new Cloud Storage bucket""" return f""" I need to create a new Cloud Storage bucket in {location}. Please help me with: 1. Understanding storage classes (STANDARD, NEARLINE, COLDLINE, ARCHIVE) 2. Best practices for bucket naming 3. When to enable versioning 4. Recommendations for bucket security settings 5. Steps to create the bucket """ @mcp_instance.prompt() def upload_file_prompt() -> str: """Prompt for help with uploading files to Cloud Storage""" return """ I need to upload files to a Cloud Storage bucket. Please help me understand: 1. The best way to organize files in Cloud Storage 2. When to use folders/prefixes 3. How to set appropriate permissions on uploaded files 4. How to make files publicly accessible (if needed) 5. The steps to perform the upload """ ``` -------------------------------------------------------------------------------- /src/gcp-mcp-server/services/artifact_registry.py: -------------------------------------------------------------------------------- ```python import json from services import client_instances def register(mcp_instance): """Register all resources and tools with the MCP instance.""" @mcp_instance.resource( "gcp://artifactregistry/{project_id}/{location}/repositories" ) def list_repositories_resource(project_id: str = None, location: str = None) -> str: """List all Artifact Registry repositories in a specific location""" try: # define the client client = client_instances.get_clients().artifactregistry project_id = project_id or client_instances.get_project_id() location = location or client_instances.get_location() # Use parameters directly from URL path parent = f"projects/{project_id}/locations/{location}" repositories = client.list_repositories(parent=parent) result = [] for repo in repositories: result.append( { "name": repo.name.split("/")[-1], "format": repo.format.name if hasattr(repo.format, "name") else str(repo.format), "description": repo.description, "create_time": repo.create_time.isoformat() if repo.create_time else None, "update_time": repo.update_time.isoformat() if repo.update_time else None, "kms_key_name": repo.kms_key_name, "labels": dict(repo.labels) if repo.labels else {}, } ) return json.dumps(result, indent=2) except Exception as e: return json.dumps({"error": str(e)}, indent=2) # Add a tool for creating repositories @mcp_instance.tool() def create_artifact_repository( name: str, format: str, description: str = "" ) -> str: """Create a new Artifact Registry repository""" try: # define the client client = client_instances.get_clients().artifactregistry project_id = client_instances.get_project_id() location = client_instances.get_location() parent = f"projects/{project_id}/locations/{location}" # Create repository request from google.cloud.artifactregistry_v1 import ( CreateRepositoryRequest, Repository, ) # Create the repository format enum value if format.upper() not in ["DOCKER", "MAVEN", "NPM", "PYTHON", "APT", "YUM"]: return json.dumps( { "error": f"Invalid format: {format}. Must be one of: DOCKER, MAVEN, NPM, PYTHON, APT, YUM" }, indent=2, ) repo = Repository( name=f"{parent}/repositories/{name}", format=getattr(Repository.Format, format.upper()), description=description, ) request = CreateRepositoryRequest( parent=parent, repository_id=name, repository=repo ) operation = client.create_repository(request=request) result = operation.result() # Wait for operation to complete return json.dumps( { "name": result.name.split("/")[-1], "format": result.format.name if hasattr(result.format, "name") else str(result.format), "description": result.description, "create_time": result.create_time.isoformat() if result.create_time else None, "update_time": result.update_time.isoformat() if result.update_time else None, }, indent=2, ) except Exception as e: return json.dumps({"error": str(e)}, indent=2) # @mcp_instance.tool() # def get_artifact_repository( # project_id: str = None, # location: str = None, # repository_id: str = None, # ) -> str: # """Get details about a specific Artifact Registry repository""" # try: # # Get the client from the context # clients = ctx.lifespan_context["clients"] # client = clients.artifactregistry # # Use context values if parameters not provided # project_id = project_id or ctx.lifespan_context["project_id"] # location = location or ctx.lifespan_context["location"] # if not repository_id: # return json.dumps({"error": "Repository ID is required"}, indent=2) # name = f"projects/{project_id}/locations/{location}/repositories/{repository_id}" # repo = client.get_repository(name=name) # result = { # "name": repo.name.split("/")[-1], # "format": repo.format.name # if hasattr(repo.format, "name") # else str(repo.format), # "description": repo.description, # "create_time": repo.create_time.isoformat() # if repo.create_time # else None, # "update_time": repo.update_time.isoformat() # if repo.update_time # else None, # "kms_key_name": repo.kms_key_name, # "labels": dict(repo.labels) if repo.labels else {}, # } # return json.dumps(result, indent=2) # except Exception as e: # return json.dumps({"error": str(e)}, indent=2) # @mcp_instance.tool() # def upload_artifact( # repo_name: str, # artifact_path: str, # package: str = "", # version: str = "", # project_id: str = None, # location: str = None, # ) -> str: # """ # Upload an artifact to Artifact Registry # Args: # project_id: GCP project ID # location: GCP region (e.g., us-central1) # repo_name: Name of the repository # artifact_path: Local path to the artifact file # package: Package name (optional) # version: Version string (optional) # """ # import os # try: # # Get the client from the context # clients = ctx.lifespan_context["clients"] # client = clients.artifactregistry # # Use context values if parameters not provided # project_id = project_id or ctx.lifespan_context["project_id"] # location = location or ctx.lifespan_context["location"] # if not os.path.exists(artifact_path): # return json.dumps( # {"status": "error", "message": f"File not found: {artifact_path}"} # ) # filename = os.path.basename(artifact_path) # file_size = os.path.getsize(artifact_path) # parent = ( # f"projects/{project_id}/locations/{location}/repositories/{repo_name}" # ) # with open(artifact_path, "rb") as f: # file_contents = f.read() # # Use the standard client for upload # result = client.upload_artifact( # parent=parent, contents=file_contents, artifact_path=filename # ) # return json.dumps( # { # "status": "success", # "uri": result.uri if hasattr(result, "uri") else None, # "message": f"Successfully uploaded {filename}", # }, # indent=2, # ) # except Exception as e: # return json.dumps({"status": "error", "message": str(e)}, indent=2) # # Prompts # @mcp_instance.prompt() # def create_repository_prompt(location: str = "us-central1") -> str: # """Prompt for creating a new Artifact Registry repository""" # return f""" # I need to create a new Artifact Registry repository in {location}. # Please help me with: # 1. What formats are available (Docker, NPM, Maven, etc.) # 2. Best practices for naming repositories # 3. Recommendations for labels I should apply # 4. Steps to create the repository # """ # @mcp_instance.prompt() # def upload_artifact_prompt(repo_format: str = "DOCKER") -> str: # """Prompt for help with uploading artifacts""" # return f""" # I need to upload a {repo_format.lower()} artifact to my Artifact Registry repository. # Please help me understand: # 1. The recommended way to upload {repo_format.lower()} artifacts # 2. Any naming conventions I should follow # 3. How to ensure proper versioning # 4. The steps to perform the upload # """ # import json # import os # from typing import Optional # from google.cloud import artifactregistry_v1 # from mcp.server.fastmcp import Context # # Instead of importing from core.server # # Resources # @mcp.resource("gcp://artifactregistry/{project_id}/{location}/repositories") # def list_repositories_resource( # project_id: str = None, location: str = None, ctx: Context = None # ) -> str: # """List all Artifact Registry repositories in a specific location""" # client = ctx.clients.artifactregistry # project_id = project_id or ctx.clients.project_id # location = location or ctx.clients.location # parent = f"projects/{project_id}/locations/{location}" # repositories = client.list_repositories(parent=parent) # result = [] # for repo in repositories: # result.append( # { # "name": repo.name.split("/")[-1], # "format": repo.format.name, # "description": repo.description, # "create_time": repo.create_time.isoformat() # if repo.create_time # else None, # "update_time": repo.update_time.isoformat() # if repo.update_time # else None, # "kms_key_name": repo.kms_key_name, # "labels": dict(repo.labels) if repo.labels else {}, # } # ) # return json.dumps(result, indent=2) # @mcp.resource("gcp://artifactregistry/{project_id}/{location}/repository/{repo_name}") # def get_repository_resource( # repo_name: str, project_id: str = None, location: str = None, ctx: Context = None # ) -> str: # """Get details for a specific Artifact Registry repository""" # client = ctx.clients.artifactregistry # project_id = project_id or ctx.clients.project_id # location = location or ctx.clients.location # name = f"projects/{project_id}/locations/{location}/repositories/{repo_name}" # try: # repo = client.get_repository(name=name) # result = { # "name": repo.name.split("/")[-1], # "format": repo.format.name, # "description": repo.description, # "create_time": repo.create_time.isoformat() if repo.create_time else None, # "update_time": repo.update_time.isoformat() if repo.update_time else None, # "kms_key_name": repo.kms_key_name, # "labels": dict(repo.labels) if repo.labels else {}, # } # return json.dumps(result, indent=2) # except Exception as e: # return json.dumps({"error": str(e)}) # @mcp.resource( # "gcp://artifactregistry/{project_id}/{location}/repository/{repo_name}/packages" # ) # def list_packages_resource( # repo_name: str, project_id: str = None, location: str = None, ctx: Context = None # ) -> str: # """List packages in a specific Artifact Registry repository""" # client = ctx.clients.artifactregistry # project_id = project_id or ctx.clients.project_id # location = location or ctx.clients.location # parent = f"projects/{project_id}/locations/{location}/repositories/{repo_name}" # packages = client.list_packages(parent=parent) # result = [] # for package in packages: # result.append( # { # "name": package.name.split("/")[-1], # "display_name": package.display_name, # "create_time": package.create_time.isoformat() # if package.create_time # else None, # "update_time": package.update_time.isoformat() # if package.update_time # else None, # } # ) # return json.dumps(result, indent=2) # # Tools # @mcp.tool() # async def create_repository( # repo_name: str, # format: str = "DOCKER", # description: str = "", # labels: Optional[dict] = None, # project_id: str = None, # location: str = None, # ctx: Context = None, # ) -> str: # """ # Create an Artifact Registry repository # Args: # project_id: GCP project ID # location: GCP region (e.g., us-central1) # repo_name: Name for the new repository # format: Repository format (DOCKER, NPM, PYTHON, MAVEN, APT, YUM, GO) # description: Optional description for the repository # labels: Optional key-value pairs for repository labels # """ # client = ctx.clients.artifactregistry # project_id = project_id or ctx.clients.project_id # location = location or ctx.clients.location # # Validate format # try: # format_enum = artifactregistry_v1.Repository.Format[format] # except KeyError: # return json.dumps( # { # "status": "error", # "message": f"Invalid format: {format}. Valid formats are: {', '.join(artifactregistry_v1.Repository.Format._member_names_)}", # } # ) # # Create repository # parent = f"projects/{project_id}/locations/{location}" # repository = artifactregistry_v1.Repository( # format=format_enum, # description=description, # ) # if labels: # repository.labels = labels # request = artifactregistry_v1.CreateRepositoryRequest( # parent=parent, repository_id=repo_name, repository=repository # ) # try: # ctx.info(f"Creating repository {repo_name} in {location}...") # response = client.create_repository(request=request) # return json.dumps( # { # "status": "success", # "name": response.name, # "format": response.format.name, # "description": response.description, # "create_time": response.create_time.isoformat() # if response.create_time # else None, # }, # indent=2, # ) # except Exception as e: # return json.dumps({"status": "error", "message": str(e)}) # @mcp.tool() # async def list_repositories( # project_id: str = None, location: str = None, ctx: Context = None # ) -> str: # """ # List Artifact Registry repositories in a specific location # Args: # project_id: GCP project ID # location: GCP region (e.g., us-central1) # """ # client = ctx.clients.artifactregistry # project_id = project_id or ctx.clients.project_id # location = location or ctx.clients.location # parent = f"projects/{project_id}/locations/{location}" # try: # ctx.info(f"Listing repositories in {location}...") # repositories = client.list_repositories(parent=parent) # result = [] # for repo in repositories: # result.append( # { # "name": repo.name.split("/")[-1], # "format": repo.format.name, # "description": repo.description, # "create_time": repo.create_time.isoformat() # if repo.create_time # else None, # } # ) # return json.dumps( # {"status": "success", "repositories": result, "count": len(result)}, # indent=2, # ) # except Exception as e: # return json.dumps({"status": "error", "message": str(e)}) ``` -------------------------------------------------------------------------------- /src/gcp-mcp-server/services/cloud_bigquery.py: -------------------------------------------------------------------------------- ```python import json from typing import Any, Dict, List, Optional from google.cloud import bigquery from google.cloud.exceptions import NotFound from services import client_instances def register(mcp_instance): """Register all BigQuery resources and tools with the MCP instance.""" # Resources @mcp_instance.resource("gcp://bigquery/{project_id}/datasets") def list_datasets_resource(project_id: str = None) -> str: """List all BigQuery datasets in a project""" try: # Get client from client_instances client = client_instances.get_clients().bigquery project_id = project_id or client_instances.get_project_id() datasets = list(client.list_datasets()) result = [] for dataset in datasets: result.append( { "id": dataset.dataset_id, "full_id": dataset.full_dataset_id, "friendly_name": dataset.friendly_name, "location": dataset.location, "labels": dict(dataset.labels) if dataset.labels else {}, "created": dataset.created.isoformat() if dataset.created else None, } ) return json.dumps(result, indent=2) except Exception as e: return json.dumps({"error": str(e)}, indent=2) @mcp_instance.resource("gcp://bigquery/{project_id}/dataset/{dataset_id}") def get_dataset_resource(project_id: str = None, dataset_id: str = None) -> str: """Get details for a specific BigQuery dataset""" try: # Get client from client_instances client = client_instances.get_clients().bigquery project_id = project_id or client_instances.get_project_id() dataset_ref = client.dataset(dataset_id) dataset = client.get_dataset(dataset_ref) result = { "id": dataset.dataset_id, "full_id": dataset.full_dataset_id, "friendly_name": dataset.friendly_name, "description": dataset.description, "location": dataset.location, "labels": dict(dataset.labels) if dataset.labels else {}, "created": dataset.created.isoformat() if dataset.created else None, "modified": dataset.modified.isoformat() if dataset.modified else None, "default_table_expiration_ms": dataset.default_table_expiration_ms, "default_partition_expiration_ms": dataset.default_partition_expiration_ms, } return json.dumps(result, indent=2) except NotFound: return json.dumps({"error": f"Dataset {dataset_id} not found"}, indent=2) except Exception as e: return json.dumps({"error": str(e)}, indent=2) @mcp_instance.resource("gcp://bigquery/{project_id}/dataset/{dataset_id}/tables") def list_tables_resource(project_id: str = None, dataset_id: str = None) -> str: """List all tables in a BigQuery dataset""" try: # Get client from client_instances client = client_instances.get_clients().bigquery project_id = project_id or client_instances.get_project_id() tables = list(client.list_tables(dataset_id)) result = [] for table in tables: result.append( { "id": table.table_id, "full_id": f"{table.project}.{table.dataset_id}.{table.table_id}", "table_type": table.table_type, } ) return json.dumps(result, indent=2) except NotFound: return json.dumps({"error": f"Dataset {dataset_id} not found"}, indent=2) except Exception as e: return json.dumps({"error": str(e)}, indent=2) @mcp_instance.resource( "gcp://bigquery/{project_id}/dataset/{dataset_id}/table/{table_id}" ) def get_table_resource( project_id: str = None, dataset_id: str = None, table_id: str = None ) -> str: """Get details for a specific BigQuery table""" try: # Get client from client_instances client = client_instances.get_clients().bigquery project_id = project_id or client_instances.get_project_id() table_ref = client.dataset(dataset_id).table(table_id) table = client.get_table(table_ref) # Extract schema information schema_fields = [] for field in table.schema: schema_fields.append( { "name": field.name, "type": field.field_type, "mode": field.mode, "description": field.description, } ) result = { "id": table.table_id, "full_id": f"{table.project}.{table.dataset_id}.{table.table_id}", "friendly_name": table.friendly_name, "description": table.description, "num_rows": table.num_rows, "num_bytes": table.num_bytes, "table_type": table.table_type, "created": table.created.isoformat() if table.created else None, "modified": table.modified.isoformat() if table.modified else None, "expires": table.expires.isoformat() if table.expires else None, "schema": schema_fields, "labels": dict(table.labels) if table.labels else {}, } return json.dumps(result, indent=2) except NotFound: return json.dumps( {"error": f"Table {dataset_id}.{table_id} not found"}, indent=2 ) except Exception as e: return json.dumps({"error": str(e)}, indent=2) # Tools @mcp_instance.tool() def run_query( query: str, project_id: str = None, location: str = None, max_results: int = 100, use_legacy_sql: bool = False, timeout_ms: int = 30000, ) -> str: """ Run a BigQuery query and return the results Args: query: SQL query to execute project_id: GCP project ID (defaults to configured project) location: Optional BigQuery location (us, eu, etc.) max_results: Maximum number of rows to return use_legacy_sql: Whether to use legacy SQL syntax timeout_ms: Query timeout in milliseconds """ try: # Get client from client_instances client = client_instances.get_clients().bigquery project_id = project_id or client_instances.get_project_id() location = location or client_instances.get_location() job_config = bigquery.QueryJobConfig( use_legacy_sql=use_legacy_sql, ) # Log info (similar to ctx.info) print(f"Running query: {query[:100]}...") query_job = client.query( query, job_config=job_config, location=location, timeout=timeout_ms / 1000.0, ) # Wait for the query to complete results = query_job.result(max_results=max_results) # Get the schema schema = [field.name for field in results.schema] # Convert rows to a list of dictionaries rows = [] for row in results: row_dict = {} for key in schema: value = row[key] if hasattr(value, "isoformat"): # Handle datetime objects row_dict[key] = value.isoformat() else: row_dict[key] = value rows.append(row_dict) # Create summary statistics stats = { "total_rows": query_job.total_rows, "total_bytes_processed": query_job.total_bytes_processed, "total_bytes_billed": query_job.total_bytes_billed, "billing_tier": query_job.billing_tier, "created": query_job.created.isoformat() if query_job.created else None, "started": query_job.started.isoformat() if query_job.started else None, "ended": query_job.ended.isoformat() if query_job.ended else None, "duration_ms": (query_job.ended - query_job.started).total_seconds() * 1000 if query_job.started and query_job.ended else None, } return json.dumps( { "status": "success", "schema": schema, "rows": rows, "returned_rows": len(rows), "stats": stats, }, indent=2, ) except Exception as e: return json.dumps({"status": "error", "message": str(e)}, indent=2) @mcp_instance.tool() def create_dataset( dataset_id: str, project_id: str = None, location: str = None, description: str = "", friendly_name: str = None, labels: Optional[Dict[str, str]] = None, default_table_expiration_ms: Optional[int] = None, ) -> str: """ Create a new BigQuery dataset Args: dataset_id: ID for the new dataset project_id: GCP project ID (defaults to configured project) location: Dataset location (US, EU, asia-northeast1, etc.) description: Optional dataset description friendly_name: Optional user-friendly name labels: Optional key-value pairs for dataset labels default_table_expiration_ms: Default expiration time for tables in milliseconds """ try: # Get client from client_instances client = client_instances.get_clients().bigquery project_id = project_id or client_instances.get_project_id() location = location or client_instances.get_location() dataset = bigquery.Dataset(f"{project_id}.{dataset_id}") dataset.location = location if description: dataset.description = description if friendly_name: dataset.friendly_name = friendly_name if labels: dataset.labels = labels if default_table_expiration_ms: dataset.default_table_expiration_ms = default_table_expiration_ms # Log info (similar to ctx.info) print(f"Creating dataset {dataset_id} in {location}...") dataset = client.create_dataset(dataset) return json.dumps( { "status": "success", "dataset_id": dataset.dataset_id, "full_dataset_id": dataset.full_dataset_id, "location": dataset.location, "created": dataset.created.isoformat() if dataset.created else None, }, indent=2, ) except Exception as e: return json.dumps({"status": "error", "message": str(e)}, indent=2) @mcp_instance.tool() def create_table( dataset_id: str, table_id: str, schema_fields: List[Dict[str, Any]], project_id: str = None, description: str = "", friendly_name: str = None, expiration_ms: Optional[int] = None, labels: Optional[Dict[str, str]] = None, clustering_fields: Optional[List[str]] = None, time_partitioning_field: Optional[str] = None, time_partitioning_type: str = "DAY", ) -> str: """ Create a new BigQuery table Args: dataset_id: Dataset ID where the table will be created table_id: ID for the new table schema_fields: List of field definitions, each with name, type, mode, and description project_id: GCP project ID (defaults to configured project) description: Optional table description friendly_name: Optional user-friendly name expiration_ms: Optional table expiration time in milliseconds labels: Optional key-value pairs for table labels clustering_fields: Optional list of fields to cluster by time_partitioning_field: Optional field to use for time-based partitioning time_partitioning_type: Partitioning type (DAY, HOUR, MONTH, YEAR) Example schema_fields: [ {"name": "name", "type": "STRING", "mode": "REQUIRED", "description": "Name field"}, {"name": "age", "type": "INTEGER", "mode": "NULLABLE", "description": "Age field"} ] """ try: # Get client from client_instances client = client_instances.get_clients().bigquery project_id = project_id or client_instances.get_project_id() # Convert schema_fields to SchemaField objects schema = [] for field in schema_fields: schema.append( bigquery.SchemaField( name=field["name"], field_type=field["type"], mode=field.get("mode", "NULLABLE"), description=field.get("description", ""), ) ) # Create table reference table_ref = client.dataset(dataset_id).table(table_id) table = bigquery.Table(table_ref, schema=schema) # Set table properties if description: table.description = description if friendly_name: table.friendly_name = friendly_name if expiration_ms: table.expires = expiration_ms if labels: table.labels = labels # Set clustering if specified if clustering_fields: table.clustering_fields = clustering_fields # Set time partitioning if specified if time_partitioning_field: if time_partitioning_type == "DAY": table.time_partitioning = bigquery.TimePartitioning( type_=bigquery.TimePartitioningType.DAY, field=time_partitioning_field, ) elif time_partitioning_type == "HOUR": table.time_partitioning = bigquery.TimePartitioning( type_=bigquery.TimePartitioningType.HOUR, field=time_partitioning_field, ) elif time_partitioning_type == "MONTH": table.time_partitioning = bigquery.TimePartitioning( type_=bigquery.TimePartitioningType.MONTH, field=time_partitioning_field, ) elif time_partitioning_type == "YEAR": table.time_partitioning = bigquery.TimePartitioning( type_=bigquery.TimePartitioningType.YEAR, field=time_partitioning_field, ) # Log info (similar to ctx.info) print(f"Creating table {dataset_id}.{table_id}...") table = client.create_table(table) return json.dumps( { "status": "success", "table_id": table.table_id, "full_table_id": f"{table.project}.{table.dataset_id}.{table.table_id}", "created": table.created.isoformat() if table.created else None, }, indent=2, ) except Exception as e: return json.dumps({"status": "error", "message": str(e)}, indent=2) @mcp_instance.tool() def delete_table(dataset_id: str, table_id: str, project_id: str = None) -> str: """ Delete a BigQuery table Args: dataset_id: Dataset ID containing the table table_id: ID of the table to delete project_id: GCP project ID (defaults to configured project) """ try: # Get client from client_instances client = client_instances.get_clients().bigquery project_id = project_id or client_instances.get_project_id() table_ref = client.dataset(dataset_id).table(table_id) # Log info (similar to ctx.info) print(f"Deleting table {dataset_id}.{table_id}...") client.delete_table(table_ref) return json.dumps( { "status": "success", "message": f"Table {project_id}.{dataset_id}.{table_id} successfully deleted", }, indent=2, ) except NotFound: return json.dumps( { "status": "error", "message": f"Table {project_id}.{dataset_id}.{table_id} not found", }, indent=2, ) except Exception as e: return json.dumps({"status": "error", "message": str(e)}, indent=2) @mcp_instance.tool() def load_table_from_json( dataset_id: str, table_id: str, json_data: List[Dict[str, Any]], project_id: str = None, schema_fields: Optional[List[Dict[str, Any]]] = None, write_disposition: str = "WRITE_APPEND", ) -> str: """ Load data into a BigQuery table from JSON data Args: dataset_id: Dataset ID containing the table table_id: ID of the table to load data into json_data: List of dictionaries representing rows to insert project_id: GCP project ID (defaults to configured project) schema_fields: Optional schema definition (if not using existing table schema) write_disposition: How to handle existing data (WRITE_TRUNCATE, WRITE_APPEND, WRITE_EMPTY) """ try: # Get client from client_instances client = client_instances.get_clients().bigquery project_id = project_id or client_instances.get_project_id() table_ref = client.dataset(dataset_id).table(table_id) # Convert write_disposition to the appropriate enum if write_disposition == "WRITE_TRUNCATE": disposition = bigquery.WriteDisposition.WRITE_TRUNCATE elif write_disposition == "WRITE_APPEND": disposition = bigquery.WriteDisposition.WRITE_APPEND elif write_disposition == "WRITE_EMPTY": disposition = bigquery.WriteDisposition.WRITE_EMPTY else: return json.dumps( { "status": "error", "message": f"Invalid write_disposition: {write_disposition}. Use WRITE_TRUNCATE, WRITE_APPEND, or WRITE_EMPTY.", }, indent=2, ) # Create job config job_config = bigquery.LoadJobConfig( write_disposition=disposition, source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON, ) # Set schema if provided if schema_fields: schema = [] for field in schema_fields: schema.append( bigquery.SchemaField( name=field["name"], field_type=field["type"], mode=field.get("mode", "NULLABLE"), description=field.get("description", ""), ) ) job_config.schema = schema # Convert JSON data to newline-delimited JSON (not needed but keeping the log) print(f"Loading {len(json_data)} rows into {dataset_id}.{table_id}...") # Create and run the load job load_job = client.load_table_from_json( json_data, table_ref, job_config=job_config ) # Wait for the job to complete load_job.result() # Get updated table info table = client.get_table(table_ref) return json.dumps( { "status": "success", "rows_loaded": len(json_data), "total_rows": table.num_rows, "message": f"Successfully loaded data into {project_id}.{dataset_id}.{table_id}", }, indent=2, ) except Exception as e: return json.dumps({"status": "error", "message": str(e)}, indent=2) @mcp_instance.tool() def export_table_to_csv( dataset_id: str, table_id: str, destination_uri: str, project_id: str = None, print_header: bool = True, field_delimiter: str = ",", ) -> str: """ Export a BigQuery table to Cloud Storage as CSV Args: dataset_id: Dataset ID containing the table table_id: ID of the table to export destination_uri: GCS URI (gs://bucket/path) project_id: GCP project ID (defaults to configured project) print_header: Whether to include column headers field_delimiter: Delimiter character for fields """ try: # Get client from client_instances client = client_instances.get_clients().bigquery project_id = project_id or client_instances.get_project_id() table_ref = client.dataset(dataset_id).table(table_id) # Validate destination URI if not destination_uri.startswith("gs://"): return json.dumps( { "status": "error", "message": "destination_uri must start with gs://", }, indent=2, ) job_config = bigquery.ExtractJobConfig() job_config.destination_format = bigquery.DestinationFormat.CSV job_config.print_header = print_header job_config.field_delimiter = field_delimiter # Log info (similar to ctx.info) print(f"Exporting {dataset_id}.{table_id} to {destination_uri}...") # Create and run the extract job extract_job = client.extract_table( table_ref, destination_uri, job_config=job_config ) # Wait for the job to complete extract_job.result() return json.dumps( { "status": "success", "destination": destination_uri, "message": f"Successfully exported {project_id}.{dataset_id}.{table_id} to {destination_uri}", }, indent=2, ) except Exception as e: return json.dumps({"status": "error", "message": str(e)}, indent=2) # Prompts @mcp_instance.prompt() def create_dataset_prompt() -> str: """Prompt for creating a new BigQuery dataset""" return """ I need to create a new BigQuery dataset. Please help me with: 1. Choosing an appropriate location for my dataset 2. Understanding dataset naming conventions 3. Best practices for dataset configuration (expiration, labels, etc.) 4. The process to create the dataset """ @mcp_instance.prompt() def query_optimization_prompt() -> str: """Prompt for BigQuery query optimization help""" return """ I have a BigQuery query that's slow or expensive to run. Please help me optimize it by: 1. Analyzing key factors that affect BigQuery performance 2. Identifying common patterns that lead to inefficient queries 3. Suggesting specific optimization techniques 4. Helping me understand how to use EXPLAIN plan analysis """ ``` -------------------------------------------------------------------------------- /src/gcp-mcp-server/services/cloud_run.py: -------------------------------------------------------------------------------- ```python import json from typing import Dict, Optional from google.cloud import run_v2 from google.protobuf import field_mask_pb2 from services import client_instances def register(mcp_instance): """Register all Cloud Run resources and tools with the MCP instance.""" # Resources @mcp_instance.resource("gcp://cloudrun/{project_id}/{region}/services") def list_services_resource(project_id: str = None, region: str = None) -> str: """List all Cloud Run services in a specific region""" try: # Get client from client_instances client = client_instances.get_clients().run project_id = project_id or client_instances.get_project_id() region = region or client_instances.get_location() parent = f"projects/{project_id}/locations/{region}" services = client.list_services(parent=parent) result = [] for service in services: result.append( { "name": service.name.split("/")[-1], "uid": service.uid, "generation": service.generation, "labels": dict(service.labels) if service.labels else {}, "annotations": dict(service.annotations) if service.annotations else {}, "create_time": service.create_time.isoformat() if service.create_time else None, "update_time": service.update_time.isoformat() if service.update_time else None, "uri": service.uri, } ) return json.dumps(result, indent=2) except Exception as e: return json.dumps({"error": str(e)}, indent=2) @mcp_instance.resource("gcp://run/{project_id}/{location}/service/{service_name}") def get_service_resource( service_name: str, project_id: str = None, location: str = None ) -> str: """Get details for a specific Cloud Run service""" try: # Get client from client_instances client = client_instances.get_clients().run project_id = project_id or client_instances.get_project_id() location = location or client_instances.get_location() name = f"projects/{project_id}/locations/{location}/services/{service_name}" service = client.get_service(name=name) # Extract container details containers = [] if service.template and service.template.containers: for container in service.template.containers: container_info = { "image": container.image, "command": list(container.command) if container.command else [], "args": list(container.args) if container.args else [], "env": [ {"name": env.name, "value": env.value} for env in container.env ] if container.env else [], "resources": { "limits": dict(container.resources.limits) if container.resources and container.resources.limits else {}, "cpu_idle": container.resources.cpu_idle if container.resources else None, } if container.resources else {}, "ports": [ {"name": port.name, "container_port": port.container_port} for port in container.ports ] if container.ports else [], } containers.append(container_info) # Extract scaling details scaling = None if service.template and service.template.scaling: scaling = { "min_instance_count": service.template.scaling.min_instance_count, "max_instance_count": service.template.scaling.max_instance_count, } result = { "name": service.name.split("/")[-1], "uid": service.uid, "generation": service.generation, "labels": dict(service.labels) if service.labels else {}, "annotations": dict(service.annotations) if service.annotations else {}, "create_time": service.create_time.isoformat() if service.create_time else None, "update_time": service.update_time.isoformat() if service.update_time else None, "creator": service.creator, "last_modifier": service.last_modifier, "client": service.client, "client_version": service.client_version, "ingress": service.ingress.name if service.ingress else None, "launch_stage": service.launch_stage.name if service.launch_stage else None, "traffic": [ { "type": traffic.type_.name if traffic.type_ else None, "revision": traffic.revision.split("/")[-1] if traffic.revision else None, "percent": traffic.percent, "tag": traffic.tag, } for traffic in service.traffic ] if service.traffic else [], "uri": service.uri, "template": { "containers": containers, "execution_environment": service.template.execution_environment.name if service.template and service.template.execution_environment else None, "vpc_access": { "connector": service.template.vpc_access.connector, "egress": service.template.vpc_access.egress.name if service.template.vpc_access.egress else None, } if service.template and service.template.vpc_access else None, "scaling": scaling, "timeout": f"{service.template.timeout.seconds}s {service.template.timeout.nanos}ns" if service.template and service.template.timeout else None, "service_account": service.template.service_account if service.template else None, } if service.template else {}, } return json.dumps(result, indent=2) except Exception as e: return json.dumps({"error": str(e)}, indent=2) @mcp_instance.resource( "gcp://run/{project_id}/{location}/service/{service_name}/revisions" ) def list_revisions_resource( service_name: str, project_id: str = None, location: str = None ) -> str: """List revisions for a specific Cloud Run service""" try: # Get client from client_instances client = client_instances.get_clients().run project_id = project_id or client_instances.get_project_id() location = location or client_instances.get_location() parent = f"projects/{project_id}/locations/{location}" # List all revisions revisions = client.list_revisions(parent=parent) # Filter revisions for the specified service service_revisions = [] for revision in revisions: # Check if this revision belongs to the specified service if service_name in revision.name: service_revisions.append( { "name": revision.name.split("/")[-1], "uid": revision.uid, "generation": revision.generation, "service": revision.service.split("/")[-1] if revision.service else None, "create_time": revision.create_time.isoformat() if revision.create_time else None, "update_time": revision.update_time.isoformat() if revision.update_time else None, "scaling": { "min_instance_count": revision.scaling.min_instance_count if revision.scaling else None, "max_instance_count": revision.scaling.max_instance_count if revision.scaling else None, } if revision.scaling else None, "conditions": [ { "type": condition.type, "state": condition.state.name if condition.state else None, "message": condition.message, "last_transition_time": condition.last_transition_time.isoformat() if condition.last_transition_time else None, "severity": condition.severity.name if condition.severity else None, } for condition in revision.conditions ] if revision.conditions else [], } ) return json.dumps(service_revisions, indent=2) except Exception as e: return json.dumps({"error": str(e)}, indent=2) # Tools @mcp_instance.tool() def create_service( service_name: str, image: str, project_id: str = None, location: str = None, env_vars: Optional[Dict[str, str]] = None, memory_limit: str = "512Mi", cpu_limit: str = "1", min_instances: int = 0, max_instances: int = 100, port: int = 8080, allow_unauthenticated: bool = True, service_account: str = None, vpc_connector: str = None, timeout_seconds: int = 300, ) -> str: """ Create a new Cloud Run service Args: service_name: Name for the new service image: Container image to deploy (e.g., gcr.io/project/image:tag) project_id: GCP project ID (defaults to configured project) location: GCP region (e.g., us-central1) (defaults to configured location) env_vars: Environment variables as key-value pairs memory_limit: Memory limit (e.g., 512Mi, 1Gi) cpu_limit: CPU limit (e.g., 1, 2) min_instances: Minimum number of instances max_instances: Maximum number of instances port: Container port allow_unauthenticated: Whether to allow unauthenticated access service_account: Service account email vpc_connector: VPC connector name timeout_seconds: Request timeout in seconds """ try: # Get client from client_instances client = client_instances.get_clients().run project_id = project_id or client_instances.get_project_id() location = location or client_instances.get_location() print(f"Creating Cloud Run service {service_name} in {location}...") # Create container container = run_v2.Container( image=image, resources=run_v2.ResourceRequirements( limits={ "memory": memory_limit, "cpu": cpu_limit, } ), ) # Add environment variables if provided if env_vars: container.env = [ run_v2.EnvVar(name=key, value=value) for key, value in env_vars.items() ] # Add port container.ports = [run_v2.ContainerPort(container_port=port)] # Create template template = run_v2.RevisionTemplate( containers=[container], scaling=run_v2.RevisionScaling( min_instance_count=min_instances, max_instance_count=max_instances, ), timeout=run_v2.Duration(seconds=timeout_seconds), ) # Add service account if provided if service_account: template.service_account = service_account # Add VPC connector if provided if vpc_connector: template.vpc_access = run_v2.VpcAccess( connector=vpc_connector, egress=run_v2.VpcAccess.VpcEgress.ALL_TRAFFIC, ) # Create service service = run_v2.Service( template=template, ingress=run_v2.IngressTraffic.INGRESS_TRAFFIC_ALL if allow_unauthenticated else run_v2.IngressTraffic.INGRESS_TRAFFIC_INTERNAL_ONLY, ) # Create the service parent = f"projects/{project_id}/locations/{location}" operation = client.create_service( parent=parent, service_id=service_name, service=service, ) # Wait for the operation to complete print(f"Waiting for service {service_name} to be created...") result = operation.result() return json.dumps( { "status": "success", "name": result.name.split("/")[-1], "uri": result.uri, "create_time": result.create_time.isoformat() if result.create_time else None, "update_time": result.update_time.isoformat() if result.update_time else None, }, indent=2, ) except Exception as e: return json.dumps({"status": "error", "message": str(e)}, indent=2) @mcp_instance.tool() def list_services(project_id: str = None, region: str = None) -> str: """ List Cloud Run services in a specific region Args: project_id: GCP project ID (defaults to configured project) region: GCP region (e.g., us-central1) (defaults to configured location) """ try: # Get client from client_instances client = client_instances.get_clients().run project_id = project_id or client_instances.get_project_id() region = region or client_instances.get_location() parent = f"projects/{project_id}/locations/{region}" print(f"Listing Cloud Run services in {region}...") services = client.list_services(parent=parent) result = [] for service in services: result.append( { "name": service.name.split("/")[-1], "uri": service.uri, "create_time": service.create_time.isoformat() if service.create_time else None, "update_time": service.update_time.isoformat() if service.update_time else None, "labels": dict(service.labels) if service.labels else {}, } ) return json.dumps( {"status": "success", "services": result, "count": len(result)}, indent=2, ) except Exception as e: return json.dumps({"status": "error", "message": str(e)}, indent=2) @mcp_instance.tool() def update_service( service_name: str, project_id: str = None, region: str = None, image: Optional[str] = None, memory: Optional[str] = None, cpu: Optional[str] = None, max_instances: Optional[int] = None, min_instances: Optional[int] = None, env_vars: Optional[Dict[str, str]] = None, concurrency: Optional[int] = None, timeout_seconds: Optional[int] = None, service_account: Optional[str] = None, labels: Optional[Dict[str, str]] = None, ) -> str: """ Update an existing Cloud Run service Args: service_name: Name of the service to update project_id: GCP project ID (defaults to configured project) region: GCP region (e.g., us-central1) (defaults to configured location) image: New container image URL (optional) memory: New memory allocation (optional) cpu: New CPU allocation (optional) max_instances: New maximum number of instances (optional) min_instances: New minimum number of instances (optional) env_vars: New environment variables (optional) concurrency: New maximum concurrent requests per instance (optional) timeout_seconds: New request timeout in seconds (optional) service_account: New service account email (optional) labels: New key-value labels to apply to the service (optional) """ try: # Get client from client_instances client = client_instances.get_clients().run project_id = project_id or client_instances.get_project_id() region = region or client_instances.get_location() # Get the existing service name = f"projects/{project_id}/locations/{region}/services/{service_name}" print(f"Getting current service configuration for {service_name}...") service = client.get_service(name=name) # Track which fields are being updated update_mask_fields = [] # Update the container image if specified if image and service.template and service.template.containers: service.template.containers[0].image = image update_mask_fields.append("template.containers[0].image") # Update resources if specified if (memory or cpu) and service.template and service.template.containers: if not service.template.containers[0].resources: service.template.containers[ 0 ].resources = run_v2.ResourceRequirements(limits={}) if not service.template.containers[0].resources.limits: service.template.containers[0].resources.limits = {} if memory: service.template.containers[0].resources.limits["memory"] = memory update_mask_fields.append( "template.containers[0].resources.limits.memory" ) if cpu: service.template.containers[0].resources.limits["cpu"] = cpu update_mask_fields.append( "template.containers[0].resources.limits.cpu" ) # Update scaling parameters if max_instances is not None and service.template: service.template.max_instance_count = max_instances update_mask_fields.append("template.max_instance_count") if min_instances is not None and service.template: service.template.min_instance_count = min_instances update_mask_fields.append("template.min_instance_count") # Update concurrency if concurrency is not None and service.template: service.template.max_instance_request_concurrency = concurrency update_mask_fields.append("template.max_instance_request_concurrency") # Update timeout if timeout_seconds is not None and service.template: service.template.timeout = {"seconds": timeout_seconds} update_mask_fields.append("template.timeout") # Update service account if service_account is not None and service.template: service.template.service_account = service_account update_mask_fields.append("template.service_account") # Update environment variables if ( env_vars is not None and service.template and service.template.containers ): # Create new env var list new_env_vars = [ run_v2.EnvVar(name=key, value=value) for key, value in env_vars.items() ] service.template.containers[0].env = new_env_vars update_mask_fields.append("template.containers[0].env") # Update labels if labels is not None: service.labels = labels update_mask_fields.append("labels") # Only proceed if there are fields to update if not update_mask_fields: return json.dumps( {"status": "info", "message": "No updates specified"}, indent=2 ) # Create update mask update_mask = field_mask_pb2.FieldMask(paths=update_mask_fields) # Create the request request = run_v2.UpdateServiceRequest( service=service, update_mask=update_mask ) print(f"Updating Cloud Run service {service_name}...") operation = client.update_service(request=request) print("Waiting for service update to complete...") response = operation.result() return json.dumps( { "status": "success", "name": response.name, "uri": response.uri, "updated_fields": update_mask_fields, "conditions": [ { "type": condition.type_, "state": condition.state.name if hasattr(condition.state, "name") else str(condition.state), "message": condition.message, } for condition in response.conditions ] if response.conditions else [], }, indent=2, ) except Exception as e: return json.dumps({"status": "error", "message": str(e)}, indent=2) @mcp_instance.tool() def delete_service( service_name: str, project_id: str = None, region: str = None ) -> str: """ Delete a Cloud Run service Args: service_name: Name of the service to delete project_id: GCP project ID (defaults to configured project) region: GCP region (e.g., us-central1) (defaults to configured location) """ try: # Get client from client_instances client = client_instances.get_clients().run project_id = project_id or client_instances.get_project_id() region = region or client_instances.get_location() name = f"projects/{project_id}/locations/{region}/services/{service_name}" print(f"Deleting Cloud Run service {service_name} in {region}...") operation = client.delete_service(name=name) print("Waiting for service deletion to complete...") operation.result() return json.dumps( { "status": "success", "message": f"Service {service_name} successfully deleted", }, indent=2, ) except Exception as e: return json.dumps({"status": "error", "message": str(e)}, indent=2) @mcp_instance.tool() def get_service( service_name: str, project_id: str = None, region: str = None ) -> str: """ Get details of a specific Cloud Run service Args: service_name: Name of the service project_id: GCP project ID (defaults to configured project) region: GCP region (e.g., us-central1) (defaults to configured location) """ try: # Get client from client_instances client = client_instances.get_clients().run project_id = project_id or client_instances.get_project_id() region = region or client_instances.get_location() name = f"projects/{project_id}/locations/{region}/services/{service_name}" print(f"Getting details for Cloud Run service {service_name}...") service = client.get_service(name=name) # Parse container details containers = [] if service.template and service.template.containers: for container in service.template.containers: container_info = { "image": container.image, "resources": { "limits": dict(container.resources.limits) if container.resources and container.resources.limits else {} } if container.resources else {}, "env_vars": {env.name: env.value for env in container.env} if container.env else {}, } containers.append(container_info) # Parse traffic traffic_result = [] if service.traffic: for traffic in service.traffic: traffic_info = { "type": traffic.type_.name if hasattr(traffic.type_, "name") else str(traffic.type_), "revision": traffic.revision.split("/")[-1] if traffic.revision else None, "percent": traffic.percent, "tag": traffic.tag, } traffic_result.append(traffic_info) return json.dumps( { "status": "success", "name": service.name.split("/")[-1], "uri": service.uri, "containers": containers, "traffic": traffic_result, "update_time": service.update_time.isoformat() if service.update_time else None, }, indent=2, ) except Exception as e: return json.dumps({"status": "error", "message": str(e)}, indent=2) # Prompts @mcp_instance.prompt() def deploy_service_prompt( service_name: str, image: str, min_instances: str = "0", max_instances: str = "100", env_vars: str = "{}", ) -> str: """Prompt to deploy a Cloud Run service with the given configuration""" return f""" I need to deploy a Cloud Run service with the following configuration: - Service name: {service_name} - Container image: {image} - Min instances: {min_instances} - Max instances: {max_instances} - Environment variables: {env_vars} Please help me set up this service and explain the deployment process and any best practices I should follow. """ @mcp_instance.prompt() def check_service_status_prompt(service_name: str) -> str: """Prompt to check the status of a deployed Cloud Run service""" return f""" I need to check the current status of my Cloud Run service named "{service_name}". Please provide me with: 1. Is the service currently running? 2. What is the URL to access it? 3. What revision is currently serving traffic? 4. Are there any issues with the service? """ @mcp_instance.prompt() def create_service_prompt(region: str = "us-central1") -> str: """Prompt for creating a new Cloud Run service""" return f""" I need to create a new Cloud Run service in {region}. Please help me with: 1. What container image should I use? 2. How much CPU and memory should I allocate? 3. Should I set min and max instances for scaling? 4. Do I need to set any environment variables? 5. Should I allow unauthenticated access? 6. What's the best way to deploy my service? """ @mcp_instance.prompt() def update_service_prompt() -> str: """Prompt for updating a Cloud Run service""" return """ I need to update my Cloud Run service. Please help me understand: 1. How to update the container image 2. How to change resource allocations 3. How to add or modify environment variables 4. How to update scaling settings 5. Best practices for zero-downtime updates """ ``` -------------------------------------------------------------------------------- /src/gcp-mcp-server/services/cloud_monitoring.py: -------------------------------------------------------------------------------- ```python import datetime import json from typing import Dict, List, Optional from google.cloud import monitoring_v3 from google.protobuf.duration_pb2 import Duration from google.protobuf.timestamp_pb2 import Timestamp from services import client_instances def register(mcp_instance): """Register all Cloud Monitoring resources and tools with the MCP instance.""" # Resources @mcp_instance.resource("gcp://monitoring/{project_id}/metrics") def list_metrics_resource(project_id: str = None) -> str: """List all available metrics for a GCP project""" try: # Get client from client_instances client = client_instances.get_clients().monitoring project_id = project_id or client_instances.get_project_id() name = f"projects/{project_id}" metrics = client.list_metric_descriptors(name=name) result = [] for metric in metrics: result.append( { "name": metric.name, "type": metric.type, "display_name": metric.display_name, "description": metric.description, "kind": monitoring_v3.MetricDescriptor.MetricKind.Name( metric.metric_kind ), "value_type": monitoring_v3.MetricDescriptor.ValueType.Name( metric.value_type ), "unit": metric.unit, } ) return json.dumps(result, indent=2) except Exception as e: return json.dumps({"error": str(e)}, indent=2) @mcp_instance.resource("gcp://monitoring/{project_id}/alerts") def list_alerts_resource(project_id: str = None) -> str: """List all alert policies for a GCP project""" try: # Get client from client_instances client = client_instances.get_clients().monitoring project_id = project_id or client_instances.get_project_id() name = f"projects/{project_id}" policies = client.list_alert_policies(name=name) result = [] for policy in policies: result.append( { "name": policy.name, "display_name": policy.display_name, "enabled": policy.enabled, "conditions_count": len(policy.conditions), "creation_time": policy.creation_record.mutate_time.ToDatetime().isoformat() if policy.creation_record and policy.creation_record.mutate_time else None, "notification_channels": [ chan.split("/")[-1] for chan in policy.notification_channels ] if policy.notification_channels else [], } ) return json.dumps(result, indent=2) except Exception as e: return json.dumps({"error": str(e)}, indent=2) @mcp_instance.resource("gcp://monitoring/{project_id}/alert/{alert_id}") def get_alert_resource(project_id: str = None, alert_id: str = None) -> str: """Get details for a specific alert policy""" try: # Get client from client_instances client = client_instances.get_clients().monitoring project_id = project_id or client_instances.get_project_id() name = f"projects/{project_id}/alertPolicies/{alert_id}" policy = client.get_alert_policy(name=name) conditions = [] for condition in policy.conditions: condition_data = { "name": condition.name, "display_name": condition.display_name, "type": condition.condition_type_name if hasattr(condition, "condition_type_name") else None, } # Add condition-specific details if condition.HasField("condition_threshold"): threshold = condition.condition_threshold condition_data["threshold"] = { "filter": threshold.filter, "comparison": monitoring_v3.ComparisonType.Name( threshold.comparison ) if threshold.comparison else None, "threshold_value": threshold.threshold_value, "duration": f"{threshold.duration.seconds}s" if threshold.duration else None, "aggregation": { "alignment_period": f"{threshold.aggregations[0].alignment_period.seconds}s" if threshold.aggregations and threshold.aggregations[0].alignment_period else None, "per_series_aligner": monitoring_v3.Aggregation.Aligner.Name( threshold.aggregations[0].per_series_aligner ) if threshold.aggregations and threshold.aggregations[0].per_series_aligner else None, "cross_series_reducer": monitoring_v3.Aggregation.Reducer.Name( threshold.aggregations[0].cross_series_reducer ) if threshold.aggregations and threshold.aggregations[0].cross_series_reducer else None, } if threshold.aggregations and len(threshold.aggregations) > 0 else None, } conditions.append(condition_data) result = { "name": policy.name, "display_name": policy.display_name, "enabled": policy.enabled, "conditions": conditions, "combiner": monitoring_v3.AlertPolicy.ConditionCombinerType.Name( policy.combiner ) if policy.combiner else None, "notification_channels": policy.notification_channels, "creation_time": policy.creation_record.mutate_time.ToDatetime().isoformat() if policy.creation_record and policy.creation_record.mutate_time else None, "documentation": { "content": policy.documentation.content, "mime_type": policy.documentation.mime_type, } if policy.HasField("documentation") else None, } return json.dumps(result, indent=2) except Exception as e: return json.dumps({"error": str(e)}, indent=2) @mcp_instance.resource("gcp://monitoring/{project_id}/notification_channels") def list_notification_channels_resource(project_id: str = None) -> str: """List notification channels for a GCP project""" try: # Get client from client_instances client = client_instances.get_clients().monitoring project_id = project_id or client_instances.get_project_id() name = f"projects/{project_id}" channels = client.list_notification_channels(name=name) result = [] for channel in channels: result.append( { "name": channel.name, "type": channel.type, "display_name": channel.display_name, "description": channel.description, "verification_status": monitoring_v3.NotificationChannel.VerificationStatus.Name( channel.verification_status ) if channel.verification_status else None, "enabled": channel.enabled, "labels": dict(channel.labels) if channel.labels else {}, } ) return json.dumps(result, indent=2) except Exception as e: return json.dumps({"error": str(e)}, indent=2) # Tools @mcp_instance.tool() def list_metrics(project_id: str = None, filter_str: str = "") -> str: """ List metrics in a GCP project with optional filtering Args: project_id: GCP project ID (defaults to configured project) filter_str: Optional filter string to narrow results (e.g., "metric.type = starts_with(\"compute.googleapis.com\")") """ try: # Get client from client_instances client = client_instances.get_clients().monitoring project_id = project_id or client_instances.get_project_id() parent = f"projects/{project_id}" request = monitoring_v3.ListMetricDescriptorsRequest( name=parent, filter=filter_str ) print(f"Listing metrics for project {project_id}...") metrics = client.list_metric_descriptors(request=request) result = [] for metric in metrics: result.append( { "name": metric.name, "type": metric.type, "display_name": metric.display_name, "description": metric.description, "kind": monitoring_v3.MetricDescriptor.MetricKind.Name( metric.metric_kind ), "value_type": monitoring_v3.MetricDescriptor.ValueType.Name( metric.value_type ), "unit": metric.unit, } ) return json.dumps( {"status": "success", "metrics": result, "count": len(result)}, indent=2 ) except Exception as e: return json.dumps({"status": "error", "message": str(e)}, indent=2) @mcp_instance.tool() def fetch_metric_timeseries( metric_type: str, project_id: str = None, filter_additions: str = "", hours: int = 1, alignment_period_seconds: int = 60, ) -> str: """ Fetch time series data for a specific metric Args: metric_type: The metric type (e.g., "compute.googleapis.com/instance/cpu/utilization") project_id: GCP project ID (defaults to configured project) filter_additions: Additional filter criteria (e.g., "resource.labels.instance_id = \"my-instance\"") hours: Number of hours of data to fetch (default: 1) alignment_period_seconds: Data point alignment period in seconds (default: 60) """ try: # Get client from client_instances client = client_instances.get_clients().monitoring project_id = project_id or client_instances.get_project_id() # Build the filter filter_str = f'metric.type = "{metric_type}"' if filter_additions: filter_str += f" AND {filter_additions}" # Calculate time interval now = datetime.datetime.utcnow() seconds = int(now.timestamp()) end_time = Timestamp(seconds=seconds) start_time = Timestamp(seconds=seconds - hours * 3600) # Create interval interval = monitoring_v3.TimeInterval( end_time=end_time, start_time=start_time ) # Create aggregation aggregation = monitoring_v3.Aggregation( alignment_period=Duration(seconds=alignment_period_seconds), per_series_aligner=monitoring_v3.Aggregation.Aligner.ALIGN_MEAN, ) # Build request request = monitoring_v3.ListTimeSeriesRequest( name=f"projects/{project_id}", filter=filter_str, interval=interval, view=monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL, aggregation=aggregation, ) print(f"Fetching time series data for {metric_type}...") time_series = client.list_time_series(request=request) result = [] for series in time_series: data_points = [] for point in series.points: point_time = point.interval.end_time.ToDatetime().isoformat() # Handle different value types if point.value.HasField("double_value"): value = point.value.double_value elif point.value.HasField("int64_value"): value = point.value.int64_value elif point.value.HasField("bool_value"): value = point.value.bool_value elif point.value.HasField("string_value"): value = point.value.string_value elif point.value.HasField("distribution_value"): value = ( "distribution" # Simplified, as distributions are complex ) else: value = None data_points.append({"time": point_time, "value": value}) series_data = { "metric": dict(series.metric.labels) if series.metric and series.metric.labels else {}, "resource": { "type": series.resource.type, "labels": dict(series.resource.labels) if series.resource and series.resource.labels else {}, } if series.resource else {}, "points": data_points, } result.append(series_data) return json.dumps( { "status": "success", "metric_type": metric_type, "time_range": { "start": start_time.ToDatetime().isoformat(), "end": end_time.ToDatetime().isoformat(), }, "time_series": result, "count": len(result), }, indent=2, ) except Exception as e: return json.dumps({"status": "error", "message": str(e)}, indent=2) @mcp_instance.tool() def list_alert_policies(project_id: str = None, filter_str: str = "") -> str: """ List alert policies in a GCP project with optional filtering Args: project_id: GCP project ID (defaults to configured project) filter_str: Optional filter string (e.g., "display_name = \"High CPU Alert\"") """ try: # Get client from client_instances client = client_instances.get_clients().monitoring project_id = project_id or client_instances.get_project_id() parent = f"projects/{project_id}" request = monitoring_v3.ListAlertPoliciesRequest( name=parent, filter=filter_str ) print(f"Listing alert policies for project {project_id}...") policies = client.list_alert_policies(request=request) result = [] for policy in policies: policy_data = { "name": policy.name, "display_name": policy.display_name, "enabled": policy.enabled, "conditions_count": len(policy.conditions), "combiner": monitoring_v3.AlertPolicy.ConditionCombinerType.Name( policy.combiner ) if policy.combiner else None, "notification_channels": [ chan.split("/")[-1] for chan in policy.notification_channels ] if policy.notification_channels else [], "creation_time": policy.creation_record.mutate_time.ToDatetime().isoformat() if policy.creation_record and policy.creation_record.mutate_time else None, } result.append(policy_data) return json.dumps( {"status": "success", "alert_policies": result, "count": len(result)}, indent=2, ) except Exception as e: return json.dumps({"status": "error", "message": str(e)}, indent=2) @mcp_instance.tool() def create_metric_threshold_alert( display_name: str, metric_type: str, filter_str: str, threshold_value: float, project_id: str = None, comparison: str = "COMPARISON_GT", duration_seconds: int = 300, alignment_period_seconds: int = 60, aligner: str = "ALIGN_MEAN", reducer: str = "REDUCE_MEAN", notification_channels: Optional[List[str]] = None, documentation: str = "", enabled: bool = True, ) -> str: """ Create a metric threshold alert policy Args: display_name: Human-readable name for the alert metric_type: The metric to alert on (e.g., "compute.googleapis.com/instance/cpu/utilization") filter_str: Filter string to define which resources to monitor threshold_value: The threshold value to trigger the alert project_id: GCP project ID (defaults to configured project) comparison: Comparison type (COMPARISON_GT, COMPARISON_GE, COMPARISON_LT, COMPARISON_LE, COMPARISON_EQ, COMPARISON_NE) duration_seconds: Duration in seconds the condition must be met to trigger alignment_period_seconds: Period in seconds for data point alignment aligner: Per-series aligner (ALIGN_MEAN, ALIGN_MAX, ALIGN_MIN, etc.) reducer: Cross-series reducer (REDUCE_MEAN, REDUCE_MAX, REDUCE_MIN, etc.) notification_channels: List of notification channel IDs documentation: Documentation for the alert (markdown supported) enabled: Whether the alert should be enabled """ try: # Get client from client_instances client = client_instances.get_clients().monitoring project_id = project_id or client_instances.get_project_id() # Validate and convert enums try: comparison_enum = getattr(monitoring_v3.ComparisonType, comparison) aligner_enum = getattr(monitoring_v3.Aggregation.Aligner, aligner) reducer_enum = getattr(monitoring_v3.Aggregation.Reducer, reducer) except AttributeError as e: return json.dumps( { "status": "error", "message": f"Invalid enum value: {str(e)}. Please check documentation for valid values.", }, indent=2, ) # Prepare notification channels with full paths full_notification_channels = [] if notification_channels: for channel in notification_channels: if not channel.startswith( f"projects/{project_id}/notificationChannels/" ): channel = ( f"projects/{project_id}/notificationChannels/{channel}" ) full_notification_channels.append(channel) # Create aggregation aggregation = monitoring_v3.Aggregation( alignment_period=Duration(seconds=alignment_period_seconds), per_series_aligner=aligner_enum, cross_series_reducer=reducer_enum, group_by_fields=[], ) # Create condition threshold condition_threshold = monitoring_v3.AlertPolicy.Condition.MetricThreshold( filter=f'metric.type = "{metric_type}" AND {filter_str}', comparison=comparison_enum, threshold_value=threshold_value, duration=Duration(seconds=duration_seconds), aggregations=[aggregation], ) # Create condition condition = monitoring_v3.AlertPolicy.Condition( display_name=f"Threshold condition for {display_name}", condition_threshold=condition_threshold, ) # Create alert policy alert_policy = monitoring_v3.AlertPolicy( display_name=display_name, conditions=[condition], combiner=monitoring_v3.AlertPolicy.ConditionCombinerType.OR, notification_channels=full_notification_channels, enabled=monitoring_v3.BoolValue(value=enabled), ) # Add documentation if provided if documentation: alert_policy.documentation = monitoring_v3.AlertPolicy.Documentation( content=documentation, mime_type="text/markdown" ) # Create the request request = monitoring_v3.CreateAlertPolicyRequest( name=f"projects/{project_id}", alert_policy=alert_policy ) print(f"Creating alert policy: {display_name}...") response = client.create_alert_policy(request=request) return json.dumps( { "status": "success", "name": response.name, "display_name": response.display_name, "enabled": response.enabled, "conditions_count": len(response.conditions), "notification_channels": response.notification_channels, }, indent=2, ) except Exception as e: return json.dumps({"status": "error", "message": str(e)}, indent=2) @mcp_instance.tool() def update_alert_policy( alert_id: str, project_id: str = None, display_name: Optional[str] = None, notification_channels: Optional[List[str]] = None, enabled: Optional[bool] = None, documentation: Optional[str] = None, ) -> str: """ Update an existing alert policy Args: alert_id: ID of the alert to update project_id: GCP project ID (defaults to configured project) display_name: New name for the alert notification_channels: List of notification channel IDs enabled: Whether the alert should be enabled documentation: Documentation for the alert (markdown supported) """ try: # Get client from client_instances client = client_instances.get_clients().monitoring project_id = project_id or client_instances.get_project_id() # Get the existing policy name = f"projects/{project_id}/alertPolicies/{alert_id}" policy = client.get_alert_policy(name=name) # Update fields if provided if display_name: policy.display_name = display_name if notification_channels is not None: full_notification_channels = [] for channel in notification_channels: if not channel.startswith( f"projects/{project_id}/notificationChannels/" ): channel = ( f"projects/{project_id}/notificationChannels/{channel}" ) full_notification_channels.append(channel) policy.notification_channels = full_notification_channels if enabled is not None: policy.enabled = monitoring_v3.BoolValue(value=enabled) if documentation is not None: policy.documentation = monitoring_v3.AlertPolicy.Documentation( content=documentation, mime_type="text/markdown" ) # Create update mask update_mask = [] if display_name: update_mask.append("display_name") if notification_channels is not None: update_mask.append("notification_channels") if enabled is not None: update_mask.append("enabled") if documentation is not None: update_mask.append("documentation") # Update the policy request = monitoring_v3.UpdateAlertPolicyRequest( alert_policy=policy, update_mask={"paths": update_mask} ) print(f"Updating alert policy: {policy.name}...") response = client.update_alert_policy(request=request) return json.dumps( { "status": "success", "name": response.name, "display_name": response.display_name, "enabled": response.enabled, "conditions_count": len(response.conditions), "notification_channels": [ chan.split("/")[-1] for chan in response.notification_channels ] if response.notification_channels else [], }, indent=2, ) except Exception as e: return json.dumps({"status": "error", "message": str(e)}, indent=2) @mcp_instance.tool() def delete_alert_policy(alert_id: str, project_id: str = None) -> str: """ Delete an alert policy Args: alert_id: ID of the alert to delete project_id: GCP project ID (defaults to configured project) """ try: # Get client from client_instances client = client_instances.get_clients().monitoring project_id = project_id or client_instances.get_project_id() name = f"projects/{project_id}/alertPolicies/{alert_id}" print(f"Deleting alert policy: {alert_id}...") client.delete_alert_policy(name=name) return json.dumps( { "status": "success", "message": f"Alert policy {alert_id} successfully deleted", }, indent=2, ) except Exception as e: return json.dumps({"status": "error", "message": str(e)}, indent=2) @mcp_instance.tool() def create_notification_channel( display_name: str, channel_type: str, labels: Dict[str, str], project_id: str = None, description: str = "", enabled: bool = True, ) -> str: """ Create a notification channel Args: display_name: Human-readable name for the channel channel_type: Type of channel (email, sms, slack, pagerduty, etc.) labels: Channel-specific configuration (e.g., {"email_address": "[email protected]"}) project_id: GCP project ID (defaults to configured project) description: Optional description for the channel enabled: Whether the channel should be enabled """ try: # Get client from client_instances client = client_instances.get_clients().monitoring project_id = project_id or client_instances.get_project_id() # Create notification channel notification_channel = monitoring_v3.NotificationChannel( type=channel_type, display_name=display_name, description=description, labels=labels, enabled=monitoring_v3.BoolValue(value=enabled), ) # Create the request request = monitoring_v3.CreateNotificationChannelRequest( name=f"projects/{project_id}", notification_channel=notification_channel ) print(f"Creating notification channel: {display_name} ({channel_type})...") response = client.create_notification_channel(request=request) return json.dumps( { "status": "success", "name": response.name, "type": response.type, "display_name": response.display_name, "description": response.description, "verification_status": monitoring_v3.NotificationChannel.VerificationStatus.Name( response.verification_status ) if response.verification_status else None, "enabled": response.enabled, "labels": dict(response.labels) if response.labels else {}, }, indent=2, ) except Exception as e: return json.dumps({"status": "error", "message": str(e)}, indent=2) # Prompts @mcp_instance.prompt() def create_alert_prompt() -> str: """Prompt for creating a new alert policy""" return """ I need to create a new alert policy in Cloud Monitoring. Please help me with: 1. Selecting the appropriate metric type for my alert 2. Setting up sensible thresholds and durations 3. Understanding the different comparison types 4. Best practices for alert documentation 5. Setting up notification channels I'd like to create an alert that triggers when: """ @mcp_instance.prompt() def monitor_resources_prompt() -> str: """Prompt for guidance on monitoring GCP resources""" return """ I need to set up monitoring for my GCP resources. Please help me understand: 1. What are the most important metrics I should be monitoring for: - Compute Engine instances - Cloud SQL databases - Cloud Storage buckets - App Engine applications - Kubernetes Engine clusters 2. What are recommended thresholds for alerts on these resources? 3. How should I organize my monitoring to keep it manageable? 4. What visualization options do I have in Cloud Monitoring? """ ```