# Directory Structure ``` ├── .gitignore ├── .python-version ├── pyproject.toml ├── pytest.ini ├── README.md ├── src │ └── gcp_mcp │ ├── __init__.py │ ├── __main__.py │ ├── gcp_modules │ │ ├── __init__.py │ │ ├── auth │ │ │ ├── __init__.py │ │ │ └── tools.py │ │ ├── billing │ │ │ ├── __init__.py │ │ │ └── tools.py │ │ ├── compute │ │ │ ├── __init__.py │ │ │ └── tools.py │ │ ├── databases │ │ │ ├── __init__.py │ │ │ └── tools.py │ │ ├── deployment │ │ │ ├── __init__.py │ │ │ └── tools.py │ │ ├── iam │ │ │ ├── __init__.py │ │ │ └── tools.py │ │ ├── kubernetes │ │ │ ├── __init__.py │ │ │ └── tools.py │ │ ├── monitoring │ │ │ ├── __init__.py │ │ │ └── tools.py │ │ ├── networking │ │ │ ├── __init__.py │ │ │ └── tools.py │ │ ├── resource_management │ │ │ ├── __init__.py │ │ │ └── tools.py │ │ └── storage │ │ ├── __init__.py │ │ └── tools.py │ └── server.py ├── tests │ ├── __init__.py │ ├── conftest.py │ ├── mock_gcp.py │ └── unit │ ├── __init__.py │ └── test_gcp_functions.py └── uv.lock ``` # Files -------------------------------------------------------------------------------- /.python-version: -------------------------------------------------------------------------------- ``` 3.11.8 ``` -------------------------------------------------------------------------------- /.gitignore: -------------------------------------------------------------------------------- ``` # Python-generated files __pycache__/ *.py[oc] build/ dist/ wheels/ *.egg-info # Virtual environments .venv ``` -------------------------------------------------------------------------------- /README.md: -------------------------------------------------------------------------------- ```markdown # GCP MCP Application ## Claude Desktop Integration To enable GCP management capabilities in Claude Desktop, simply add the following configuration to your Claude Desktop MCP configuration: ```json { "gcp-mcp": { "command": "uvx", "args": [ "gcp-mcp" ] } } ``` That's it! No additional setup or credential manipulation is required. When you first ask Claude to interact with your GCP resources, a browser window will automatically open for you to authenticate and grant access. Once you approve the access, Claude will be able to manage your GCP resources through natural language commands. Here are some example requests you can make: Basic Operations: - "Could you list my GCP projects?" - "Show me my compute instances" - "What storage buckets do I have?" Resource Creation: - "Please create a compute instance with 2GB RAM and 10GB storage, name it MCP-engine" - "Create a new storage bucket called my-backup-bucket in us-central1" - "Set up a new VPC network named prod-network with subnet 10.0.0.0/24" Resource Management: - "Stop all compute instances in the dev project" - "Show me all instances that have been running for more than 24 hours" - "What's the current CPU usage of my instance named backend-server?" - "Create a snapshot of my database disk" Monitoring and Alerts: - "Set up an alert for when CPU usage goes above 80%" - "Show me all critical alerts from the last 24 hours" - "What's the current status of my GKE clusters?" ## Features The application provides comprehensive coverage of GCP services: ### Resource Management - Projects and quotas management - Asset inventory - IAM permissions ### Compute & Infrastructure - Compute Engine instances - Storage buckets and disks - VPC networks and firewall rules - Kubernetes Engine (GKE) clusters ### Databases & Storage - Cloud SQL instances - Firestore databases - Cloud Storage - Database backups ### Monitoring & Billing - Metrics and alerts - Billing information - Uptime monitoring - Resource usage tracking ### Coming Soon - Deployment manager and infrastructure as code ## Installation ```bash pip install gcp-mcp ``` ## License [MIT License](LICENSE) Your contributions and issues are welcome ! ``` -------------------------------------------------------------------------------- /src/gcp_mcp/gcp_modules/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /src/gcp_mcp/gcp_modules/billing/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /src/gcp_mcp/gcp_modules/compute/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /src/gcp_mcp/gcp_modules/databases/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /src/gcp_mcp/gcp_modules/deployment/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /src/gcp_mcp/gcp_modules/iam/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /src/gcp_mcp/gcp_modules/kubernetes/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /src/gcp_mcp/gcp_modules/monitoring/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /src/gcp_mcp/gcp_modules/networking/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /src/gcp_mcp/gcp_modules/resource_management/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /src/gcp_mcp/gcp_modules/storage/__init__.py: -------------------------------------------------------------------------------- ```python ``` -------------------------------------------------------------------------------- /src/gcp_mcp/gcp_modules/auth/__init__.py: -------------------------------------------------------------------------------- ```python # Authentication module for Google Cloud Platform ``` -------------------------------------------------------------------------------- /tests/__init__.py: -------------------------------------------------------------------------------- ```python # This file is intentionally left empty to make the directory a proper Python package. ``` -------------------------------------------------------------------------------- /tests/unit/__init__.py: -------------------------------------------------------------------------------- ```python # This file is intentionally left empty to make the directory a proper Python package. ``` -------------------------------------------------------------------------------- /src/gcp_mcp/__init__.py: -------------------------------------------------------------------------------- ```python from . import server import asyncio import os import sys def main(): server.mcp.run(transport='stdio') __all__ = ['main', 'server'] ``` -------------------------------------------------------------------------------- /src/gcp_mcp/__main__.py: -------------------------------------------------------------------------------- ```python from . import main if __name__ == "__main__": print("Starting gcp_mcp") main() else: print("Starting gcp_mcp from import") main() ``` -------------------------------------------------------------------------------- /pytest.ini: -------------------------------------------------------------------------------- ``` [pytest] testpaths = tests python_files = test_*.py python_classes = Test* python_functions = test_* markers = asyncio: mark a test as an asyncio test addopts = -v ``` -------------------------------------------------------------------------------- /pyproject.toml: -------------------------------------------------------------------------------- ```toml [build-system] requires = ["hatchling"] build-backend = "hatchling.build" [project] name = "gcp-mcp" version = "0.1.0" description = "MCP interface for Google Cloud Platform" readme = "README.md" requires-python = ">=3.11.8" dependencies = [ "httpx>=0.28.0", "mcp>=1.3.0", "google-cloud-resource-manager>=1.14.0", "google-cloud-compute>=1.26.0", "google-cloud-storage>=3.1.0", "google-cloud-service-usage>=1.13.0", "google-cloud-billing>=1.16.0", "google-api-python-client>=2.163.0", "google-cloud-monitoring>=2.22.0", "google-cloud-logging>=3.9.0", "google-cloud-container>=2.35.0", "google-cloud-firestore>=2.16.0", "google-cloud-bigtable>=2.19.0", "google-cloud-spanner>=3.39.0", "google-cloud-iam>=2.17.0", "google-cloud-vpc-access>=1.4.0", "google-cloud-asset>=3.25.0", "google-auth>=2.16.0", "google-auth-oauthlib>=1.2.0", "google-auth-httplib2>=0.1.0" ] [project.entry-points.console] gcp = "gcp_mcp:main" gcp-mcp = "gcp_mcp:main" [project.scripts] gcp-mcp = "gcp_mcp:main" ``` -------------------------------------------------------------------------------- /tests/conftest.py: -------------------------------------------------------------------------------- ```python import pytest import sys from unittest.mock import MagicMock # Mock the Google Cloud libraries MOCK_MODULES = [ 'google.cloud.resourcemanager_v3', 'google.cloud.service_usage', 'google.cloud.compute_v1', 'google.cloud.storage', 'google.cloud.billing.v1', 'google.auth', 'google.iam.v1', 'google.auth.exceptions', 'googleapiclient', 'googleapiclient.discovery' ] for mod_name in MOCK_MODULES: sys.modules[mod_name] = MagicMock() # Mock specific classes sys.modules['google.cloud.resourcemanager_v3'].ProjectsClient = MagicMock sys.modules['google.cloud.service_usage'].ServiceUsageClient = MagicMock sys.modules['google.cloud.compute_v1'].InstancesClient = MagicMock sys.modules['google.cloud.compute_v1'].ZonesClient = MagicMock sys.modules['google.cloud.storage'].Client = MagicMock sys.modules['google.cloud.billing.v1'].CloudBillingClient = MagicMock sys.modules['google.cloud.billing.v1'].CloudCatalogClient = MagicMock sys.modules['google.auth'].default = MagicMock sys.modules['google.iam.v1'].iam_policy_pb2 = MagicMock sys.modules['googleapiclient.discovery'].build = MagicMock ``` -------------------------------------------------------------------------------- /src/gcp_mcp/gcp_modules/deployment/tools.py: -------------------------------------------------------------------------------- ```python """ Google Cloud Platform Deployment tools. """ from typing import List, Dict, Any, Optional def register_tools(mcp): """Register all deployment tools with the MCP server.""" @mcp.tool() def list_deployment_manager_deployments(project_id: str) -> str: """ List Deployment Manager deployments in a GCP project. Args: project_id: The ID of the GCP project to list deployments for Returns: List of Deployment Manager deployments in the specified GCP project """ # TODO: Implement this function return f"Not yet implemented: listing deployments for project {project_id}" @mcp.tool() def get_deployment_details(project_id: str, deployment_name: str) -> str: """ Get details of a specific Deployment Manager deployment. Args: project_id: The ID of the GCP project deployment_name: The name of the deployment to get details for Returns: Details of the specified deployment """ # TODO: Implement this function return f"Not yet implemented: getting details for deployment {deployment_name} in project {project_id}" @mcp.tool() def list_cloud_build_triggers(project_id: str) -> str: """ List Cloud Build triggers in a GCP project. Args: project_id: The ID of the GCP project to list build triggers for Returns: List of Cloud Build triggers in the specified GCP project """ # TODO: Implement this function return f"Not yet implemented: listing Cloud Build triggers for project {project_id}" ``` -------------------------------------------------------------------------------- /src/gcp_mcp/server.py: -------------------------------------------------------------------------------- ```python from typing import Any import httpx from mcp.server.fastmcp import FastMCP # Import all modules from .gcp_modules.resource_management import tools as resource_tools from .gcp_modules.iam import tools as iam_tools from .gcp_modules.compute import tools as compute_tools from .gcp_modules.storage import tools as storage_tools from .gcp_modules.billing import tools as billing_tools from .gcp_modules.networking import tools as networking_tools from .gcp_modules.kubernetes import tools as kubernetes_tools from .gcp_modules.monitoring import tools as monitoring_tools from .gcp_modules.databases import tools as databases_tools from .gcp_modules.deployment import tools as deployment_tools from .gcp_modules.auth import tools as auth_tools # Initialize FastMCP server mcp = FastMCP("gcp") # A simple test function @mcp.tool() async def say_hello(name: str) -> str: """Say hello to a person.""" return f"Hello, {name}!" # Register all module tools def register_tools(): # Register authentication tools (placed first for visibility) auth_tools.register_tools(mcp) # Register resource management tools resource_tools.register_tools(mcp) # Register IAM tools iam_tools.register_tools(mcp) # Register compute tools compute_tools.register_tools(mcp) # Register storage tools storage_tools.register_tools(mcp) # Register billing tools billing_tools.register_tools(mcp) # Register networking tools networking_tools.register_tools(mcp) # Register kubernetes tools kubernetes_tools.register_tools(mcp) # Register monitoring tools monitoring_tools.register_tools(mcp) # Register databases tools databases_tools.register_tools(mcp) # Register deployment tools deployment_tools.register_tools(mcp) # Register all tools register_tools() # if __name__ == "__main__": # mcp.run(transport='stdio') ``` -------------------------------------------------------------------------------- /src/gcp_mcp/gcp_modules/billing/tools.py: -------------------------------------------------------------------------------- ```python """ Google Cloud Platform Billing tools. """ from typing import List, Dict, Any, Optional def register_tools(mcp): """Register all billing tools with the MCP server.""" @mcp.tool() def get_billing_info(project_id: str) -> str: """ Get billing information for a GCP project. Args: project_id: The ID of the GCP project to get billing information for Returns: Billing information for the specified GCP project """ try: try: from google.cloud import billing_v1 except ImportError: return "Error: The Google Cloud billing library is not installed. Please install it with 'pip install google-cloud-billing'." # Initialize the Cloud Billing client billing_client = billing_v1.CloudBillingClient() # Get the billing account for the project project_name = f"projects/{project_id}" billing_info = billing_client.get_project_billing_info(name=project_name) # If billing is enabled, get more details about the billing account if billing_info.billing_account_name: billing_account = billing_client.get_billing_account( name=billing_info.billing_account_name ) # Initialize the Cloud Catalog client to get pricing information catalog_client = billing_v1.CloudCatalogClient() # Format the response return f""" Billing Information for GCP Project {project_id}: Billing Enabled: {billing_info.billing_enabled} Billing Account: {billing_info.billing_account_name} Display Name: {billing_account.display_name} Open: {billing_account.open} """ else: return f"Billing is not enabled for project {project_id}." except Exception as e: return f"Error getting billing information: {str(e)}" ``` -------------------------------------------------------------------------------- /tests/mock_gcp.py: -------------------------------------------------------------------------------- ```python """ Mock functions for testing GCP functionality. """ def list_gcp_projects(): """Mock function to list GCP projects.""" return ["test-project-1", "test-project-2", "test-project-3"] def get_gcp_project_details(project_id): """Mock function to get details of a GCP project.""" return f""" Project ID: {project_id} Name: Test Project Created: 2023-01-01T00:00:00Z Status: ACTIVE Labels: - env: test - department: engineering """ def list_gcp_services(project_id): """Mock function to list enabled services in a GCP project.""" return f""" Enabled services in project {project_id}: - compute.googleapis.com: Compute Engine API - storage.googleapis.com: Cloud Storage API - iam.googleapis.com: Identity and Access Management (IAM) API """ def list_compute_instances(project_id, zone=None): """Mock function to list Compute Engine instances.""" zone_str = f" in zone {zone}" if zone else "" return f""" Compute Engine instances in project {project_id}{zone_str}: - instance-1 (n1-standard-1): RUNNING Zone: us-central1-a Created: 2023-01-01 00:00:00 UTC - instance-2 (n1-standard-2): STOPPED Zone: us-central1-a Created: 2023-02-01 00:00:00 UTC """ def check_iam_permissions(project_id): """Mock function to check IAM permissions in a GCP project.""" return f""" IAM permissions in project {project_id}: - roles/viewer: [email protected] - roles/editor: [email protected] """ def list_storage_buckets(project_id): """Mock function to list Cloud Storage buckets in a GCP project.""" return f""" Cloud Storage buckets in project {project_id}: - test-bucket-1 Location: us-central1 Storage class: STANDARD Created: 2023-01-01 00:00:00 UTC - test-bucket-2 Location: us-east1 Storage class: NEARLINE Created: 2023-02-01 00:00:00 UTC """ def get_billing_info(project_id): """Mock function to get billing information for a GCP project.""" return f""" Billing information for project {project_id}: - Billing account: 123456-ABCDEF-123456 - Billing account name: My Billing Account - Billing account status: Open - Billing enabled: Yes - Currency: USD """ ``` -------------------------------------------------------------------------------- /tests/unit/test_gcp_functions.py: -------------------------------------------------------------------------------- ```python import pytest from unittest.mock import patch, MagicMock import sys import os # Add the parent directory to the Python path so we can import the mock modules sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../..'))) # Import the functions to test from the mock modules from tests.mock_gcp import ( list_gcp_projects, get_gcp_project_details, list_gcp_services, list_compute_instances, check_iam_permissions, list_storage_buckets, get_billing_info ) class TestGCPFunctions: """Test class for GCP-related functions.""" def test_list_gcp_projects(self): """Test the list_gcp_projects function.""" # Call the function result = list_gcp_projects() # Assertions assert isinstance(result, list) assert "test-project-1" in result assert "test-project-2" in result assert "test-project-3" in result assert len(result) == 3 def test_get_gcp_project_details(self): """Test the get_gcp_project_details function.""" # Call the function result = get_gcp_project_details("test-project-id") # Assertions assert isinstance(result, str) assert "Test Project" in result assert "2023-01-01T00:00:00Z" in result assert "ACTIVE" in result assert "env: test" in result assert "department: engineering" in result def test_list_gcp_services(self): """Test the list_gcp_services function.""" # Call the function result = list_gcp_services("test-project") # Assertions assert isinstance(result, str) assert "compute.googleapis.com: Compute Engine API" in result assert "storage.googleapis.com: Cloud Storage API" in result assert "iam.googleapis.com: Identity and Access Management (IAM) API" in result def test_list_compute_instances_with_zone(self): """Test the list_compute_instances function with a specified zone.""" # Call the function with a specified zone result = list_compute_instances("test-project", "us-central1-a") # Assertions assert isinstance(result, str) assert "instance-1" in result assert "instance-2" in result assert "n1-standard-1" in result assert "n1-standard-2" in result assert "RUNNING" in result assert "STOPPED" in result assert "us-central1-a" in result def test_check_iam_permissions(self): """Test the check_iam_permissions function.""" # Call the function result = check_iam_permissions("test-project") # Assertions assert isinstance(result, str) assert "roles/viewer" in result assert "roles/editor" in result assert "[email protected]" in result def test_list_storage_buckets(self): """Test the list_storage_buckets function.""" # Call the function result = list_storage_buckets("test-project") # Assertions assert isinstance(result, str) assert "test-bucket-1" in result assert "test-bucket-2" in result assert "us-central1" in result assert "us-east1" in result assert "STANDARD" in result assert "NEARLINE" in result assert "2023-01-01 00:00:00 UTC" in result assert "2023-02-01 00:00:00 UTC" in result def test_get_billing_info(self): """Test the get_billing_info function.""" # Call the function result = get_billing_info("test-project") # Assertions assert isinstance(result, str) assert "123456-ABCDEF-123456" in result assert "My Billing Account" in result assert "Open" in result assert "Yes" in result # billing_enabled assert "USD" in result ``` -------------------------------------------------------------------------------- /src/gcp_mcp/gcp_modules/resource_management/tools.py: -------------------------------------------------------------------------------- ```python """ Google Cloud Platform Resource Management tools. """ from typing import List, Dict, Any, Optional def register_tools(mcp): """Register all resource management tools with the MCP server.""" @mcp.tool() def list_gcp_projects(): """ List all available GCP projects for the authenticated user. Returns: List of project IDs """ try: from google.cloud import resourcemanager_v3 client = resourcemanager_v3.ProjectsClient() request = resourcemanager_v3.SearchProjectsRequest() response = client.search_projects(request=request) return [project.project_id for project in response] except Exception as e: return [f"Error listing GCP projects: {str(e)}"] @mcp.tool() def get_gcp_project_details(project_id: str) -> str: """ Get detailed information about a specific GCP project. Args: project_id: The ID of the GCP project to get details for Returns: Detailed information about the specified GCP project """ try: from google.cloud import resourcemanager_v3 # Initialize the Resource Manager client client = resourcemanager_v3.ProjectsClient() # Get the project details name = f"projects/{project_id}" project = client.get_project(name=name) # Format the response project_number = project.name.split('/')[-1] if project.name else "N/A" display_name = project.display_name or "N/A" create_time = project.create_time.isoformat() if project.create_time else "N/A" state = project.state.name if project.state else "N/A" labels = dict(project.labels) if project.labels else {} labels_str = "\n".join([f" {k}: {v}" for k, v in labels.items()]) if labels else " None" return f""" GCP Project Details for {project_id}: Project Number: {project_number} Name: {display_name} Creation Time: {create_time} State: {state} Labels: {labels_str} """ except Exception as e: return f"Error getting GCP project details: {str(e)}" @mcp.tool() def list_assets(project_id: str, asset_types: Optional[List[str]] = None, page_size: int = 50) -> str: """ List assets in a GCP project using Cloud Asset Inventory API. Args: project_id: The ID of the GCP project to list assets for asset_types: Optional list of asset types to filter by (e.g., ["compute.googleapis.com/Instance"]) page_size: Number of assets to return per page (default: 50, max: 1000) Returns: List of assets in the specified GCP project """ try: try: from google.cloud import asset_v1 except ImportError: return "Error: The Google Cloud Asset Inventory library is not installed. Please install it with 'pip install google-cloud-asset'." # Initialize the Asset client client = asset_v1.AssetServiceClient() # Format the parent resource parent = f"projects/{project_id}" # Create the request request = asset_v1.ListAssetsRequest( parent=parent, content_type=asset_v1.ContentType.RESOURCE, page_size=min(page_size, 1000) # API limit is 1000 ) # Add asset types filter if provided if asset_types: request.asset_types = asset_types # List assets response = client.list_assets(request=request) # Format the response assets_list = [] for asset in response: asset_type = asset.asset_type name = asset.name display_name = asset.display_name if hasattr(asset, 'display_name') and asset.display_name else name.split('/')[-1] # Extract location if available location = "global" if hasattr(asset.resource, 'location') and asset.resource.location: location = asset.resource.location assets_list.append(f"- {display_name} ({asset_type})\n Location: {location}\n Name: {name}") if not assets_list: filter_msg = f" with types {asset_types}" if asset_types else "" return f"No assets found{filter_msg} in project {project_id}." # Add pagination info if there's a next page token pagination_info = "" if hasattr(response, 'next_page_token') and response.next_page_token: pagination_info = "\n\nMore assets are available. Refine your search or increase page_size to see more." return f"Assets in GCP Project {project_id}:\n\n" + "\n\n".join(assets_list) + pagination_info except Exception as e: return f"Error listing assets: {str(e)}" @mcp.tool() def set_quota_project(project_id: str) -> str: """ Set a quota project for Google Cloud API requests. This helps resolve the warning: "Your application has authenticated using end user credentials from Google Cloud SDK without a quota project." Args: project_id: The ID of the GCP project to use for quota attribution Returns: Confirmation message if successful, error message otherwise """ try: try: import google.auth from google.auth import exceptions import os except ImportError: return "Error: Required libraries not installed. Please install with 'pip install google-auth'." # Set the quota project in the environment variable os.environ["GOOGLE_CLOUD_QUOTA_PROJECT"] = project_id # Try to get credentials with the quota project try: # Get the current credentials credentials, project = google.auth.default() # Set the quota project on the credentials if supported if hasattr(credentials, "with_quota_project"): credentials = credentials.with_quota_project(project_id) # Save the credentials back (this depends on the credential type) # This is a best-effort approach try: if hasattr(google.auth, "_default_credentials"): google.auth._default_credentials = credentials except: pass return f"Successfully set quota project to '{project_id}'. New API requests will use this project for quota attribution." else: return f"Set environment variable GOOGLE_CLOUD_QUOTA_PROJECT={project_id}, but your credential type doesn't support quota projects directly." except exceptions.GoogleAuthError as e: return f"Error setting quota project: {str(e)}" except Exception as e: return f"Error setting quota project: {str(e)}" ``` -------------------------------------------------------------------------------- /src/gcp_mcp/gcp_modules/storage/tools.py: -------------------------------------------------------------------------------- ```python """ Google Cloud Platform Storage tools. """ from typing import List, Dict, Any, Optional def register_tools(mcp): """Register all storage tools with the MCP server.""" @mcp.tool() def list_storage_buckets(project_id: str) -> str: """ List Cloud Storage buckets in a GCP project. Args: project_id: The ID of the GCP project to list buckets for Returns: List of Cloud Storage buckets in the specified GCP project """ try: from google.cloud import storage # Initialize the Storage client client = storage.Client(project=project_id) # List buckets buckets = client.list_buckets() # Format the response buckets_list = [] for bucket in buckets: location = bucket.location or "Unknown" storage_class = bucket.storage_class or "Unknown" created = bucket.time_created.strftime("%Y-%m-%d %H:%M:%S UTC") if bucket.time_created else "Unknown" buckets_list.append(f"- {bucket.name} (Location: {location}, Class: {storage_class}, Created: {created})") if not buckets_list: return f"No Cloud Storage buckets found in project {project_id}." buckets_str = "\n".join(buckets_list) return f""" Cloud Storage Buckets in GCP Project {project_id}: {buckets_str} """ except Exception as e: return f"Error listing Cloud Storage buckets: {str(e)}" @mcp.tool() def get_bucket_details(project_id: str, bucket_name: str) -> str: """ Get detailed information about a specific Cloud Storage bucket. Args: project_id: The ID of the GCP project bucket_name: The name of the bucket to get details for Returns: Detailed information about the specified Cloud Storage bucket """ try: from google.cloud import storage # Initialize the Storage client client = storage.Client(project=project_id) # Get the bucket bucket = client.get_bucket(bucket_name) # Format the response details = [] details.append(f"Name: {bucket.name}") details.append(f"Project: {project_id}") details.append(f"Location: {bucket.location or 'Unknown'}") details.append(f"Storage Class: {bucket.storage_class or 'Unknown'}") details.append(f"Created: {bucket.time_created.strftime('%Y-%m-%d %H:%M:%S UTC') if bucket.time_created else 'Unknown'}") details.append(f"Versioning Enabled: {bucket.versioning_enabled}") details.append(f"Requester Pays: {bucket.requester_pays}") details.append(f"Lifecycle Rules: {len(bucket.lifecycle_rules) if bucket.lifecycle_rules else 0} rules defined") details.append(f"Labels: {bucket.labels or 'None'}") details.append(f"CORS: {bucket.cors or 'None'}") details_str = "\n".join(details) return f""" Cloud Storage Bucket Details: {details_str} """ except Exception as e: return f"Error getting bucket details: {str(e)}" @mcp.tool() def list_objects(project_id: str, bucket_name: str, prefix: Optional[str] = None, limit: int = 100) -> str: """ List objects in a Cloud Storage bucket. Args: project_id: The ID of the GCP project bucket_name: The name of the bucket to list objects from prefix: Optional prefix to filter objects by limit: Maximum number of objects to list (default: 100) Returns: List of objects in the specified Cloud Storage bucket """ try: from google.cloud import storage # Initialize the Storage client client = storage.Client(project=project_id) # Get the bucket bucket = client.get_bucket(bucket_name) # List blobs blobs = bucket.list_blobs(prefix=prefix, max_results=limit) # Format the response objects_list = [] for blob in blobs: size_mb = blob.size / (1024 * 1024) updated = blob.updated.strftime("%Y-%m-%d %H:%M:%S UTC") if blob.updated else "Unknown" objects_list.append(f"- {blob.name} (Size: {size_mb:.2f} MB, Updated: {updated}, Content-Type: {blob.content_type})") if not objects_list: return f"No objects found in bucket {bucket_name}{' with prefix ' + prefix if prefix else ''}." objects_str = "\n".join(objects_list) return f""" Objects in Cloud Storage Bucket {bucket_name}{' with prefix ' + prefix if prefix else ''}: {objects_str} """ except Exception as e: return f"Error listing objects: {str(e)}" @mcp.tool() def upload_object(project_id: str, bucket_name: str, source_file_path: str, destination_blob_name: Optional[str] = None, content_type: Optional[str] = None) -> str: """ Upload a file to a Cloud Storage bucket. Args: project_id: The ID of the GCP project bucket_name: The name of the bucket to upload to source_file_path: The local file path to upload destination_blob_name: The name to give the file in GCS (default: filename from source) content_type: The content type of the file (default: auto-detect) Returns: Result of the upload operation """ try: import os from google.cloud import storage # Initialize the Storage client client = storage.Client(project=project_id) # Get the bucket bucket = client.get_bucket(bucket_name) # If no destination name is provided, use the source filename if not destination_blob_name: destination_blob_name = os.path.basename(source_file_path) # Create a blob object blob = bucket.blob(destination_blob_name) # Upload the file blob.upload_from_filename(source_file_path, content_type=content_type) return f""" File successfully uploaded: - Source: {source_file_path} - Destination: gs://{bucket_name}/{destination_blob_name} - Size: {blob.size / (1024 * 1024):.2f} MB - Content-Type: {blob.content_type} """ except Exception as e: return f"Error uploading file: {str(e)}" @mcp.tool() def download_object(project_id: str, bucket_name: str, source_blob_name: str, destination_file_path: str) -> str: """ Download a file from a Cloud Storage bucket. Args: project_id: The ID of the GCP project bucket_name: The name of the bucket to download from source_blob_name: The name of the file in the bucket destination_file_path: The local path to save the file to Returns: Result of the download operation """ try: from google.cloud import storage # Initialize the Storage client client = storage.Client(project=project_id) # Get the bucket bucket = client.get_bucket(bucket_name) # Get the blob blob = bucket.blob(source_blob_name) # Download the file blob.download_to_filename(destination_file_path) return f""" File successfully downloaded: - Source: gs://{bucket_name}/{source_blob_name} - Destination: {destination_file_path} - Size: {blob.size / (1024 * 1024):.2f} MB - Content-Type: {blob.content_type} """ except Exception as e: return f"Error downloading file: {str(e)}" @mcp.tool() def delete_object(project_id: str, bucket_name: str, blob_name: str) -> str: """ Delete an object from a Cloud Storage bucket. Args: project_id: The ID of the GCP project bucket_name: The name of the bucket to delete from blob_name: The name of the file to delete Returns: Result of the delete operation """ try: from google.cloud import storage # Initialize the Storage client client = storage.Client(project=project_id) # Get the bucket bucket = client.get_bucket(bucket_name) # Delete the blob blob = bucket.blob(blob_name) blob.delete() return f"Object gs://{bucket_name}/{blob_name} has been successfully deleted." except Exception as e: return f"Error deleting object: {str(e)}" ``` -------------------------------------------------------------------------------- /src/gcp_mcp/gcp_modules/iam/tools.py: -------------------------------------------------------------------------------- ```python """ Google Cloud Platform IAM tools. """ from typing import List, Dict, Any, Optional def register_tools(mcp): """Register all IAM tools with the MCP server.""" @mcp.tool() def check_iam_permissions(project_id: str) -> str: """ Check IAM permissions for the current user in a GCP project. Args: project_id: The ID of the GCP project to check permissions for Returns: List of IAM permissions for the current user in the specified GCP project """ try: from google.cloud import resourcemanager_v3 from google.iam.v1 import iam_policy_pb2 # Initialize the Resource Manager client client = resourcemanager_v3.ProjectsClient() # Get the IAM policy for the project request = iam_policy_pb2.GetIamPolicyRequest( resource=f"projects/{project_id}" ) policy = client.get_iam_policy(request=request) # Get the current user import google.auth credentials, _ = google.auth.default() user = credentials.service_account_email if hasattr(credentials, 'service_account_email') else "current user" # Check which roles the user has user_bindings = [] for binding in policy.bindings: role = binding.role members = binding.members # Check if the current user is in the members list for member in members: if member == f"user:{user}" or member == "serviceAccount:{user}" or member == "allUsers" or member == "allAuthenticatedUsers": user_bindings.append(f"- {role}") break if not user_bindings: return f"No explicit IAM permissions found for {user} in project {project_id}." user_bindings_str = "\n".join(user_bindings) return f""" IAM Permissions for {user} in GCP Project {project_id}: {user_bindings_str} """ except Exception as e: return f"Error checking IAM permissions: {str(e)}" @mcp.tool() def list_roles(project_id: Optional[str] = None) -> str: """ List IAM roles (predefined or custom). Args: project_id: Optional project ID for listing custom roles. If not provided, lists predefined roles. Returns: List of IAM roles """ try: from google.cloud import iam_v1 # Initialize the IAM client client = iam_v1.IAMClient() roles_list = [] if project_id: # List custom roles for the project request = iam_v1.ListRolesRequest( parent=f"projects/{project_id}", view=iam_v1.ListRolesRequest.RoleView.FULL ) roles = client.list_roles(request=request) for role in roles: description = role.description or "No description" roles_list.append(f"- {role.name} - {description}") if not roles_list: return f"No custom IAM roles found in project {project_id}." return f""" Custom IAM Roles in GCP Project {project_id}: {chr(10).join(roles_list)} """ else: # List predefined roles request = iam_v1.ListRolesRequest( view=iam_v1.ListRolesRequest.RoleView.BASIC ) roles = client.list_roles(request=request) for role in roles: if role.name.startswith("roles/"): description = role.description or "No description" roles_list.append(f"- {role.name} - {description}") if not roles_list: return "No predefined IAM roles found." return f""" Predefined IAM Roles in GCP: {chr(10).join(roles_list[:100])} (Limited to 100 roles. To see more specific roles, narrow your search criteria.) """ except Exception as e: return f"Error listing IAM roles: {str(e)}" @mcp.tool() def get_role_permissions(role_name: str, project_id: Optional[str] = None) -> str: """ Get detailed information about an IAM role, including its permissions. Args: role_name: The name of the role (e.g., "roles/compute.admin" or "projects/my-project/roles/myCustomRole") project_id: Optional project ID for custom roles. Not needed if role_name is fully qualified. Returns: Detailed information about the IAM role """ try: from google.cloud import iam_v1 # Initialize the IAM client client = iam_v1.IAMClient() # If project_id is provided and role_name doesn't include it, create fully qualified role name if project_id and not role_name.startswith("projects/") and not role_name.startswith("roles/"): role_name = f"projects/{project_id}/roles/{role_name}" elif not role_name.startswith("projects/") and not role_name.startswith("roles/"): role_name = f"roles/{role_name}" # Get role details request = iam_v1.GetRoleRequest(name=role_name) role = client.get_role(request=request) details = [] details.append(f"Name: {role.name}") details.append(f"Title: {role.title}") details.append(f"Description: {role.description or 'No description'}") if role.included_permissions: permissions_str = "\n".join([f"- {permission}" for permission in role.included_permissions]) details.append(f"Permissions ({len(role.included_permissions)}):\n{permissions_str}") else: details.append("Permissions: None") if hasattr(role, 'stage'): details.append(f"Stage: {role.stage}") if hasattr(role, 'etag'): details.append(f"ETag: {role.etag}") return f""" IAM Role Details for {role.name}: {chr(10).join(details)} """ except Exception as e: return f"Error getting role permissions: {str(e)}" @mcp.tool() def list_service_accounts(project_id: str) -> str: """ List service accounts in a GCP project. Args: project_id: The ID of the GCP project Returns: List of service accounts in the project """ try: from google.cloud import iam_v1 # Initialize the IAM client client = iam_v1.IAMClient() # List service accounts request = iam_v1.ListServiceAccountsRequest( name=f"projects/{project_id}" ) service_accounts = client.list_service_accounts(request=request) accounts_list = [] for account in service_accounts: display_name = account.display_name or "No display name" accounts_list.append(f"- {account.email} ({display_name})") if not accounts_list: return f"No service accounts found in project {project_id}." accounts_str = "\n".join(accounts_list) return f""" Service Accounts in GCP Project {project_id}: {accounts_str} """ except Exception as e: return f"Error listing service accounts: {str(e)}" @mcp.tool() def create_service_account(project_id: str, account_id: str, display_name: str, description: Optional[str] = None) -> str: """ Create a new service account in a GCP project. Args: project_id: The ID of the GCP project account_id: The ID for the service account (must be between 6 and 30 characters) display_name: A user-friendly name for the service account description: Optional description for the service account Returns: Result of the service account creation """ try: from google.cloud import iam_v1 # Initialize the IAM client client = iam_v1.IAMClient() # Create service account request = iam_v1.CreateServiceAccountRequest( name=f"projects/{project_id}", account_id=account_id, service_account=iam_v1.ServiceAccount( display_name=display_name, description=description ) ) service_account = client.create_service_account(request=request) return f""" Service Account created successfully: - Email: {service_account.email} - Name: {service_account.name} - Display Name: {service_account.display_name} - Description: {service_account.description or 'None'} """ except Exception as e: return f"Error creating service account: {str(e)}" @mcp.tool() def add_iam_policy_binding(project_id: str, role: str, member: str) -> str: """ Add an IAM policy binding to a GCP project. Args: project_id: The ID of the GCP project role: The role to grant (e.g., "roles/compute.admin") member: The member to grant the role to (e.g., "user:[email protected]", "serviceAccount:[email protected]") Returns: Result of the policy binding operation """ try: from google.cloud import resourcemanager_v3 from google.iam.v1 import iam_policy_pb2, policy_pb2 # Initialize the Resource Manager client client = resourcemanager_v3.ProjectsClient() # Get the current IAM policy get_request = iam_policy_pb2.GetIamPolicyRequest( resource=f"projects/{project_id}" ) policy = client.get_iam_policy(request=get_request) # Check if the binding already exists binding_exists = False for binding in policy.bindings: if binding.role == role and member in binding.members: binding_exists = True break if binding_exists: return f"IAM policy binding already exists: {member} already has role {role} in project {project_id}." # Add the new binding binding = policy_pb2.Binding() binding.role = role binding.members.append(member) policy.bindings.append(binding) # Set the updated IAM policy set_request = iam_policy_pb2.SetIamPolicyRequest( resource=f"projects/{project_id}", policy=policy ) updated_policy = client.set_iam_policy(request=set_request) return f""" IAM policy binding added successfully: - Project: {project_id} - Role: {role} - Member: {member} """ except Exception as e: return f"Error adding IAM policy binding: {str(e)}" ``` -------------------------------------------------------------------------------- /src/gcp_mcp/gcp_modules/databases/tools.py: -------------------------------------------------------------------------------- ```python """ Google Cloud Platform Database tools. """ from typing import List, Dict, Any, Optional def register_tools(mcp): """Register all database tools with the MCP server.""" @mcp.tool() def list_cloud_sql_instances(project_id: str) -> str: """ List Cloud SQL instances in a GCP project. Args: project_id: The ID of the GCP project to list Cloud SQL instances for Returns: List of Cloud SQL instances in the specified GCP project """ try: from googleapiclient import discovery # Initialize the Cloud SQL Admin API client service = discovery.build('sqladmin', 'v1') # List SQL instances request = service.instances().list(project=project_id) response = request.execute() # Format the response instances_list = [] if 'items' in response: for instance in response['items']: name = instance.get('name', 'Unknown') db_version = instance.get('databaseVersion', 'Unknown') state = instance.get('state', 'Unknown') region = instance.get('region', 'Unknown') tier = instance.get('settings', {}).get('tier', 'Unknown') storage_size = instance.get('settings', {}).get('dataDiskSizeGb', 'Unknown') instances_list.append(f"- {name} (Type: {db_version}, Region: {region}, Tier: {tier}, Storage: {storage_size}GB, State: {state})") if not instances_list: return f"No Cloud SQL instances found in project {project_id}." instances_str = "\n".join(instances_list) return f""" Cloud SQL Instances in GCP Project {project_id}: {instances_str} """ except Exception as e: return f"Error listing Cloud SQL instances: {str(e)}" @mcp.tool() def get_sql_instance_details(project_id: str, instance_id: str) -> str: """ Get detailed information about a specific Cloud SQL instance. Args: project_id: The ID of the GCP project instance_id: The ID of the Cloud SQL instance Returns: Detailed information about the specified Cloud SQL instance """ try: from googleapiclient import discovery # Initialize the Cloud SQL Admin API client service = discovery.build('sqladmin', 'v1') # Get instance details request = service.instances().get(project=project_id, instance=instance_id) instance = request.execute() # Format the response details = [] details.append(f"Name: {instance.get('name', 'Unknown')}") details.append(f"Self Link: {instance.get('selfLink', 'Unknown')}") details.append(f"Database Version: {instance.get('databaseVersion', 'Unknown')}") details.append(f"State: {instance.get('state', 'Unknown')}") details.append(f"Region: {instance.get('region', 'Unknown')}") # Settings information if 'settings' in instance: settings = instance['settings'] details.append(f"Tier: {settings.get('tier', 'Unknown')}") details.append(f"Storage Size: {settings.get('dataDiskSizeGb', 'Unknown')}GB") details.append(f"Availability Type: {settings.get('availabilityType', 'Unknown')}") # Backup configuration if 'backupConfiguration' in settings: backup = settings['backupConfiguration'] enabled = backup.get('enabled', False) details.append(f"Automated Backups: {'Enabled' if enabled else 'Disabled'}") if enabled: details.append(f"Backup Start Time: {backup.get('startTime', 'Unknown')}") details.append(f"Binary Log Enabled: {backup.get('binaryLogEnabled', False)}") # IP configuration if 'ipConfiguration' in settings: ip_config = settings['ipConfiguration'] public_ip = "Enabled" if not ip_config.get('privateNetwork') else "Disabled" details.append(f"Public IP: {public_ip}") if 'authorizedNetworks' in ip_config: networks = [] for network in ip_config['authorizedNetworks']: name = network.get('name', 'Unnamed') value = network.get('value', 'Unknown') networks.append(f" - {name}: {value}") if networks: details.append("Authorized Networks:") details.extend(networks) # IP Addresses if 'ipAddresses' in instance: ip_addresses = [] for ip in instance['ipAddresses']: ip_type = ip.get('type', 'Unknown') ip_address = ip.get('ipAddress', 'Unknown') ip_addresses.append(f" - {ip_type}: {ip_address}") if ip_addresses: details.append("IP Addresses:") details.extend(ip_addresses) details_str = "\n".join(details) return f""" Cloud SQL Instance Details: {details_str} """ except Exception as e: return f"Error getting SQL instance details: {str(e)}" @mcp.tool() def list_databases(project_id: str, instance_id: str) -> str: """ List databases in a Cloud SQL instance. Args: project_id: The ID of the GCP project instance_id: The ID of the Cloud SQL instance Returns: List of databases in the specified Cloud SQL instance """ try: from googleapiclient import discovery # Initialize the Cloud SQL Admin API client service = discovery.build('sqladmin', 'v1') # List databases request = service.databases().list(project=project_id, instance=instance_id) response = request.execute() # Format the response databases_list = [] if 'items' in response: for database in response['items']: name = database.get('name', 'Unknown') charset = database.get('charset', 'Unknown') collation = database.get('collation', 'Unknown') databases_list.append(f"- {name} (Charset: {charset}, Collation: {collation})") if not databases_list: return f"No databases found in Cloud SQL instance {instance_id}." databases_str = "\n".join(databases_list) return f""" Databases in Cloud SQL Instance {instance_id}: {databases_str} """ except Exception as e: return f"Error listing databases: {str(e)}" @mcp.tool() def create_backup(project_id: str, instance_id: str, description: Optional[str] = None) -> str: """ Create a backup for a Cloud SQL instance. Args: project_id: The ID of the GCP project instance_id: The ID of the Cloud SQL instance description: Optional description for the backup Returns: Result of the backup operation """ try: from googleapiclient import discovery # Initialize the Cloud SQL Admin API client service = discovery.build('sqladmin', 'v1') # Create backup backup_run_body = {} if description: backup_run_body['description'] = description request = service.backupRuns().insert(project=project_id, instance=instance_id, body=backup_run_body) operation = request.execute() # Get operation ID and status operation_id = operation.get('name', 'Unknown') status = operation.get('status', 'Unknown') return f""" Backup operation initiated: - Instance: {instance_id} - Project: {project_id} - Description: {description or 'None provided'} Operation ID: {operation_id} Status: {status} The backup process may take some time to complete. You can check the status of the backup using the Cloud SQL Admin API or Google Cloud Console. """ except Exception as e: return f"Error creating backup: {str(e)}" @mcp.tool() def list_firestore_databases(project_id: str) -> str: """ List Firestore databases in a GCP project. Args: project_id: The ID of the GCP project to list Firestore databases for Returns: List of Firestore databases in the specified GCP project """ try: from google.cloud import firestore_admin_v1 # Initialize the Firestore Admin client client = firestore_admin_v1.FirestoreAdminClient() # List databases parent = f"projects/{project_id}" databases = client.list_databases(parent=parent) # Format the response databases_list = [] for database in databases: name = database.name.split('/')[-1] db_type = "Firestore Native" if database.type_ == firestore_admin_v1.Database.DatabaseType.FIRESTORE_NATIVE else "Datastore Mode" location = database.location_id databases_list.append(f"- {name} (Type: {db_type}, Location: {location})") if not databases_list: return f"No Firestore databases found in project {project_id}." databases_str = "\n".join(databases_list) return f""" Firestore Databases in GCP Project {project_id}: {databases_str} """ except Exception as e: return f"Error listing Firestore databases: {str(e)}" @mcp.tool() def list_firestore_collections(project_id: str, database_id: str = "(default)") -> str: """ List collections in a Firestore database. Args: project_id: The ID of the GCP project database_id: The ID of the Firestore database (default is "(default)") Returns: List of collections in the specified Firestore database """ try: from google.cloud import firestore # Initialize the Firestore client client = firestore.Client(project=project_id, database=database_id) # List collections collections = client.collections() # Format the response collections_list = [] for collection in collections: collections_list.append(f"- {collection.id}") if not collections_list: return f"No collections found in Firestore database {database_id}." collections_str = "\n".join(collections_list) return f""" Collections in Firestore Database {database_id} (Project: {project_id}): {collections_str} """ except Exception as e: return f"Error listing Firestore collections: {str(e)}" ``` -------------------------------------------------------------------------------- /src/gcp_mcp/gcp_modules/kubernetes/tools.py: -------------------------------------------------------------------------------- ```python """ Google Cloud Platform Kubernetes Engine (GKE) tools. """ from typing import List, Dict, Any, Optional def register_tools(mcp): """Register all kubernetes tools with the MCP server.""" @mcp.tool() def list_gke_clusters(project_id: str, region: str = "") -> str: """ List Google Kubernetes Engine (GKE) clusters in a GCP project. Args: project_id: The ID of the GCP project to list GKE clusters for region: Optional region to filter clusters (e.g., "us-central1") Returns: List of GKE clusters in the specified GCP project """ try: from google.cloud import container_v1 # Initialize the GKE client client = container_v1.ClusterManagerClient() clusters_list = [] if region: # List clusters in the specified region parent = f"projects/{project_id}/locations/{region}" response = client.list_clusters(parent=parent) for cluster in response.clusters: version = cluster.current_master_version node_count = sum(pool.initial_node_count for pool in cluster.node_pools) status = "Running" if cluster.status == container_v1.Cluster.Status.RUNNING else cluster.status.name clusters_list.append(f"- {cluster.name} (Region: {region}, Version: {version}, Nodes: {node_count}, Status: {status})") else: # List clusters in all regions from google.cloud import compute_v1 # Get all regions regions_client = compute_v1.RegionsClient() regions_request = compute_v1.ListRegionsRequest(project=project_id) regions = regions_client.list(request=regions_request) for region_item in regions: region_name = region_item.name parent = f"projects/{project_id}/locations/{region_name}" try: response = client.list_clusters(parent=parent) for cluster in response.clusters: version = cluster.current_master_version node_count = sum(pool.initial_node_count for pool in cluster.node_pools) status = "Running" if cluster.status == container_v1.Cluster.Status.RUNNING else cluster.status.name clusters_list.append(f"- {cluster.name} (Region: {region_name}, Version: {version}, Nodes: {node_count}, Status: {status})") except Exception: # Skip regions where we can't list clusters continue # Also check zonal clusters zones_client = compute_v1.ZonesClient() zones_request = compute_v1.ListZonesRequest(project=project_id) zones = zones_client.list(request=zones_request) for zone_item in zones: zone_name = zone_item.name parent = f"projects/{project_id}/locations/{zone_name}" try: response = client.list_clusters(parent=parent) for cluster in response.clusters: version = cluster.current_master_version node_count = sum(pool.initial_node_count for pool in cluster.node_pools) status = "Running" if cluster.status == container_v1.Cluster.Status.RUNNING else cluster.status.name clusters_list.append(f"- {cluster.name} (Zone: {zone_name}, Version: {version}, Nodes: {node_count}, Status: {status})") except Exception: # Skip zones where we can't list clusters continue if not clusters_list: region_msg = f" in region {region}" if region else "" return f"No GKE clusters found{region_msg} for project {project_id}." clusters_str = "\n".join(clusters_list) region_msg = f" in region {region}" if region else "" return f""" Google Kubernetes Engine (GKE) Clusters{region_msg} in GCP Project {project_id}: {clusters_str} """ except Exception as e: return f"Error listing GKE clusters: {str(e)}" @mcp.tool() def get_cluster_details(project_id: str, cluster_name: str, location: str) -> str: """ Get detailed information about a specific GKE cluster. Args: project_id: The ID of the GCP project cluster_name: The name of the GKE cluster location: The location (region or zone) of the cluster Returns: Detailed information about the specified GKE cluster """ try: from google.cloud import container_v1 # Initialize the GKE client client = container_v1.ClusterManagerClient() # Get cluster details cluster_path = f"projects/{project_id}/locations/{location}/clusters/{cluster_name}" cluster = client.get_cluster(name=cluster_path) # Format the response details = [] details.append(f"Name: {cluster.name}") details.append(f"Description: {cluster.description or 'None'}") details.append(f"Location: {location}") details.append(f"Location Type: {'Regional' if '-' not in location else 'Zonal'}") details.append(f"Status: {'Running' if cluster.status == container_v1.Cluster.Status.RUNNING else cluster.status.name}") details.append(f"Kubernetes Version: {cluster.current_master_version}") details.append(f"Network: {cluster.network}") details.append(f"Subnetwork: {cluster.subnetwork}") details.append(f"Cluster CIDR: {cluster.cluster_ipv4_cidr}") details.append(f"Services CIDR: {cluster.services_ipv4_cidr}") details.append(f"Endpoint: {cluster.endpoint}") # Add Node Pools information node_pools = [] for pool in cluster.node_pools: machine_type = pool.config.machine_type disk_size_gb = pool.config.disk_size_gb autoscaling = "Enabled" if pool.autoscaling and pool.autoscaling.enabled else "Disabled" min_nodes = pool.autoscaling.min_node_count if pool.autoscaling and pool.autoscaling.enabled else "N/A" max_nodes = pool.autoscaling.max_node_count if pool.autoscaling and pool.autoscaling.enabled else "N/A" initial_nodes = pool.initial_node_count node_pools.append(f" - {pool.name} (Machine: {machine_type}, Disk: {disk_size_gb}GB, Initial Nodes: {initial_nodes})") if autoscaling == "Enabled": node_pools.append(f" Autoscaling: {autoscaling} (Min: {min_nodes}, Max: {max_nodes})") if node_pools: details.append(f"Node Pools ({len(cluster.node_pools)}):\n" + "\n".join(node_pools)) # Add Addons information addons = [] if cluster.addons_config: config = cluster.addons_config addons.append(f" - HTTP Load Balancing: {'Enabled' if not config.http_load_balancing or not config.http_load_balancing.disabled else 'Disabled'}") addons.append(f" - Horizontal Pod Autoscaling: {'Enabled' if not config.horizontal_pod_autoscaling or not config.horizontal_pod_autoscaling.disabled else 'Disabled'}") addons.append(f" - Kubernetes Dashboard: {'Enabled' if not config.kubernetes_dashboard or not config.kubernetes_dashboard.disabled else 'Disabled'}") addons.append(f" - Network Policy: {'Enabled' if config.network_policy_config and not config.network_policy_config.disabled else 'Disabled'}") if addons: details.append(f"Addons:\n" + "\n".join(addons)) details_str = "\n".join(details) return f""" GKE Cluster Details: {details_str} """ except Exception as e: return f"Error getting cluster details: {str(e)}" @mcp.tool() def list_node_pools(project_id: str, cluster_name: str, location: str) -> str: """ List node pools in a GKE cluster. Args: project_id: The ID of the GCP project cluster_name: The name of the GKE cluster location: The location (region or zone) of the cluster Returns: List of node pools in the specified GKE cluster """ try: from google.cloud import container_v1 # Initialize the GKE client client = container_v1.ClusterManagerClient() # List node pools cluster_path = f"projects/{project_id}/locations/{location}/clusters/{cluster_name}" node_pools = client.list_node_pools(parent=cluster_path) # Format the response pools_list = [] for pool in node_pools.node_pools: machine_type = pool.config.machine_type disk_size_gb = pool.config.disk_size_gb autoscaling = "Enabled" if pool.autoscaling and pool.autoscaling.enabled else "Disabled" min_nodes = pool.autoscaling.min_node_count if pool.autoscaling and pool.autoscaling.enabled else "N/A" max_nodes = pool.autoscaling.max_node_count if pool.autoscaling and pool.autoscaling.enabled else "N/A" initial_nodes = pool.initial_node_count pool_info = [ f"- {pool.name}:", f" Machine Type: {machine_type}", f" Disk Size: {disk_size_gb}GB", f" Initial Node Count: {initial_nodes}", f" Autoscaling: {autoscaling}" ] if autoscaling == "Enabled": pool_info.append(f" Min Nodes: {min_nodes}") pool_info.append(f" Max Nodes: {max_nodes}") if pool.config.labels: labels = [f"{k}: {v}" for k, v in pool.config.labels.items()] pool_info.append(f" Labels: {', '.join(labels)}") pools_list.append("\n".join(pool_info)) if not pools_list: return f"No node pools found in GKE cluster {cluster_name} in location {location}." pools_str = "\n".join(pools_list) return f""" Node Pools in GKE Cluster {cluster_name} (Location: {location}): {pools_str} """ except Exception as e: return f"Error listing node pools: {str(e)}" @mcp.tool() def resize_node_pool(project_id: str, cluster_name: str, location: str, node_pool_name: str, node_count: int) -> str: """ Resize a node pool in a GKE cluster. Args: project_id: The ID of the GCP project cluster_name: The name of the GKE cluster location: The location (region or zone) of the cluster node_pool_name: The name of the node pool to resize node_count: The new node count for the pool Returns: Result of the node pool resize operation """ try: from google.cloud import container_v1 # Initialize the GKE client client = container_v1.ClusterManagerClient() # Create the node pool path node_pool_path = f"projects/{project_id}/locations/{location}/clusters/{cluster_name}/nodePools/{node_pool_name}" # Get the current node pool node_pool = client.get_node_pool(name=node_pool_path) current_node_count = node_pool.initial_node_count # Check if autoscaling is enabled if node_pool.autoscaling and node_pool.autoscaling.enabled: return f""" Cannot resize node pool {node_pool_name} because autoscaling is enabled. To manually set the node count, you must first disable autoscaling for this node pool. Current autoscaling settings: - Min nodes: {node_pool.autoscaling.min_node_count} - Max nodes: {node_pool.autoscaling.max_node_count} """ # Resize the node pool request = container_v1.SetNodePoolSizeRequest( name=node_pool_path, node_count=node_count ) operation = client.set_node_pool_size(request=request) return f""" Node pool resize operation initiated: - Cluster: {cluster_name} - Location: {location} - Node Pool: {node_pool_name} - Current Node Count: {current_node_count} - New Node Count: {node_count} Operation ID: {operation.name} Status: {operation.status.name if hasattr(operation.status, 'name') else operation.status} """ except Exception as e: return f"Error resizing node pool: {str(e)}" ``` -------------------------------------------------------------------------------- /src/gcp_mcp/gcp_modules/monitoring/tools.py: -------------------------------------------------------------------------------- ```python """ Google Cloud Platform Monitoring tools. """ from typing import List, Dict, Any, Optional def register_tools(mcp): """Register all monitoring tools with the MCP server.""" @mcp.tool() def list_monitoring_metrics(project_id: str, filter_str: str = "") -> str: """ List available monitoring metrics for a GCP project. Args: project_id: The ID of the GCP project to list metrics for filter_str: Optional filter string to narrow down the metrics Returns: List of available monitoring metrics in the specified GCP project """ try: try: from google.cloud import monitoring_v3 except ImportError: return "Error: The Google Cloud monitoring library is not installed. Please install it with 'pip install google-cloud-monitoring'." # Initialize the Monitoring client client = monitoring_v3.MetricServiceClient() # Format the project name project_name = f"projects/{project_id}" # Create the request object with the filter request = monitoring_v3.ListMetricDescriptorsRequest( name=project_name ) # Add filter if provided if filter_str: request.filter = filter_str # List metric descriptors with optional filter descriptors = client.list_metric_descriptors(request=request) # Format the response metrics_list = [] for descriptor in descriptors: metric_type = descriptor.type display_name = descriptor.display_name or metric_type.split('/')[-1] description = descriptor.description or "No description" metrics_list.append(f"- {display_name}: {metric_type}\n {description}") if not metrics_list: filter_msg = f" with filter '{filter_str}'" if filter_str else "" return f"No metrics found{filter_msg} for project {project_id}." # Limit to 50 metrics to avoid overwhelming response if len(metrics_list) > 50: metrics_str = "\n".join(metrics_list[:50]) return f"Found {len(metrics_list)} metrics for project {project_id}. Showing first 50:\n\n{metrics_str}\n\nUse a filter to narrow down results." else: metrics_str = "\n".join(metrics_list) return f"Found {len(metrics_list)} metrics for project {project_id}:\n\n{metrics_str}" except Exception as e: return f"Error listing monitoring metrics: {str(e)}" @mcp.tool() def get_monitoring_alerts(project_id: str) -> str: """ Get active monitoring alerts for a GCP project. Args: project_id: The ID of the GCP project to get alerts for Returns: Active alerts for the specified GCP project """ try: from google.cloud import monitoring_v3 from google.protobuf.json_format import MessageToDict # Initialize the Alert Policy Service client alert_client = monitoring_v3.AlertPolicyServiceClient() # Format the project name project_name = f"projects/{project_id}" # Create the request object request = monitoring_v3.ListAlertPoliciesRequest( name=project_name ) # List all alert policies alert_policies = alert_client.list_alert_policies(request=request) # Initialize the Metric Service client for metric data metric_client = monitoring_v3.MetricServiceClient() # Format the response active_alerts = [] for policy in alert_policies: # Check if the policy is enabled if not policy.enabled: continue # Check for active incidents filter_str = f'resource.type="alerting_policy" AND metric.type="monitoring.googleapis.com/alert_policy/incident_count" AND metric.label.policy_name="{policy.name.split("/")[-1]}"' # Create a time interval for the last hour import datetime from google.protobuf import timestamp_pb2 now = datetime.datetime.utcnow() seconds = int(now.timestamp()) end_time = timestamp_pb2.Timestamp(seconds=seconds) start_time = datetime.datetime.utcnow() - datetime.timedelta(hours=1) seconds = int(start_time.timestamp()) start_time_proto = timestamp_pb2.Timestamp(seconds=seconds) # Create the time interval interval = monitoring_v3.TimeInterval( start_time=start_time_proto, end_time=end_time ) # List the time series try: # Create the request object request = monitoring_v3.ListTimeSeriesRequest( name=project_name, filter=filter_str, interval=interval, view=monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL ) # List the time series time_series = metric_client.list_time_series(request=request) is_active = False for series in time_series: # Check if there's a non-zero value in the time series for point in series.points: if point.value.int64_value > 0: is_active = True break if is_active: break if is_active: # Format conditions conditions = [] for condition in policy.conditions: if condition.display_name: conditions.append(f" - {condition.display_name}: {condition.condition_threshold.filter}") # Add to active alerts alert_info = [ f"- {policy.display_name} (ID: {policy.name.split('/')[-1]})", f" Description: {policy.documentation.content if policy.documentation else 'No description'}", f" Severity: {policy.alert_strategy.notification_rate_limit.period.seconds}s between notifications" if policy.alert_strategy.notification_rate_limit else " No rate limiting" ] if conditions: alert_info.append(" Conditions:") alert_info.extend(conditions) active_alerts.append("\n".join(alert_info)) except Exception: # Skip if we can't check for active incidents continue if not active_alerts: return f"No active alerts found for project {project_id}." alerts_str = "\n".join(active_alerts) return f""" Active Monitoring Alerts in GCP Project {project_id}: {alerts_str} """ except Exception as e: return f"Error getting monitoring alerts: {str(e)}" @mcp.tool() def create_alert_policy(project_id: str, display_name: str, metric_type: str, filter_str: str, duration_seconds: int = 60, threshold_value: float = 0.0, comparison: str = "COMPARISON_GT", notification_channels: Optional[List[str]] = None) -> str: """ Create a new alert policy in a GCP project. Args: project_id: The ID of the GCP project display_name: The display name for the alert policy metric_type: The metric type to monitor (e.g., "compute.googleapis.com/instance/cpu/utilization") filter_str: The filter for the metric data duration_seconds: The duration in seconds over which to evaluate the condition (default: 60) threshold_value: The threshold value for the condition (default: 0.0) comparison: The comparison type (COMPARISON_GT, COMPARISON_LT, etc.) (default: COMPARISON_GT) notification_channels: Optional list of notification channel IDs Returns: Result of the alert policy creation """ try: from google.cloud import monitoring_v3 from google.protobuf import duration_pb2 # Initialize the Alert Policy Service client client = monitoring_v3.AlertPolicyServiceClient() # Format the project name project_name = f"projects/{project_id}" # Create a duration object duration = duration_pb2.Duration(seconds=duration_seconds) # Create the alert condition condition = monitoring_v3.AlertPolicy.Condition( display_name=f"Condition for {display_name}", condition_threshold=monitoring_v3.AlertPolicy.Condition.MetricThreshold( filter=filter_str, comparison=getattr(monitoring_v3.ComparisonType, comparison), threshold_value=threshold_value, duration=duration, trigger=monitoring_v3.AlertPolicy.Condition.Trigger( count=1 ), aggregations=[ monitoring_v3.Aggregation( alignment_period=duration_pb2.Duration(seconds=60), per_series_aligner=monitoring_v3.Aggregation.Aligner.ALIGN_MEAN, cross_series_reducer=monitoring_v3.Aggregation.Reducer.REDUCE_MEAN ) ] ) ) # Create the alert policy alert_policy = monitoring_v3.AlertPolicy( display_name=display_name, conditions=[condition], combiner=monitoring_v3.AlertPolicy.ConditionCombinerType.OR ) # Add notification channels if provided if notification_channels: alert_policy.notification_channels = [ f"projects/{project_id}/notificationChannels/{channel_id}" for channel_id in notification_channels ] # Create the policy policy = client.create_alert_policy(name=project_name, alert_policy=alert_policy) # Format response conditions_str = "\n".join([ f"- {c.display_name}: {c.condition_threshold.filter}" for c in policy.conditions ]) notifications_str = "None" if policy.notification_channels: notifications_str = "\n".join([ f"- {channel.split('/')[-1]}" for channel in policy.notification_channels ]) return f""" Alert Policy created successfully: - Name: {policy.display_name} - Policy ID: {policy.name.split('/')[-1]} - Combiner: {policy.combiner.name} Conditions: {conditions_str} Notification Channels: {notifications_str} """ except Exception as e: return f"Error creating alert policy: {str(e)}" @mcp.tool() def list_uptime_checks(project_id: str) -> str: """ List Uptime checks in a GCP project. Args: project_id: The ID of the GCP project to list Uptime checks for Returns: List of Uptime checks in the specified GCP project """ try: from google.cloud import monitoring_v3 # Initialize the Uptime Check Service client client = monitoring_v3.UptimeCheckServiceClient() # Format the project name project_name = f"projects/{project_id}" # Create the request object request = monitoring_v3.ListUptimeCheckConfigsRequest( parent=project_name ) # List uptime checks uptime_checks = client.list_uptime_check_configs(request=request) # Format the response checks_list = [] for check in uptime_checks: check_id = check.name.split('/')[-1] display_name = check.display_name period_seconds = check.period.seconds timeout_seconds = check.timeout.seconds # Get check type and details check_details = [] if check.HasField('http_check'): check_type = "HTTP" url = check.http_check.path if check.resource.HasField('monitored_resource'): host = check.monitored_resource.labels.get('host', 'Unknown') url = f"{host}{url}" elif check.http_check.HasField('host'): url = f"{check.http_check.host}{url}" check_details.append(f"URL: {url}") check_details.append(f"Port: {check.http_check.port}") elif check.HasField('tcp_check'): check_type = "TCP" if check.resource.HasField('monitored_resource'): host = check.monitored_resource.labels.get('host', 'Unknown') else: host = check.tcp_check.host check_details.append(f"Host: {host}") check_details.append(f"Port: {check.tcp_check.port}") else: check_type = "Unknown" checks_list.append(f"- {display_name} (ID: {check_id}, Type: {check_type})") checks_list.append(f" Frequency: {period_seconds}s, Timeout: {timeout_seconds}s") checks_list.extend([f" {detail}" for detail in check_details]) if not checks_list: return f"No Uptime checks found for project {project_id}." checks_str = "\n".join(checks_list) return f""" Uptime Checks in GCP Project {project_id}: {checks_str} """ except Exception as e: return f"Error listing Uptime checks: {str(e)}" ``` -------------------------------------------------------------------------------- /src/gcp_mcp/gcp_modules/networking/tools.py: -------------------------------------------------------------------------------- ```python """ Google Cloud Platform Networking tools. """ from typing import List, Dict, Any, Optional def register_tools(mcp): """Register all networking tools with the MCP server.""" @mcp.tool() def list_vpc_networks(project_id: str) -> str: """ List Virtual Private Cloud (VPC) networks in a GCP project. Args: project_id: The ID of the GCP project to list VPC networks for Returns: List of VPC networks in the specified GCP project """ try: from google.cloud import compute_v1 # Initialize the Compute Engine client for networks client = compute_v1.NetworksClient() # List networks request = compute_v1.ListNetworksRequest(project=project_id) networks = client.list(request=request) # Format the response networks_list = [] for network in networks: subnet_mode = "Auto" if network.auto_create_subnetworks else "Custom" creation_time = network.creation_timestamp if network.creation_timestamp else "Unknown" # Get subnet information if available subnets = [] if not network.auto_create_subnetworks and network.subnetworks: for subnet_url in network.subnetworks: subnet_name = subnet_url.split('/')[-1] subnet_region = subnet_url.split('/')[-3] subnets.append(f" - {subnet_name} (Region: {subnet_region})") network_info = f"- {network.name} (Mode: {subnet_mode}, Created: {creation_time})" if subnets: network_info += "\n Subnets:\n" + "\n".join(subnets) networks_list.append(network_info) if not networks_list: return f"No VPC networks found in project {project_id}." networks_str = "\n".join(networks_list) return f""" VPC Networks in GCP Project {project_id}: {networks_str} """ except Exception as e: return f"Error listing VPC networks: {str(e)}" @mcp.tool() def get_vpc_details(project_id: str, network_name: str) -> str: """ Get detailed information about a specific VPC network. Args: project_id: The ID of the GCP project network_name: The name of the VPC network Returns: Detailed information about the specified VPC network """ try: from google.cloud import compute_v1 # Initialize the Compute Engine client for networks network_client = compute_v1.NetworksClient() subnet_client = compute_v1.SubnetworksClient() # Get network details network = network_client.get(project=project_id, network=network_name) # Format the response details = [] details.append(f"Name: {network.name}") details.append(f"ID: {network.id}") details.append(f"Description: {network.description or 'None'}") details.append(f"Self Link: {network.self_link}") details.append(f"Creation Time: {network.creation_timestamp}") details.append(f"Subnet Mode: {'Auto' if network.auto_create_subnetworks else 'Custom'}") details.append(f"Routing Mode: {network.routing_config.routing_mode if network.routing_config else 'Unknown'}") details.append(f"MTU: {network.mtu}") # If it's a custom subnet mode network, get all subnets if not network.auto_create_subnetworks: # List all subnets in this network request = compute_v1.ListSubnetworksRequest(project=project_id) subnets = [] for item in subnet_client.list(request=request): # Check if the subnet belongs to this network if network.name in item.network: cidr = item.ip_cidr_range region = item.region.split('/')[-1] purpose = f", Purpose: {item.purpose}" if item.purpose else "" private_ip = ", Private Google Access: Enabled" if item.private_ip_google_access else "" subnets.append(f" - {item.name} (Region: {region}, CIDR: {cidr}{purpose}{private_ip})") if subnets: details.append(f"Subnets ({len(subnets)}):\n" + "\n".join(subnets)) # List peering connections if any if network.peerings: peerings = [] for peering in network.peerings: state = peering.state network_name = peering.network.split('/')[-1] peerings.append(f" - {network_name} (State: {state})") if peerings: details.append(f"Peerings ({len(peerings)}):\n" + "\n".join(peerings)) details_str = "\n".join(details) return f""" VPC Network Details: {details_str} """ except Exception as e: return f"Error getting VPC network details: {str(e)}" @mcp.tool() def list_subnets(project_id: str, region: Optional[str] = None) -> str: """ List subnets in a GCP project, optionally filtered by region. Args: project_id: The ID of the GCP project region: Optional region to filter subnets by Returns: List of subnets in the specified GCP project """ try: from google.cloud import compute_v1 # Initialize the Compute Engine client for subnets client = compute_v1.SubnetworksClient() # List subnets request = compute_v1.ListSubnetworksRequest(project=project_id, region=region) if region else compute_v1.ListSubnetworksRequest(project=project_id) subnets = client.list(request=request) # Format the response subnets_list = [] for subnet in subnets: network_name = subnet.network.split('/')[-1] region_name = subnet.region.split('/')[-1] cidr = subnet.ip_cidr_range purpose = f", Purpose: {subnet.purpose}" if subnet.purpose else "" private_ip = ", Private Google Access: Enabled" if subnet.private_ip_google_access else "" subnet_info = f"- {subnet.name} (Network: {network_name}, Region: {region_name}, CIDR: {cidr}{purpose}{private_ip})" subnets_list.append(subnet_info) if not subnets_list: return f"No subnets found in project {project_id}{' for region ' + region if region else ''}." subnets_str = "\n".join(subnets_list) return f""" Subnets in GCP Project {project_id}{' for region ' + region if region else ''}: {subnets_str} """ except Exception as e: return f"Error listing subnets: {str(e)}" @mcp.tool() def create_firewall_rule(project_id: str, name: str, network: str, direction: str, priority: int, source_ranges: Optional[List[str]] = None, destination_ranges: Optional[List[str]] = None, allowed_protocols: Optional[List[Dict[str, Any]]] = None, denied_protocols: Optional[List[Dict[str, Any]]] = None, target_tags: Optional[List[str]] = None, source_tags: Optional[List[str]] = None, description: Optional[str] = None) -> str: """ Create a firewall rule in a GCP project. Args: project_id: The ID of the GCP project name: The name of the firewall rule network: The name of the network to create the firewall rule for direction: The direction of traffic to match ('INGRESS' or 'EGRESS') priority: The priority of the rule (lower number = higher priority, 0-65535) source_ranges: Optional list of source IP ranges (for INGRESS) destination_ranges: Optional list of destination IP ranges (for EGRESS) allowed_protocols: Optional list of allowed protocols, e.g. [{"IPProtocol": "tcp", "ports": ["80", "443"]}] denied_protocols: Optional list of denied protocols, e.g. [{"IPProtocol": "tcp", "ports": ["22"]}] target_tags: Optional list of target instance tags source_tags: Optional list of source instance tags (for INGRESS) description: Optional description for the firewall rule Returns: Result of the firewall rule creation """ try: from google.cloud import compute_v1 # Initialize the Compute Engine client for firewall client = compute_v1.FirewallsClient() # Create the firewall resource firewall = compute_v1.Firewall() firewall.name = name firewall.network = f"projects/{project_id}/global/networks/{network}" firewall.direction = direction firewall.priority = priority if description: firewall.description = description # Set source/destination ranges based on direction if direction == "INGRESS" and source_ranges: firewall.source_ranges = source_ranges elif direction == "EGRESS" and destination_ranges: firewall.destination_ranges = destination_ranges # Set allowed protocols if allowed_protocols: firewall.allowed = [] for protocol in allowed_protocols: allowed = compute_v1.Allowed() allowed.I_p_protocol = protocol["IPProtocol"] if "ports" in protocol: allowed.ports = protocol["ports"] firewall.allowed.append(allowed) # Set denied protocols if denied_protocols: firewall.denied = [] for protocol in denied_protocols: denied = compute_v1.Denied() denied.I_p_protocol = protocol["IPProtocol"] if "ports" in protocol: denied.ports = protocol["ports"] firewall.denied.append(denied) # Set target tags if target_tags: firewall.target_tags = target_tags # Set source tags if source_tags and direction == "INGRESS": firewall.source_tags = source_tags # Create the firewall rule operation = client.insert(project=project_id, firewall_resource=firewall) return f""" Firewall rule creation initiated: - Name: {name} - Network: {network} - Direction: {direction} - Priority: {priority} - Description: {description or 'None'} - Source Ranges: {source_ranges or 'None'} - Destination Ranges: {destination_ranges or 'None'} - Allowed Protocols: {allowed_protocols or 'None'} - Denied Protocols: {denied_protocols or 'None'} - Target Tags: {target_tags or 'None'} - Source Tags: {source_tags or 'None'} Operation ID: {operation.id} Status: {operation.status} """ except Exception as e: return f"Error creating firewall rule: {str(e)}" @mcp.tool() def list_firewall_rules(project_id: str, network: Optional[str] = None) -> str: """ List firewall rules in a GCP project, optionally filtered by network. Args: project_id: The ID of the GCP project network: Optional network name to filter firewall rules by Returns: List of firewall rules in the specified GCP project """ try: from google.cloud import compute_v1 # Initialize the Compute Engine client for firewall client = compute_v1.FirewallsClient() # List firewall rules request = compute_v1.ListFirewallsRequest(project=project_id) firewalls = client.list(request=request) # Format the response firewalls_list = [] for firewall in firewalls: # If network filter is applied, skip firewalls not in that network if network and network not in firewall.network: continue # Get network name from the full URL network_name = firewall.network.split('/')[-1] # Get allowed/denied protocols allowed = [] for allow in firewall.allowed: ports = f":{','.join(allow.ports)}" if allow.ports else "" allowed.append(f"{allow.I_p_protocol}{ports}") denied = [] for deny in firewall.denied: ports = f":{','.join(deny.ports)}" if deny.ports else "" denied.append(f"{deny.I_p_protocol}{ports}") # Format sources/destinations based on direction if firewall.direction == "INGRESS": sources = firewall.source_ranges or firewall.source_tags or ["Any"] destinations = ["Any"] else: # EGRESS sources = ["Any"] destinations = firewall.destination_ranges or ["Any"] # Create the firewall rule info string rule_info = [ f"- {firewall.name}", f" Network: {network_name}", f" Direction: {firewall.direction}", f" Priority: {firewall.priority}", f" Action: {'Allow' if firewall.allowed else 'Deny'}" ] if allowed: rule_info.append(f" Allowed: {', '.join(allowed)}") if denied: rule_info.append(f" Denied: {', '.join(denied)}") rule_info.append(f" Sources: {', '.join(sources)}") rule_info.append(f" Destinations: {', '.join(destinations)}") if firewall.target_tags: rule_info.append(f" Target Tags: {', '.join(firewall.target_tags)}") firewalls_list.append("\n".join(rule_info)) if not firewalls_list: return f"No firewall rules found in project {project_id}{' for network ' + network if network else ''}." firewalls_str = "\n".join(firewalls_list) return f""" Firewall Rules in GCP Project {project_id}{' for network ' + network if network else ''}: {firewalls_str} """ except Exception as e: return f"Error listing firewall rules: {str(e)}" @mcp.tool() def list_gcp_services(project_id: str) -> str: """ List enabled services/APIs in a GCP project. Args: project_id: The ID of the GCP project to list services for Returns: List of enabled services in the specified GCP project """ try: try: from google.cloud import service_usage except ImportError: return "Error: The Google Cloud service usage library is not installed. Please install it with 'pip install google-cloud-service-usage'." # Initialize the Service Usage client client = service_usage.ServiceUsageClient() # Create the request request = service_usage.ListServicesRequest( parent=f"projects/{project_id}", filter="state:ENABLED" ) # List enabled services services = client.list_services(request=request) # Format the response services_list = [] for service in services: name = service.name.split('/')[-1] if service.name else "Unknown" title = service.config.title if service.config else "Unknown" services_list.append(f"- {name}: {title}") if not services_list: return f"No services are enabled in project {project_id}." services_str = "\n".join(services_list) return f""" Enabled Services in GCP Project {project_id}: {services_str} """ except Exception as e: return f"Error listing GCP services: {str(e)}" ``` -------------------------------------------------------------------------------- /src/gcp_mcp/gcp_modules/auth/tools.py: -------------------------------------------------------------------------------- ```python """ Google Cloud Platform Authentication tools. """ import os import json from typing import Dict, Any, Optional, List import webbrowser import tempfile from pathlib import Path def register_tools(mcp): """Register all authentication tools with the MCP server.""" # Global variable to store the current project ID _current_project_id = None @mcp.tool() def auth_login(project_id: str = "") -> str: """ Authenticate with Google Cloud Platform using browser-based OAuth flow. Args: project_id: Optional project ID to set as default after login Returns: Status message indicating whether authentication was successful """ nonlocal _current_project_id try: from google.auth.transport.requests import Request from google.auth.exceptions import DefaultCredentialsError from google_auth_oauthlib.flow import InstalledAppFlow import google.auth # First, attempt to use existing credentials to see if we're already authenticated try: credentials, project = google.auth.default() # Test if credentials are valid if hasattr(credentials, 'refresh'): credentials.refresh(Request()) # If we get here, credentials are valid if project_id: # Update global project ID _current_project_id = project_id # Create a credential configuration file for the project _set_project_id_in_config(project_id) return f"Using existing credentials. Project set to {project_id}." else: return f"Using existing credentials. Current project: {project or 'Not set'}" except (DefaultCredentialsError, Exception) as e: # If we can't use existing credentials, proceed with login pass # Set up the OAuth flow print("Opening browser for authentication...") # Create a temporary client_secrets.json file for OAuth flow client_secrets = { "installed": { "client_id": "764086051850-6qr4p6gpi6hn506pt8ejuq83di341hur.apps.googleusercontent.com", "project_id": "gcp-mcp", "auth_uri": "https://accounts.google.com/o/oauth2/auth", "token_uri": "https://oauth2.googleapis.com/token", "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", "client_secret": "d-FL95Q19q7MQmFpd7hHD0Ty", "redirect_uris": ["http://localhost", "urn:ietf:wg:oauth:2.0:oob"] } } with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as temp: temp_client_secrets_path = temp.name json.dump(client_secrets, temp) try: # Create the OAuth flow flow = InstalledAppFlow.from_client_secrets_file( temp_client_secrets_path, scopes=['https://www.googleapis.com/auth/cloud-platform'] ) # Run the flow creds = flow.run_local_server(port=0) # Save the credentials as application default credentials adc_path = _get_adc_path() os.makedirs(os.path.dirname(adc_path), exist_ok=True) # Write credentials to ADC file with open(adc_path, 'w') as f: creds_data = { "client_id": creds.client_id, "client_secret": creds.client_secret, "refresh_token": creds.refresh_token, "type": "authorized_user" } json.dump(creds_data, f) # Set project if specified if project_id: _current_project_id = project_id _set_project_id_in_config(project_id) success_msg = "Authentication successful!" if project_id: success_msg += f" Default project set to {project_id}." # Test by listing accessible projects try: from google.cloud import resourcemanager_v3 # Get fresh credentials after login credentials, _ = google.auth.default() client = resourcemanager_v3.ProjectsClient(credentials=credentials) request = resourcemanager_v3.ListProjectsRequest() projects = list(client.list_projects(request=request)) project_count = len(projects) if project_count > 0: project_list = "\n".join([f"- {project.display_name} (ID: {project.project_id})" for project in projects[:5]]) if project_count > 5: project_list += f"\n... and {project_count - 5} more" success_msg += f"\n\nFound {project_count} accessible projects:\n{project_list}" except Exception as e: # Don't fail if we can't list projects pass return success_msg finally: # Clean up the temporary file try: os.unlink(temp_client_secrets_path) except: pass except Exception as e: return f"Authentication error: {str(e)}" @mcp.tool() def auth_list() -> str: """ List active Google Cloud credentials. Returns: List of active credentials and the current default account """ try: import google.auth # Check application default credentials try: credentials, project = google.auth.default() # Try to get email from credentials email = None if hasattr(credentials, 'service_account_email'): email = credentials.service_account_email elif hasattr(credentials, 'refresh_token') and credentials.refresh_token: # This is a user credential adc_path = _get_adc_path() if os.path.exists(adc_path): try: with open(adc_path, 'r') as f: data = json.load(f) if 'refresh_token' in data: # This is a user auth, but we can't get the email directly email = "User account (ADC)" except: pass credential_type = type(credentials).__name__ output = "Active Credentials:\n" if email: output += f"- {email} (Application Default Credentials, type: {credential_type})\n" else: output += f"- Application Default Credentials (type: {credential_type})\n" if project: output += f"\nCurrent Project: {project}\n" else: output += "\nNo project set in default credentials.\n" # Check for other credentials in well-known locations credentials_dir = os.path.expanduser("~/.config/gcloud/credentials") if os.path.isdir(credentials_dir): cred_files = [f for f in os.listdir(credentials_dir) if f.endswith('.json')] if cred_files: output += "\nOther available credentials:\n" for cred_file in cred_files: try: with open(os.path.join(credentials_dir, cred_file), 'r') as f: data = json.load(f) if 'client_id' in data: output += f"- User account ({cred_file})\n" elif 'private_key_id' in data: output += f"- Service account: {data.get('client_email', 'Unknown')} ({cred_file})\n" except: output += f"- Unknown credential type ({cred_file})\n" return output except Exception as e: return f"No active credentials found. Please run auth_login() to authenticate.\nError: {str(e)}" except Exception as e: return f"Error listing credentials: {str(e)}" @mcp.tool() def auth_revoke() -> str: """ Revoke Google Cloud credentials. Returns: Status message indicating whether the credentials were revoked """ try: import google.auth from google.auth.transport.requests import Request # Check if we have application default credentials try: credentials, _ = google.auth.default() # If credentials have a revoke method, use it if hasattr(credentials, 'revoke'): credentials.revoke(Request()) # Remove the application default credentials file adc_path = _get_adc_path() if os.path.exists(adc_path): os.remove(adc_path) return "Application default credentials have been revoked and removed." else: return "No application default credentials file found to remove." except Exception as e: return f"No active credentials found or failed to revoke: {str(e)}" except Exception as e: return f"Error revoking credentials: {str(e)}" @mcp.tool() def config_set_project(project_id: str) -> str: """ Set the default Google Cloud project. Args: project_id: The ID of the project to set as default Returns: Status message indicating whether the project was set """ nonlocal _current_project_id try: # Update global project ID _current_project_id = project_id # Create or update the config file _set_project_id_in_config(project_id) # Verify the project exists try: from google.cloud import resourcemanager_v3 import google.auth credentials, _ = google.auth.default() client = resourcemanager_v3.ProjectsClient(credentials=credentials) name = f"projects/{project_id}" try: project = client.get_project(name=name) return f"Default project set to {project_id} ({project.display_name})." except Exception: # Project might not exist or user might not have access return f"Default project set to {project_id}. Note: Could not verify if this project exists or if you have access to it." except Exception as e: # Don't fail if we can't verify the project return f"Default project set to {project_id}." except Exception as e: return f"Error setting project: {str(e)}" @mcp.tool() def config_list() -> str: """ List the current Google Cloud configuration. Returns: Current configuration settings """ try: # Get project ID from config project_id = _get_project_id_from_config() # Get project ID from global variable if set if _current_project_id: project_id = _current_project_id output = "Current Configuration:\n" if project_id: output += f"- Project ID: {project_id}\n" else: output += "- Project ID: Not set\n" # Check if we have active credentials try: import google.auth credentials, default_project = google.auth.default() if hasattr(credentials, 'service_account_email'): output += f"- Authenticated as: {credentials.service_account_email} (Service Account)\n" else: output += "- Authenticated as: User Account\n" if default_project and default_project != project_id: output += f"- Default Project in Credentials: {default_project}\n" except Exception: output += "- Authentication: Not authenticated or credentials not found\n" # Get additional configuration config_file = os.path.join(_get_config_path(), 'configurations', 'config_default') if os.path.exists(config_file): try: with open(config_file, 'r') as f: config_lines = f.readlines() if config_lines: output += "\nAdditional Configuration Settings:\n" for line in config_lines: line = line.strip() if line and not line.startswith('#') and '=' in line: key, value = line.split('=', 1) key = key.strip() value = value.strip() # Skip project since we already displayed it if key != 'project': output += f"- {key}: {value}\n" except: pass return output except Exception as e: return f"Error listing configuration: {str(e)}" # Helper functions def _get_adc_path() -> str: """Get the path to the application default credentials file.""" # Standard ADC paths by platform if os.name == 'nt': # Windows return os.path.join(os.environ.get('APPDATA', ''), 'gcloud', 'application_default_credentials.json') else: # Linux/Mac return os.path.expanduser('~/.config/gcloud/application_default_credentials.json') def _get_config_path() -> str: """Get the path to the configuration directory.""" if os.name == 'nt': # Windows return os.path.join(os.environ.get('APPDATA', ''), 'gcloud') else: # Linux/Mac return os.path.expanduser('~/.config/gcloud') def _set_project_id_in_config(project_id: str) -> None: """Set the project ID in the configuration file.""" config_dir = _get_config_path() os.makedirs(config_dir, exist_ok=True) config_file = os.path.join(config_dir, 'configurations', 'config_default') os.makedirs(os.path.dirname(config_file), exist_ok=True) # Read existing config if it exists config_data = {} if os.path.exists(config_file): try: with open(config_file, 'r') as f: for line in f: if '=' in line: key, value = line.strip().split('=', 1) config_data[key.strip()] = value.strip() except: pass # Update project config_data['project'] = project_id # Write back config with open(config_file, 'w') as f: for key, value in config_data.items(): f.write(f"{key} = {value}\n") def _get_project_id_from_config() -> Optional[str]: """Get the project ID from the configuration file.""" config_file = os.path.join(_get_config_path(), 'configurations', 'config_default') if os.path.exists(config_file): try: with open(config_file, 'r') as f: for line in f: if line.strip().startswith('project ='): return line.split('=', 1)[1].strip() except: pass return None ``` -------------------------------------------------------------------------------- /src/gcp_mcp/gcp_modules/compute/tools.py: -------------------------------------------------------------------------------- ```python """ Google Cloud Platform Compute Engine tools. """ from typing import List, Dict, Any, Optional def register_tools(mcp): """Register all compute tools with the MCP server.""" @mcp.tool() def list_compute_instances(project_id: str, zone: str = "") -> str: """ List Compute Engine instances in a GCP project. Args: project_id: The ID of the GCP project to list instances for zone: Optional zone to filter instances (e.g., "us-central1-a") Returns: List of Compute Engine instances in the specified GCP project """ try: from google.cloud import compute_v1 # Initialize the Compute Engine client client = compute_v1.InstancesClient() instances_list = [] if zone: # List instances in the specified zone request = compute_v1.ListInstancesRequest( project=project_id, zone=zone ) instances = client.list(request=request) for instance in instances: machine_type = instance.machine_type.split('/')[-1] if instance.machine_type else "Unknown" status = instance.status ext_ip = "None" int_ip = "None" # Get IP addresses if instance.network_interfaces: int_ip = instance.network_interfaces[0].network_i_p if instance.network_interfaces[0].access_configs: ext_ip = instance.network_interfaces[0].access_configs[0].nat_i_p or "None" instances_list.append(f"- {instance.name} (Zone: {zone}, Type: {machine_type}, Internal IP: {int_ip}, External IP: {ext_ip}, Status: {status})") else: # List instances in all zones zones_client = compute_v1.ZonesClient() zones_request = compute_v1.ListZonesRequest(project=project_id) zones = zones_client.list(request=zones_request) for zone_item in zones: zone_name = zone_item.name request = compute_v1.ListInstancesRequest( project=project_id, zone=zone_name ) try: instances = client.list(request=request) for instance in instances: machine_type = instance.machine_type.split('/')[-1] if instance.machine_type else "Unknown" status = instance.status ext_ip = "None" int_ip = "None" # Get IP addresses if instance.network_interfaces: int_ip = instance.network_interfaces[0].network_i_p if instance.network_interfaces[0].access_configs: ext_ip = instance.network_interfaces[0].access_configs[0].nat_i_p or "None" instances_list.append(f"- {instance.name} (Zone: {zone_name}, Type: {machine_type}, Internal IP: {int_ip}, External IP: {ext_ip}, Status: {status})") except Exception: # Skip zones where we can't list instances continue if not instances_list: zone_msg = f" in zone {zone}" if zone else "" return f"No Compute Engine instances found{zone_msg} for project {project_id}." instances_str = "\n".join(instances_list) zone_msg = f" in zone {zone}" if zone else "" return f""" Compute Engine Instances{zone_msg} in GCP Project {project_id}: {instances_str} """ except Exception as e: return f"Error listing Compute Engine instances: {str(e)}" @mcp.tool() def get_instance_details(project_id: str, zone: str, instance_name: str) -> str: """ Get detailed information about a specific Compute Engine instance. Args: project_id: The ID of the GCP project zone: The zone where the instance is located (e.g., "us-central1-a") instance_name: The name of the instance to get details for Returns: Detailed information about the specified Compute Engine instance """ try: from google.cloud import compute_v1 # Initialize the Compute Engine client client = compute_v1.InstancesClient() # Get the instance details instance = client.get(project=project_id, zone=zone, instance=instance_name) # Format machine type machine_type = instance.machine_type.split('/')[-1] if instance.machine_type else "Unknown" # Format creation timestamp creation_timestamp = instance.creation_timestamp if instance.creation_timestamp else "Unknown" # Format boot disk boot_disk = "None" if instance.disks: for disk in instance.disks: if disk.boot: boot_disk = disk.source.split('/')[-1] if disk.source else "Unknown" break # Get IP addresses network_interfaces = [] if instance.network_interfaces: for i, iface in enumerate(instance.network_interfaces): network = iface.network.split('/')[-1] if iface.network else "Unknown" subnetwork = iface.subnetwork.split('/')[-1] if iface.subnetwork else "Unknown" internal_ip = iface.network_i_p or "None" # Check for external IP external_ip = "None" if iface.access_configs: external_ip = iface.access_configs[0].nat_i_p or "None" network_interfaces.append(f" Interface {i}:\n Network: {network}\n Subnetwork: {subnetwork}\n Internal IP: {internal_ip}\n External IP: {external_ip}") networks_str = "\n".join(network_interfaces) if network_interfaces else " None" # Get attached disks disks = [] if instance.disks: for i, disk in enumerate(instance.disks): disk_name = disk.source.split('/')[-1] if disk.source else "Unknown" disk_type = "Boot" if disk.boot else "Data" auto_delete = "Yes" if disk.auto_delete else "No" mode = disk.mode if disk.mode else "Unknown" disks.append(f" Disk {i}:\n Name: {disk_name}\n Type: {disk_type}\n Mode: {mode}\n Auto-delete: {auto_delete}") disks_str = "\n".join(disks) if disks else " None" # Get labels labels = [] if instance.labels: for key, value in instance.labels.items(): labels.append(f" {key}: {value}") labels_str = "\n".join(labels) if labels else " None" # Get metadata metadata_items = [] if instance.metadata and instance.metadata.items: for item in instance.metadata.items: metadata_items.append(f" {item.key}: {item.value}") metadata_str = "\n".join(metadata_items) if metadata_items else " None" return f""" Compute Engine Instance Details for {instance_name}: Project: {project_id} Zone: {zone} Machine Type: {machine_type} Status: {instance.status} Creation Time: {creation_timestamp} CPU Platform: {instance.cpu_platform} Boot Disk: {boot_disk} Network Interfaces: {networks_str} Disks: {disks_str} Labels: {labels_str} Metadata: {metadata_str} Service Accounts: {"Yes" if instance.service_accounts else "None"} """ except Exception as e: return f"Error getting instance details: {str(e)}" @mcp.tool() def start_instance(project_id: str, zone: str, instance_name: str) -> str: """ Start a Compute Engine instance. Args: project_id: The ID of the GCP project zone: The zone where the instance is located (e.g., "us-central1-a") instance_name: The name of the instance to start Returns: Status message indicating whether the instance was started successfully """ try: from google.cloud import compute_v1 # Initialize the Compute Engine client client = compute_v1.InstancesClient() # Start the instance operation = client.start(project=project_id, zone=zone, instance=instance_name) # Wait for the operation to complete operation_client = compute_v1.ZoneOperationsClient() # This is a synchronous call that will wait until the operation is complete while operation.status != compute_v1.Operation.Status.DONE: operation = operation_client.get(project=project_id, zone=zone, operation=operation.name.split('/')[-1]) import time time.sleep(1) if operation.error: return f"Error starting instance {instance_name}: {operation.error.errors[0].message}" return f"Instance {instance_name} in zone {zone} started successfully." except Exception as e: return f"Error starting instance: {str(e)}" @mcp.tool() def stop_instance(project_id: str, zone: str, instance_name: str) -> str: """ Stop a Compute Engine instance. Args: project_id: The ID of the GCP project zone: The zone where the instance is located (e.g., "us-central1-a") instance_name: The name of the instance to stop Returns: Status message indicating whether the instance was stopped successfully """ try: from google.cloud import compute_v1 # Initialize the Compute Engine client client = compute_v1.InstancesClient() # Stop the instance operation = client.stop(project=project_id, zone=zone, instance=instance_name) # Wait for the operation to complete operation_client = compute_v1.ZoneOperationsClient() # This is a synchronous call that will wait until the operation is complete while operation.status != compute_v1.Operation.Status.DONE: operation = operation_client.get(project=project_id, zone=zone, operation=operation.name.split('/')[-1]) import time time.sleep(1) if operation.error: return f"Error stopping instance {instance_name}: {operation.error.errors[0].message}" return f"Instance {instance_name} in zone {zone} stopped successfully." except Exception as e: return f"Error stopping instance: {str(e)}" @mcp.tool() def list_machine_types(project_id: str, zone: str) -> str: """ List available machine types in a specific zone. Args: project_id: The ID of the GCP project zone: The zone to check machine types in (e.g., "us-central1-a") Returns: List of available machine types in the specified zone """ try: from google.cloud import compute_v1 # Initialize the Machine Types client client = compute_v1.MachineTypesClient() # List machine types request = compute_v1.ListMachineTypesRequest( project=project_id, zone=zone ) machine_types = client.list(request=request) # Format the response types_list = [] # Group by series series = {} for mt in machine_types: # Determine series (e.g., e2, n1, c2) name = mt.name series_name = "custom" if name.startswith("custom") else name.split("-")[0] if series_name not in series: series[series_name] = [] # Format the machine type details vcpus = mt.guest_cpus memory_gb = mt.memory_mb / 1024 # Convert MB to GB series[series_name].append(f" {name}: {vcpus} vCPUs, {memory_gb:.1f} GB RAM") # Create formatted output by series for s_name in sorted(series.keys()): types_list.append(f" {s_name} series:") types_list.extend(sorted(series[s_name])) if not types_list: return f"No machine types found in zone {zone} for project {project_id}." types_str = "\n".join(types_list) return f""" Available Machine Types in Zone {zone} for Project {project_id}: {types_str} """ except Exception as e: return f"Error listing machine types: {str(e)}" @mcp.tool() def list_disks(project_id: str, zone: str = "") -> str: """ List Compute Engine persistent disks in a GCP project. Args: project_id: The ID of the GCP project to list disks for zone: Optional zone to filter disks (e.g., "us-central1-a") Returns: List of persistent disks in the specified GCP project """ try: from google.cloud import compute_v1 # Initialize the Disks client client = compute_v1.DisksClient() disks_list = [] if zone: # List disks in the specified zone request = compute_v1.ListDisksRequest( project=project_id, zone=zone ) disks = client.list(request=request) for disk in disks: size_gb = disk.size_gb disk_type = disk.type.split('/')[-1] if disk.type else "Unknown" status = disk.status users = len(disk.users) if disk.users else 0 users_str = f"Attached to {users} instance(s)" if users > 0 else "Not attached" disks_list.append(f"- {disk.name} (Zone: {zone}, Type: {disk_type}, Size: {size_gb} GB, Status: {status}, {users_str})") else: # List disks in all zones zones_client = compute_v1.ZonesClient() zones_request = compute_v1.ListZonesRequest(project=project_id) zones = zones_client.list(request=zones_request) for zone_item in zones: zone_name = zone_item.name request = compute_v1.ListDisksRequest( project=project_id, zone=zone_name ) try: disks = client.list(request=request) for disk in disks: size_gb = disk.size_gb disk_type = disk.type.split('/')[-1] if disk.type else "Unknown" status = disk.status users = len(disk.users) if disk.users else 0 users_str = f"Attached to {users} instance(s)" if users > 0 else "Not attached" disks_list.append(f"- {disk.name} (Zone: {zone_name}, Type: {disk_type}, Size: {size_gb} GB, Status: {status}, {users_str})") except Exception: # Skip zones where we can't list disks continue if not disks_list: zone_msg = f" in zone {zone}" if zone else "" return f"No persistent disks found{zone_msg} for project {project_id}." disks_str = "\n".join(disks_list) zone_msg = f" in zone {zone}" if zone else "" return f""" Persistent Disks{zone_msg} in GCP Project {project_id}: {disks_str} """ except Exception as e: return f"Error listing persistent disks: {str(e)}" @mcp.tool() def create_instance(project_id: str, zone: str, instance_name: str, machine_type: str, source_image: str, boot_disk_size_gb: int = 10, network: str = "default", subnet: str = "", external_ip: bool = True) -> str: """ Create a new Compute Engine instance. Args: project_id: The ID of the GCP project zone: The zone to create the instance in (e.g., "us-central1-a") instance_name: The name for the new instance machine_type: The machine type (e.g., "e2-medium") source_image: The source image for the boot disk (e.g., "projects/debian-cloud/global/images/family/debian-11") boot_disk_size_gb: The size of the boot disk in GB (default: 10) network: The network to connect to (default: "default") subnet: The subnetwork to connect to (optional) external_ip: Whether to allocate an external IP (default: True) Returns: Status message indicating whether the instance was created successfully """ try: from google.cloud import compute_v1 # Initialize the clients instances_client = compute_v1.InstancesClient() # Format the machine type machine_type_url = f"projects/{project_id}/zones/{zone}/machineTypes/{machine_type}" # Create the disk configuration boot_disk = compute_v1.AttachedDisk() boot_disk.boot = True initialize_params = compute_v1.AttachedDiskInitializeParams() initialize_params.source_image = source_image initialize_params.disk_size_gb = boot_disk_size_gb boot_disk.initialize_params = initialize_params boot_disk.auto_delete = True # Create the network configuration network_interface = compute_v1.NetworkInterface() if network.startswith("projects/"): network_interface.network = network else: network_interface.network = f"projects/{project_id}/global/networks/{network}" if subnet: if subnet.startswith("projects/"): network_interface.subnetwork = subnet else: network_interface.subnetwork = f"projects/{project_id}/regions/{zone.rsplit('-', 1)[0]}/subnetworks/{subnet}" if external_ip: access_config = compute_v1.AccessConfig() access_config.name = "External NAT" access_config.type_ = "ONE_TO_ONE_NAT" access_config.network_tier = "PREMIUM" network_interface.access_configs = [access_config] # Create the instance instance = compute_v1.Instance() instance.name = instance_name instance.machine_type = machine_type_url instance.disks = [boot_disk] instance.network_interfaces = [network_interface] # Create a default service account for the instance service_account = compute_v1.ServiceAccount() service_account.email = "default" service_account.scopes = ["https://www.googleapis.com/auth/cloud-platform"] instance.service_accounts = [service_account] # Create the instance operation = instances_client.insert( project=project_id, zone=zone, instance_resource=instance ) # Wait for the create operation to complete operation_client = compute_v1.ZoneOperationsClient() # This is a synchronous call that will wait until the operation is complete while operation.status != compute_v1.Operation.Status.DONE: operation = operation_client.get(project=project_id, zone=zone, operation=operation.name.split('/')[-1]) import time time.sleep(1) if operation.error: return f"Error creating instance {instance_name}: {operation.error.errors[0].message}" # Get the created instance to return its details created_instance = instances_client.get(project=project_id, zone=zone, instance=instance_name) # Get the instance IP addresses internal_ip = "None" external_ip = "None" if created_instance.network_interfaces: internal_ip = created_instance.network_interfaces[0].network_i_p or "None" if created_instance.network_interfaces[0].access_configs: external_ip = created_instance.network_interfaces[0].access_configs[0].nat_i_p or "None" return f""" Instance {instance_name} created successfully in zone {zone}. Details: - Machine Type: {machine_type} - Internal IP: {internal_ip} - External IP: {external_ip} - Status: {created_instance.status} """ except Exception as e: return f"Error creating instance: {str(e)}" @mcp.tool() def delete_instance(project_id: str, zone: str, instance_name: str) -> str: """ Delete a Compute Engine instance. Args: project_id: The ID of the GCP project zone: The zone where the instance is located (e.g., "us-central1-a") instance_name: The name of the instance to delete Returns: Status message indicating whether the instance was deleted successfully """ try: from google.cloud import compute_v1 # Initialize the Compute Engine client client = compute_v1.InstancesClient() # Delete the instance operation = client.delete(project=project_id, zone=zone, instance=instance_name) # Wait for the operation to complete operation_client = compute_v1.ZoneOperationsClient() # This is a synchronous call that will wait until the operation is complete while operation.status != compute_v1.Operation.Status.DONE: operation = operation_client.get(project=project_id, zone=zone, operation=operation.name.split('/')[-1]) import time time.sleep(1) if operation.error: return f"Error deleting instance {instance_name}: {operation.error.errors[0].message}" return f"Instance {instance_name} in zone {zone} deleted successfully." except Exception as e: return f"Error deleting instance: {str(e)}" @mcp.tool() def create_snapshot(project_id: str, zone: str, disk_name: str, snapshot_name: str, description: str = "") -> str: """ Create a snapshot of a Compute Engine disk. Args: project_id: The ID of the GCP project zone: The zone where the disk is located (e.g., "us-central1-a") disk_name: The name of the disk to snapshot snapshot_name: The name for the new snapshot description: Optional description for the snapshot Returns: Status message indicating whether the snapshot was created successfully """ try: from google.cloud import compute_v1 # Initialize the Disks client disks_client = compute_v1.DisksClient() # Create the snapshot request snapshot = compute_v1.Snapshot() snapshot.name = snapshot_name if description: snapshot.description = description # Create the snapshot operation = disks_client.create_snapshot( project=project_id, zone=zone, disk=disk_name, snapshot_resource=snapshot ) # Wait for the operation to complete operation_client = compute_v1.ZoneOperationsClient() # This is a synchronous call that will wait until the operation is complete while operation.status != compute_v1.Operation.Status.DONE: operation = operation_client.get(project=project_id, zone=zone, operation=operation.name.split('/')[-1]) import time time.sleep(1) if operation.error: return f"Error creating snapshot {snapshot_name}: {operation.error.errors[0].message}" return f"Snapshot {snapshot_name} of disk {disk_name} in zone {zone} created successfully." except Exception as e: return f"Error creating snapshot: {str(e)}" @mcp.tool() def list_snapshots(project_id: str) -> str: """ List disk snapshots in a GCP project. Args: project_id: The ID of the GCP project to list snapshots for Returns: List of disk snapshots in the specified GCP project """ try: from google.cloud import compute_v1 # Initialize the Snapshots client client = compute_v1.SnapshotsClient() # List snapshots request = compute_v1.ListSnapshotsRequest(project=project_id) snapshots = client.list(request=request) # Format the response snapshots_list = [] for snapshot in snapshots: size_gb = snapshot.disk_size_gb status = snapshot.status source_disk = snapshot.source_disk.split('/')[-1] if snapshot.source_disk else "Unknown" creation_time = snapshot.creation_timestamp if snapshot.creation_timestamp else "Unknown" snapshots_list.append(f"- {snapshot.name} (Source: {source_disk}, Size: {size_gb} GB, Status: {status}, Created: {creation_time})") if not snapshots_list: return f"No snapshots found for project {project_id}." snapshots_str = "\n".join(snapshots_list) return f""" Disk Snapshots in GCP Project {project_id}: {snapshots_str} """ except Exception as e: return f"Error listing snapshots: {str(e)}" ```