# Directory Structure ``` ├── .gitignore ├── .python-version ├── pyproject.toml ├── pytest.ini ├── README.md ├── src │ └── gcp_mcp │ ├── __init__.py │ ├── __main__.py │ ├── gcp_modules │ │ ├── __init__.py │ │ ├── auth │ │ │ ├── __init__.py │ │ │ └── tools.py │ │ ├── billing │ │ │ ├── __init__.py │ │ │ └── tools.py │ │ ├── compute │ │ │ ├── __init__.py │ │ │ └── tools.py │ │ ├── databases │ │ │ ├── __init__.py │ │ │ └── tools.py │ │ ├── deployment │ │ │ ├── __init__.py │ │ │ └── tools.py │ │ ├── iam │ │ │ ├── __init__.py │ │ │ └── tools.py │ │ ├── kubernetes │ │ │ ├── __init__.py │ │ │ └── tools.py │ │ ├── monitoring │ │ │ ├── __init__.py │ │ │ └── tools.py │ │ ├── networking │ │ │ ├── __init__.py │ │ │ └── tools.py │ │ ├── resource_management │ │ │ ├── __init__.py │ │ │ └── tools.py │ │ └── storage │ │ ├── __init__.py │ │ └── tools.py │ └── server.py ├── tests │ ├── __init__.py │ ├── conftest.py │ ├── mock_gcp.py │ └── unit │ ├── __init__.py │ └── test_gcp_functions.py └── uv.lock ``` # Files -------------------------------------------------------------------------------- /.python-version: -------------------------------------------------------------------------------- ``` 1 | 3.11.8 2 | ``` -------------------------------------------------------------------------------- /.gitignore: -------------------------------------------------------------------------------- ``` 1 | # Python-generated files 2 | __pycache__/ 3 | *.py[oc] 4 | build/ 5 | dist/ 6 | wheels/ 7 | *.egg-info 8 | 9 | # Virtual environments 10 | .venv 11 | ``` -------------------------------------------------------------------------------- /README.md: -------------------------------------------------------------------------------- ```markdown 1 | # GCP MCP Application 2 | 3 | ## Claude Desktop Integration 4 | 5 | To enable GCP management capabilities in Claude Desktop, simply add the following configuration to your Claude Desktop MCP configuration: 6 | 7 | ```json 8 | { 9 | "gcp-mcp": { 10 | "command": 
"uvx", 11 | "args": [ 12 | "gcp-mcp" 13 | ] 14 | } 15 | } 16 | ``` 17 | 18 | That's it! No additional setup or credential manipulation is required. When you first ask Claude to interact with your GCP resources, a browser window will automatically open for you to authenticate and grant access. Once you approve the access, Claude will be able to manage your GCP resources through natural language commands. 19 | 20 | Here are some example requests you can make: 21 | 22 | Basic Operations: 23 | - "Could you list my GCP projects?" 24 | - "Show me my compute instances" 25 | - "What storage buckets do I have?" 26 | 27 | Resource Creation: 28 | - "Please create a compute instance with 2GB RAM and 10GB storage, name it MCP-engine" 29 | - "Create a new storage bucket called my-backup-bucket in us-central1" 30 | - "Set up a new VPC network named prod-network with subnet 10.0.0.0/24" 31 | 32 | Resource Management: 33 | - "Stop all compute instances in the dev project" 34 | - "Show me all instances that have been running for more than 24 hours" 35 | - "What's the current CPU usage of my instance named backend-server?" 36 | - "Create a snapshot of my database disk" 37 | 38 | Monitoring and Alerts: 39 | - "Set up an alert for when CPU usage goes above 80%" 40 | - "Show me all critical alerts from the last 24 hours" 41 | - "What's the current status of my GKE clusters?" 
42 | 43 | ## Features 44 | 45 | The application provides comprehensive coverage of GCP services: 46 | 47 | ### Resource Management 48 | - Projects and quotas management 49 | - Asset inventory 50 | - IAM permissions 51 | 52 | ### Compute & Infrastructure 53 | - Compute Engine instances 54 | - Storage buckets and disks 55 | - VPC networks and firewall rules 56 | - Kubernetes Engine (GKE) clusters 57 | 58 | ### Databases & Storage 59 | - Cloud SQL instances 60 | - Firestore databases 61 | - Cloud Storage 62 | - Database backups 63 | 64 | ### Monitoring & Billing 65 | - Metrics and alerts 66 | - Billing information 67 | - Uptime monitoring 68 | - Resource usage tracking 69 | 70 | ### Coming Soon 71 | - Deployment manager and infrastructure as code 72 | 73 | ## Installation 74 | 75 | ```bash 76 | pip install gcp-mcp 77 | ``` 78 | 79 | ## License 80 | 81 | [MIT License](LICENSE) 82 | 83 | Contributions and issues are welcome! 84 | ``` -------------------------------------------------------------------------------- /src/gcp_mcp/gcp_modules/__init__.py: -------------------------------------------------------------------------------- ```python 1 | ``` -------------------------------------------------------------------------------- /src/gcp_mcp/gcp_modules/billing/__init__.py: -------------------------------------------------------------------------------- ```python 1 | ``` -------------------------------------------------------------------------------- /src/gcp_mcp/gcp_modules/compute/__init__.py: -------------------------------------------------------------------------------- ```python 1 | ``` -------------------------------------------------------------------------------- /src/gcp_mcp/gcp_modules/databases/__init__.py: -------------------------------------------------------------------------------- ```python 1 | ``` -------------------------------------------------------------------------------- /src/gcp_mcp/gcp_modules/deployment/__init__.py:
-------------------------------------------------------------------------------- ```python 1 | ``` -------------------------------------------------------------------------------- /src/gcp_mcp/gcp_modules/iam/__init__.py: -------------------------------------------------------------------------------- ```python 1 | ``` -------------------------------------------------------------------------------- /src/gcp_mcp/gcp_modules/kubernetes/__init__.py: -------------------------------------------------------------------------------- ```python 1 | ``` -------------------------------------------------------------------------------- /src/gcp_mcp/gcp_modules/monitoring/__init__.py: -------------------------------------------------------------------------------- ```python 1 | ``` -------------------------------------------------------------------------------- /src/gcp_mcp/gcp_modules/networking/__init__.py: -------------------------------------------------------------------------------- ```python 1 | ``` -------------------------------------------------------------------------------- /src/gcp_mcp/gcp_modules/resource_management/__init__.py: -------------------------------------------------------------------------------- ```python 1 | ``` -------------------------------------------------------------------------------- /src/gcp_mcp/gcp_modules/storage/__init__.py: -------------------------------------------------------------------------------- ```python 1 | ``` -------------------------------------------------------------------------------- /src/gcp_mcp/gcp_modules/auth/__init__.py: -------------------------------------------------------------------------------- ```python 1 | # Authentication module for Google Cloud Platform ``` -------------------------------------------------------------------------------- /tests/__init__.py: -------------------------------------------------------------------------------- ```python 1 | # This file is intentionally left empty to make the 
"""Entry point executed by ``python -m gcp_mcp``."""
import sys

from . import main

# main() serves MCP over the stdio transport, so stdout is reserved for the
# protocol stream; status banners are written to stderr to avoid corrupting it.
if __name__ == "__main__":
    print("Starting gcp_mcp", file=sys.stderr)
    main()
else:
    # NOTE(review): starting the server as a side effect of importing this
    # module is surprising; kept as-is for backward compatibility — confirm
    # whether any launcher relies on it before removing.
    print("Starting gcp_mcp from import", file=sys.stderr)
    main()
"google-cloud-resource-manager>=1.14.0", 15 | "google-cloud-compute>=1.26.0", 16 | "google-cloud-storage>=3.1.0", 17 | "google-cloud-service-usage>=1.13.0", 18 | "google-cloud-billing>=1.16.0", 19 | "google-api-python-client>=2.163.0", 20 | "google-cloud-monitoring>=2.22.0", 21 | "google-cloud-logging>=3.9.0", 22 | "google-cloud-container>=2.35.0", 23 | "google-cloud-firestore>=2.16.0", 24 | "google-cloud-bigtable>=2.19.0", 25 | "google-cloud-spanner>=3.39.0", 26 | "google-cloud-iam>=2.17.0", 27 | "google-cloud-vpc-access>=1.4.0", 28 | "google-cloud-asset>=3.25.0", 29 | "google-auth>=2.16.0", 30 | "google-auth-oauthlib>=1.2.0", 31 | "google-auth-httplib2>=0.1.0" 32 | ] 33 | 34 | [project.entry-points.console] 35 | gcp = "gcp_mcp:main" 36 | gcp-mcp = "gcp_mcp:main" 37 | 38 | [project.scripts] 39 | gcp-mcp = "gcp_mcp:main" ``` -------------------------------------------------------------------------------- /tests/conftest.py: -------------------------------------------------------------------------------- ```python 1 | import pytest 2 | import sys 3 | from unittest.mock import MagicMock 4 | 5 | # Mock the Google Cloud libraries 6 | MOCK_MODULES = [ 7 | 'google.cloud.resourcemanager_v3', 8 | 'google.cloud.service_usage', 9 | 'google.cloud.compute_v1', 10 | 'google.cloud.storage', 11 | 'google.cloud.billing.v1', 12 | 'google.auth', 13 | 'google.iam.v1', 14 | 'google.auth.exceptions', 15 | 'googleapiclient', 16 | 'googleapiclient.discovery' 17 | ] 18 | 19 | for mod_name in MOCK_MODULES: 20 | sys.modules[mod_name] = MagicMock() 21 | 22 | # Mock specific classes 23 | sys.modules['google.cloud.resourcemanager_v3'].ProjectsClient = MagicMock 24 | sys.modules['google.cloud.service_usage'].ServiceUsageClient = MagicMock 25 | sys.modules['google.cloud.compute_v1'].InstancesClient = MagicMock 26 | sys.modules['google.cloud.compute_v1'].ZonesClient = MagicMock 27 | sys.modules['google.cloud.storage'].Client = MagicMock 28 | 
def register_tools(mcp):
    """Attach the deployment tools to the MCP server.

    Every tool here is still a placeholder that only reports that the
    operation is not implemented yet.

    Args:
        mcp: Server object whose ``tool()`` decorator registers each nested
            function below.
    """

    def _pending(action: str) -> str:
        # Single place that formats the placeholder reply shared by all tools.
        return f"Not yet implemented: {action}"

    @mcp.tool()
    def list_deployment_manager_deployments(project_id: str) -> str:
        """
        List the Deployment Manager deployments of a GCP project.

        Args:
            project_id: ID of the GCP project whose deployments are listed

        Returns:
            The Deployment Manager deployments found in that project
        """
        # TODO: Implement this function
        return _pending(f"listing deployments for project {project_id}")

    @mcp.tool()
    def get_deployment_details(project_id: str, deployment_name: str) -> str:
        """
        Describe one Deployment Manager deployment.

        Args:
            project_id: ID of the GCP project that owns the deployment
            deployment_name: Name of the deployment to describe

        Returns:
            Details of the requested deployment
        """
        # TODO: Implement this function
        return _pending(
            f"getting details for deployment {deployment_name} in project {project_id}"
        )

    @mcp.tool()
    def list_cloud_build_triggers(project_id: str) -> str:
        """
        List the Cloud Build triggers of a GCP project.

        Args:
            project_id: ID of the GCP project whose build triggers are listed

        Returns:
            The Cloud Build triggers found in that project
        """
        # TODO: Implement this function
        return _pending(f"listing Cloud Build triggers for project {project_id}")
def register_tools(mcp):
    """Register all billing tools with the MCP server.

    Args:
        mcp: Server object whose ``tool()`` decorator registers each nested
            function below.
    """

    @mcp.tool()
    def get_billing_info(project_id: str) -> str:
        """
        Get billing information for a GCP project.

        Args:
            project_id: The ID of the GCP project to get billing information for

        Returns:
            Billing information for the specified GCP project, or a message
            starting with "Error" if the lookup fails
        """
        try:
            # Imported lazily so a missing library becomes a readable tool
            # error instead of an import failure at registration time.
            try:
                from google.cloud import billing_v1
            except ImportError:
                return "Error: The Google Cloud billing library is not installed. Please install it with 'pip install google-cloud-billing'."

            # Initialize the Cloud Billing client
            billing_client = billing_v1.CloudBillingClient()

            # Get the billing account linked to the project
            project_name = f"projects/{project_id}"
            billing_info = billing_client.get_project_billing_info(name=project_name)

            # No linked billing account: nothing further to look up.
            if not billing_info.billing_account_name:
                return f"Billing is not enabled for project {project_id}."

            # Fetch the account itself for its display name and open state.
            # (No catalog client is created: nothing in the report uses
            # pricing data.)
            billing_account = billing_client.get_billing_account(
                name=billing_info.billing_account_name
            )

            # Format the response
            return f"""
Billing Information for GCP Project {project_id}:

Billing Enabled: {billing_info.billing_enabled}
Billing Account: {billing_info.billing_account_name}
Display Name: {billing_account.display_name}
Open: {billing_account.open}
"""
        except Exception as e:
            # Tool boundary: report any API/credential failure as text.
            return f"Error getting billing information: {str(e)}"
def list_gcp_projects():
    """Return the fixed list of project IDs used as mock data."""
    # Three sequentially numbered IDs: test-project-1 .. test-project-3.
    return [f"test-project-{index}" for index in range(1, 4)]
get_billing_info(project_id): 68 | """Mock function to get billing information for a GCP project.""" 69 | return f""" 70 | Billing information for project {project_id}: 71 | - Billing account: 123456-ABCDEF-123456 72 | - Billing account name: My Billing Account 73 | - Billing account status: Open 74 | - Billing enabled: Yes 75 | - Currency: USD 76 | """ ``` -------------------------------------------------------------------------------- /tests/unit/test_gcp_functions.py: -------------------------------------------------------------------------------- ```python 1 | import pytest 2 | from unittest.mock import patch, MagicMock 3 | import sys 4 | import os 5 | 6 | # Add the parent directory to the Python path so we can import the mock modules 7 | sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../..'))) 8 | 9 | # Import the functions to test from the mock modules 10 | from tests.mock_gcp import ( 11 | list_gcp_projects, 12 | get_gcp_project_details, 13 | list_gcp_services, 14 | list_compute_instances, 15 | check_iam_permissions, 16 | list_storage_buckets, 17 | get_billing_info 18 | ) 19 | 20 | 21 | class TestGCPFunctions: 22 | """Test class for GCP-related functions.""" 23 | 24 | def test_list_gcp_projects(self): 25 | """Test the list_gcp_projects function.""" 26 | # Call the function 27 | result = list_gcp_projects() 28 | 29 | # Assertions 30 | assert isinstance(result, list) 31 | assert "test-project-1" in result 32 | assert "test-project-2" in result 33 | assert "test-project-3" in result 34 | assert len(result) == 3 35 | 36 | def test_get_gcp_project_details(self): 37 | """Test the get_gcp_project_details function.""" 38 | # Call the function 39 | result = get_gcp_project_details("test-project-id") 40 | 41 | # Assertions 42 | assert isinstance(result, str) 43 | assert "Test Project" in result 44 | assert "2023-01-01T00:00:00Z" in result 45 | assert "ACTIVE" in result 46 | assert "env: test" in result 47 | assert "department: engineering" 
in result 48 | 49 | def test_list_gcp_services(self): 50 | """Test the list_gcp_services function.""" 51 | # Call the function 52 | result = list_gcp_services("test-project") 53 | 54 | # Assertions 55 | assert isinstance(result, str) 56 | assert "compute.googleapis.com: Compute Engine API" in result 57 | assert "storage.googleapis.com: Cloud Storage API" in result 58 | assert "iam.googleapis.com: Identity and Access Management (IAM) API" in result 59 | 60 | def test_list_compute_instances_with_zone(self): 61 | """Test the list_compute_instances function with a specified zone.""" 62 | # Call the function with a specified zone 63 | result = list_compute_instances("test-project", "us-central1-a") 64 | 65 | # Assertions 66 | assert isinstance(result, str) 67 | assert "instance-1" in result 68 | assert "instance-2" in result 69 | assert "n1-standard-1" in result 70 | assert "n1-standard-2" in result 71 | assert "RUNNING" in result 72 | assert "STOPPED" in result 73 | assert "us-central1-a" in result 74 | 75 | def test_check_iam_permissions(self): 76 | """Test the check_iam_permissions function.""" 77 | # Call the function 78 | result = check_iam_permissions("test-project") 79 | 80 | # Assertions 81 | assert isinstance(result, str) 82 | assert "roles/viewer" in result 83 | assert "roles/editor" in result 84 | assert "[email protected]" in result 85 | 86 | def test_list_storage_buckets(self): 87 | """Test the list_storage_buckets function.""" 88 | # Call the function 89 | result = list_storage_buckets("test-project") 90 | 91 | # Assertions 92 | assert isinstance(result, str) 93 | assert "test-bucket-1" in result 94 | assert "test-bucket-2" in result 95 | assert "us-central1" in result 96 | assert "us-east1" in result 97 | assert "STANDARD" in result 98 | assert "NEARLINE" in result 99 | assert "2023-01-01 00:00:00 UTC" in result 100 | assert "2023-02-01 00:00:00 UTC" in result 101 | 102 | def test_get_billing_info(self): 103 | """Test the get_billing_info function.""" 104 
def register_tools(mcp):
    """Register all resource management tools with the MCP server.

    Args:
        mcp: Server object whose ``tool()`` decorator registers each nested
            function below.
    """

    @mcp.tool()
    def list_gcp_projects():
        """
        List all available GCP projects for the authenticated user.

        Returns:
            List of project IDs; on failure, a single-element list holding
            the error message
        """
        try:
            # Imported lazily so a missing library surfaces as a tool error.
            from google.cloud import resourcemanager_v3
            client = resourcemanager_v3.ProjectsClient()
            request = resourcemanager_v3.SearchProjectsRequest()
            response = client.search_projects(request=request)
            return [project.project_id for project in response]
        except Exception as e:
            return [f"Error listing GCP projects: {str(e)}"]

    @mcp.tool()
    def get_gcp_project_details(project_id: str) -> str:
        """
        Get detailed information about a specific GCP project.

        Args:
            project_id: The ID of the GCP project to get details for

        Returns:
            Detailed information about the specified GCP project
        """
        try:
            from google.cloud import resourcemanager_v3

            # Initialize the Resource Manager client
            client = resourcemanager_v3.ProjectsClient()

            # Get the project details
            name = f"projects/{project_id}"
            project = client.get_project(name=name)

            # Format the response; every field falls back to "N/A" when empty.
            # The last segment of the resource name is shown as the number.
            project_number = project.name.split('/')[-1] if project.name else "N/A"
            display_name = project.display_name or "N/A"
            create_time = project.create_time.isoformat() if project.create_time else "N/A"
            state = project.state.name if project.state else "N/A"

            labels = dict(project.labels) if project.labels else {}
            labels_str = "\n".join(f" {k}: {v}" for k, v in labels.items()) if labels else " None"

            return f"""
GCP Project Details for {project_id}:
Project Number: {project_number}
Name: {display_name}
Creation Time: {create_time}
State: {state}
Labels:
{labels_str}
"""
        except Exception as e:
            return f"Error getting GCP project details: {str(e)}"

    @mcp.tool()
    def list_assets(project_id: str, asset_types: Optional[List[str]] = None, page_size: int = 50) -> str:
        """
        List assets in a GCP project using Cloud Asset Inventory API.

        Only the first page of results is returned; when more are available
        a note is appended to the output.

        Args:
            project_id: The ID of the GCP project to list assets for
            asset_types: Optional list of asset types to filter by (e.g., ["compute.googleapis.com/Instance"])
            page_size: Number of assets to return per page (default: 50, max: 1000)

        Returns:
            List of assets in the specified GCP project
        """
        try:
            try:
                from google.cloud import asset_v1
            except ImportError:
                return "Error: The Google Cloud Asset Inventory library is not installed. Please install it with 'pip install google-cloud-asset'."

            # Initialize the Asset client
            client = asset_v1.AssetServiceClient()

            # Format the parent resource
            parent = f"projects/{project_id}"

            # Create the request
            request = asset_v1.ListAssetsRequest(
                parent=parent,
                content_type=asset_v1.ContentType.RESOURCE,
                page_size=min(page_size, 1000)  # API limit is 1000
            )

            # Add asset types filter if provided
            if asset_types:
                request.asset_types = asset_types

            # List assets
            response = client.list_assets(request=request)

            # Render only the first page. Iterating the pager object itself
            # transparently fetches every following page, which would ignore
            # page_size and leave no next_page_token to report.
            first_page = next(iter(response.pages), None)
            page_assets = first_page.assets if first_page is not None else []

            # Format the response
            assets_list = []
            for asset in page_assets:
                asset_type = asset.asset_type
                name = asset.name
                # Fall back to the last path segment when no display name exists.
                display_name = getattr(asset, 'display_name', None) or name.split('/')[-1]

                # Extract location if available
                location = getattr(asset.resource, 'location', None) or "global"

                assets_list.append(f"- {display_name} ({asset_type})\n Location: {location}\n Name: {name}")

            if not assets_list:
                filter_msg = f" with types {asset_types}" if asset_types else ""
                return f"No assets found{filter_msg} in project {project_id}."

            # Add pagination info if there's a next page token
            pagination_info = ""
            if first_page is not None and first_page.next_page_token:
                pagination_info = "\n\nMore assets are available. Refine your search or increase page_size to see more."

            return f"Assets in GCP Project {project_id}:\n\n" + "\n\n".join(assets_list) + pagination_info
        except Exception as e:
            return f"Error listing assets: {str(e)}"

    @mcp.tool()
    def set_quota_project(project_id: str) -> str:
        """
        Set a quota project for Google Cloud API requests.

        This helps resolve the warning: "Your application has authenticated using end user credentials
        from Google Cloud SDK without a quota project."

        Args:
            project_id: The ID of the GCP project to use for quota attribution

        Returns:
            Confirmation message if successful, error message otherwise
        """
        try:
            try:
                import google.auth
                from google.auth import exceptions
                import os
            except ImportError:
                return "Error: Required libraries not installed. Please install with 'pip install google-auth'."

            # Set the quota project in the environment variable
            os.environ["GOOGLE_CLOUD_QUOTA_PROJECT"] = project_id

            # Try to get credentials with the quota project
            try:
                # Get the current credentials (the detected project is unused)
                credentials, _ = google.auth.default()

                # Credential type cannot carry a quota project: only the
                # environment variable was set.
                if not hasattr(credentials, "with_quota_project"):
                    return f"Set environment variable GOOGLE_CLOUD_QUOTA_PROJECT={project_id}, but your credential type doesn't support quota projects directly."

                credentials = credentials.with_quota_project(project_id)

                # Save the credentials back (this depends on the credential type)
                # This is a best-effort approach, so ordinary failures are
                # ignored — but interpreter-exit signals are no longer swallowed.
                try:
                    if hasattr(google.auth, "_default_credentials"):
                        google.auth._default_credentials = credentials
                except Exception:
                    pass

                return f"Successfully set quota project to '{project_id}'. New API requests will use this project for quota attribution."
            except exceptions.GoogleAuthError as e:
                return f"Error setting quota project: {str(e)}"
        except Exception as e:
            return f"Error setting quota project: {str(e)}"
@mcp.tool()
def list_objects(project_id: str, bucket_name: str, prefix: Optional[str] = None, limit: int = 100) -> str:
    """
    Enumerate the objects stored in a Cloud Storage bucket.

    Args:
        project_id: The ID of the GCP project the Storage client is created for
        bucket_name: The bucket whose objects are listed
        prefix: When given, it is passed to the listing call as the name filter
        limit: Upper bound on the number of objects requested (default: 100)

    Returns:
        One line per object (name, size in MB, last update, content type),
        a "no objects" notice when the listing is empty, or an error message
    """
    bytes_per_mb = 1024 * 1024
    try:
        from google.cloud import storage

        target_bucket = storage.Client(project=project_id).get_bucket(bucket_name)

        rows = []
        for entry in target_bucket.list_blobs(prefix=prefix, max_results=limit):
            megabytes = entry.size / bytes_per_mb
            if entry.updated:
                modified = entry.updated.strftime("%Y-%m-%d %H:%M:%S UTC")
            else:
                modified = "Unknown"
            rows.append(
                f"- {entry.name} (Size: {megabytes:.2f} MB, Updated: {modified}, "
                f"Content-Type: {entry.content_type})"
            )

        # Shared by both messages below; empty when no prefix filter was used.
        prefix_note = " with prefix " + prefix if prefix else ""

        if not rows:
            return f"No objects found in bucket {bucket_name}{prefix_note}."

        listing = "\n".join(rows)
        return f"\nObjects in Cloud Storage Bucket {bucket_name}{prefix_note}:\n{listing}\n"
    except Exception as e:
        return f"Error listing objects: {str(e)}"
@mcp.tool()
def download_object(project_id: str, bucket_name: str, source_blob_name: str, destination_file_path: str) -> str:
    """
    Download a file from a Cloud Storage bucket.

    Args:
        project_id: The ID of the GCP project
        bucket_name: The name of the bucket to download from
        source_blob_name: The name of the file in the bucket
        destination_file_path: The local path to save the file to

    Returns:
        Summary of the download (source, destination, size, content type)
        if successful, error message otherwise
    """
    try:
        from google.cloud import storage

        # Initialize the Storage client
        client = storage.Client(project=project_id)

        # Get the bucket
        bucket = client.get_bucket(bucket_name)

        # bucket.blob() only builds a local handle without metadata, so
        # blob.size would still be None after the download and the summary
        # below would fail. get_blob() fetches the object's metadata and
        # returns None when the object does not exist.
        blob = bucket.get_blob(source_blob_name)
        if blob is None:
            return f"Error downloading file: object gs://{bucket_name}/{source_blob_name} not found."

        # Download the file
        blob.download_to_filename(destination_file_path)

        # Guard the size anyway so a finished download is never reported as
        # an error just because the summary could not be formatted.
        if blob.size is not None:
            size_text = f"{blob.size / (1024 * 1024):.2f} MB"
        else:
            size_text = "Unknown"

        return (
            "\n"
            "File successfully downloaded:\n"
            f"- Source: gs://{bucket_name}/{source_blob_name}\n"
            f"- Destination: {destination_file_path}\n"
            f"- Size: {size_text}\n"
            f"- Content-Type: {blob.content_type}\n"
        )
    except Exception as e:
        return f"Error downloading file: {str(e)}"
@mcp.tool()
def check_iam_permissions(project_id: str) -> str:
    """
    Check IAM permissions for the current user in a GCP project.

    Only roles bound directly to the current principal (or to the special
    members "allUsers" / "allAuthenticatedUsers") are reported. When the
    active credentials expose no service_account_email, the principal is
    shown as "current user" and only the special members can match.

    Args:
        project_id: The ID of the GCP project to check permissions for

    Returns:
        List of IAM roles bound to the current user in the specified GCP
        project, a notice when none are found, or an error message
    """
    try:
        import google.auth
        from google.cloud import resourcemanager_v3
        from google.iam.v1 import iam_policy_pb2

        # Initialize the Resource Manager client
        client = resourcemanager_v3.ProjectsClient()

        # Get the IAM policy for the project
        request = iam_policy_pb2.GetIamPolicyRequest(
            resource=f"projects/{project_id}"
        )
        policy = client.get_iam_policy(request=request)

        # Get the current user
        credentials, _ = google.auth.default()
        user = credentials.service_account_email if hasattr(credentials, 'service_account_email') else "current user"

        # Member strings that identify the current principal. The
        # serviceAccount entry must be interpolated like the user entry;
        # as a plain literal it could never match a real member.
        matching_members = {
            f"user:{user}",
            f"serviceAccount:{user}",
            "allUsers",
            "allAuthenticatedUsers",
        }

        # One line per role whose binding contains any matching member
        user_bindings = [
            f"- {binding.role}"
            for binding in policy.bindings
            if any(member in matching_members for member in binding.members)
        ]

        if not user_bindings:
            return f"No explicit IAM permissions found for {user} in project {project_id}."

        user_bindings_str = "\n".join(user_bindings)

        return f"\nIAM Permissions for {user} in GCP Project {project_id}:\n{user_bindings_str}\n"
    except Exception as e:
        return f"Error checking IAM permissions: {str(e)}"
@mcp.tool()
def get_role_permissions(role_name: str, project_id: Optional[str] = None) -> str:
    """
    Describe an IAM role and enumerate the permissions it includes.

    Args:
        role_name: Role identifier, e.g. "roles/compute.admin" or
            "projects/my-project/roles/myCustomRole". A bare name is
            expanded to a project custom role when project_id is given,
            otherwise to "roles/<name>".
        project_id: Optional project ID used to qualify a bare custom role name

    Returns:
        Name, title, description, permissions and (when present) stage and
        etag of the role, or an error message
    """
    try:
        from google.cloud import iam_v1

        iam_client = iam_v1.IAMClient()

        # Qualify bare role names; fully qualified names pass through untouched.
        if not role_name.startswith(("projects/", "roles/")):
            if project_id:
                role_name = f"projects/{project_id}/roles/{role_name}"
            else:
                role_name = f"roles/{role_name}"

        role = iam_client.get_role(request=iam_v1.GetRoleRequest(name=role_name))

        sections = [
            f"Name: {role.name}",
            f"Title: {role.title}",
            f"Description: {role.description or 'No description'}",
        ]

        if role.included_permissions:
            bullet_lines = "\n".join(f"- {granted}" for granted in role.included_permissions)
            sections.append(f"Permissions ({len(role.included_permissions)}):\n{bullet_lines}")
        else:
            sections.append("Permissions: None")

        # Optional attributes are reported only when the role object has them.
        for label, attribute in (("Stage", "stage"), ("ETag", "etag")):
            if hasattr(role, attribute):
                sections.append(f"{label}: {getattr(role, attribute)}")

        body = "\n".join(sections)
        return f"\nIAM Role Details for {role.name}:\n{body}\n"
    except Exception as e:
        return f"Error getting role permissions: {str(e)}"
@mcp.tool()
def add_iam_policy_binding(project_id: str, role: str, member: str) -> str:
    """
    Add an IAM policy binding to a GCP project.

    The current policy is read, the member is added for the role, and the
    modified policy is written back. If the role already has an
    unconditional binding, the member is added to it instead of creating a
    second binding for the same role.

    Args:
        project_id: The ID of the GCP project
        role: The role to grant (e.g., "roles/compute.admin")
        member: The member to grant the role to, in IAM member syntax
            ("user:<email>", "serviceAccount:<email>", ...)

    Returns:
        Confirmation of the new binding, a notice when the member already
        holds the role, or an error message
    """
    try:
        from google.cloud import resourcemanager_v3
        from google.iam.v1 import iam_policy_pb2, policy_pb2

        # Initialize the Resource Manager client
        client = resourcemanager_v3.ProjectsClient()
        resource = f"projects/{project_id}"

        # Get the current IAM policy
        get_request = iam_policy_pb2.GetIamPolicyRequest(resource=resource)
        policy = client.get_iam_policy(request=get_request)

        role_bindings = [binding for binding in policy.bindings if binding.role == role]

        # Check if the binding already exists
        if any(member in binding.members for binding in role_bindings):
            return f"IAM policy binding already exists: {member} already has role {role} in project {project_id}."

        # Prefer extending an existing unconditional binding for this role.
        # Conditional bindings are skipped so their condition is not
        # silently applied to the new member.
        target = next(
            (
                binding
                for binding in role_bindings
                if not getattr(getattr(binding, "condition", None), "expression", "")
            ),
            None,
        )
        if target is not None:
            target.members.append(member)
        else:
            # Populate the binding before attaching it to the policy
            new_binding = policy_pb2.Binding()
            new_binding.role = role
            new_binding.members.append(member)
            policy.bindings.append(new_binding)

        # Set the updated IAM policy
        set_request = iam_policy_pb2.SetIamPolicyRequest(
            resource=resource,
            policy=policy
        )
        client.set_iam_policy(request=set_request)

        return (
            "\n"
            "IAM policy binding added successfully:\n"
            f"- Project: {project_id}\n"
            f"- Role: {role}\n"
            f"- Member: {member}\n"
        )
    except Exception as e:
        return f"Error adding IAM policy binding: {str(e)}"
@mcp.tool()
def get_sql_instance_details(project_id: str, instance_id: str) -> str:
    """
    Get detailed information about a specific Cloud SQL instance.

    Args:
        project_id: The ID of the GCP project
        instance_id: The ID of the Cloud SQL instance

    Returns:
        Detailed information about the specified Cloud SQL instance
        (identity, settings, backup and IP configuration, IP addresses),
        or an error message
    """
    try:
        from googleapiclient import discovery

        # Initialize the Cloud SQL Admin API client
        service = discovery.build('sqladmin', 'v1')

        # Get instance details
        request = service.instances().get(project=project_id, instance=instance_id)
        instance = request.execute()

        # Format the response
        details = [
            f"Name: {instance.get('name', 'Unknown')}",
            f"Self Link: {instance.get('selfLink', 'Unknown')}",
            f"Database Version: {instance.get('databaseVersion', 'Unknown')}",
            f"State: {instance.get('state', 'Unknown')}",
            f"Region: {instance.get('region', 'Unknown')}",
        ]

        # Settings information
        if 'settings' in instance:
            settings = instance['settings']
            details.append(f"Tier: {settings.get('tier', 'Unknown')}")
            details.append(f"Storage Size: {settings.get('dataDiskSizeGb', 'Unknown')}GB")
            details.append(f"Availability Type: {settings.get('availabilityType', 'Unknown')}")

            # Backup configuration
            if 'backupConfiguration' in settings:
                backup = settings['backupConfiguration']
                enabled = backup.get('enabled', False)
                details.append(f"Automated Backups: {'Enabled' if enabled else 'Disabled'}")
                if enabled:
                    details.append(f"Backup Start Time: {backup.get('startTime', 'Unknown')}")
                    details.append(f"Binary Log Enabled: {backup.get('binaryLogEnabled', False)}")

            # IP configuration
            if 'ipConfiguration' in settings:
                ip_config = settings['ipConfiguration']

                # ipv4Enabled is the API's own public-IP flag. An instance
                # can have a private network and a public IP at the same
                # time, so the private network alone does not decide this.
                # If the response omits the flag, fall back to the
                # private-network heuristic.
                ipv4_enabled = ip_config.get('ipv4Enabled')
                if ipv4_enabled is None:
                    ipv4_enabled = not ip_config.get('privateNetwork')
                details.append(f"Public IP: {'Enabled' if ipv4_enabled else 'Disabled'}")

                if 'authorizedNetworks' in ip_config:
                    networks = [
                        f" - {network.get('name', 'Unnamed')}: {network.get('value', 'Unknown')}"
                        for network in ip_config['authorizedNetworks']
                    ]
                    if networks:
                        details.append("Authorized Networks:")
                        details.extend(networks)

        # IP Addresses
        if 'ipAddresses' in instance:
            ip_addresses = [
                f" - {ip.get('type', 'Unknown')}: {ip.get('ipAddress', 'Unknown')}"
                for ip in instance['ipAddresses']
            ]
            if ip_addresses:
                details.append("IP Addresses:")
                details.extend(ip_addresses)

        details_str = "\n".join(details)

        return f"\nCloud SQL Instance Details:\n{details_str}\n"
    except Exception as e:
        return f"Error getting SQL instance details: {str(e)}"
@mcp.tool()
def create_backup(project_id: str, instance_id: str, description: Optional[str] = None) -> str:
    """
    Start an on-demand backup run for a Cloud SQL instance.

    Args:
        project_id: The ID of the GCP project
        instance_id: The ID of the Cloud SQL instance to back up
        description: Optional text sent as the backup run's description

    Returns:
        Summary of the submitted operation (ID and status as reported by
        the API response), or an error message
    """
    try:
        from googleapiclient import discovery

        sqladmin = discovery.build('sqladmin', 'v1')

        # The request body carries a description only when one was supplied.
        payload = {'description': description} if description else {}

        operation = sqladmin.backupRuns().insert(
            project=project_id,
            instance=instance_id,
            body=payload,
        ).execute()

        operation_id = operation.get('name', 'Unknown')
        status = operation.get('status', 'Unknown')

        report_lines = [
            "",
            "Backup operation initiated:",
            f"- Instance: {instance_id}",
            f"- Project: {project_id}",
            f"- Description: {description or 'None provided'}",
            "",
            f"Operation ID: {operation_id}",
            f"Status: {status}",
            "",
            "The backup process may take some time to complete. You can check the status of the backup using the Cloud SQL Admin API or Google Cloud Console.",
            "",
        ]
        return "\n".join(report_lines)
    except Exception as e:
        return f"Error creating backup: {str(e)}"
@mcp.tool()
def list_firestore_collections(project_id: str, database_id: str = "(default)") -> str:
    """
    Enumerate the collections returned by a Firestore database client.

    Args:
        project_id: The ID of the GCP project
        database_id: The ID of the Firestore database (default is "(default)")

    Returns:
        One line per collection ID, a notice when there are none, or an
        error message
    """
    try:
        from google.cloud import firestore

        db = firestore.Client(project=project_id, database=database_id)

        # One bullet per collection, in the order the client yields them.
        bullet_lines = [f"- {handle.id}" for handle in db.collections()]

        if not bullet_lines:
            return f"No collections found in Firestore database {database_id}."

        listing = "\n".join(bullet_lines)
        return f"\nCollections in Firestore Database {database_id} (Project: {project_id}):\n{listing}\n"
    except Exception as e:
        return f"Error listing Firestore collections: {str(e)}"
13 | 14 | Args: 15 | project_id: The ID of the GCP project to list GKE clusters for 16 | region: Optional region to filter clusters (e.g., "us-central1") 17 | 18 | Returns: 19 | List of GKE clusters in the specified GCP project 20 | """ 21 | try: 22 | from google.cloud import container_v1 23 | 24 | # Initialize the GKE client 25 | client = container_v1.ClusterManagerClient() 26 | 27 | clusters_list = [] 28 | 29 | if region: 30 | # List clusters in the specified region 31 | parent = f"projects/{project_id}/locations/{region}" 32 | response = client.list_clusters(parent=parent) 33 | 34 | for cluster in response.clusters: 35 | version = cluster.current_master_version 36 | node_count = sum(pool.initial_node_count for pool in cluster.node_pools) 37 | status = "Running" if cluster.status == container_v1.Cluster.Status.RUNNING else cluster.status.name 38 | clusters_list.append(f"- {cluster.name} (Region: {region}, Version: {version}, Nodes: {node_count}, Status: {status})") 39 | else: 40 | # List clusters in all regions 41 | from google.cloud import compute_v1 42 | 43 | # Get all regions 44 | regions_client = compute_v1.RegionsClient() 45 | regions_request = compute_v1.ListRegionsRequest(project=project_id) 46 | regions = regions_client.list(request=regions_request) 47 | 48 | for region_item in regions: 49 | region_name = region_item.name 50 | parent = f"projects/{project_id}/locations/{region_name}" 51 | try: 52 | response = client.list_clusters(parent=parent) 53 | 54 | for cluster in response.clusters: 55 | version = cluster.current_master_version 56 | node_count = sum(pool.initial_node_count for pool in cluster.node_pools) 57 | status = "Running" if cluster.status == container_v1.Cluster.Status.RUNNING else cluster.status.name 58 | clusters_list.append(f"- {cluster.name} (Region: {region_name}, Version: {version}, Nodes: {node_count}, Status: {status})") 59 | except Exception: 60 | # Skip regions where we can't list clusters 61 | continue 62 | 63 | # Also check zonal 
@mcp.tool()
def get_cluster_details(project_id: str, cluster_name: str, location: str) -> str:
    """
    Get detailed information about a specific GKE cluster.

    Args:
        project_id: The ID of the GCP project
        cluster_name: The name of the GKE cluster
        location: The location (region or zone) of the cluster

    Returns:
        Detailed information about the specified GKE cluster, or an
        "Error getting cluster details: ..." message if anything fails
    """
    try:
        from google.cloud import container_v1

        # Initialize the GKE client
        client = container_v1.ClusterManagerClient()

        # Get cluster details
        cluster_path = f"projects/{project_id}/locations/{location}/clusters/{cluster_name}"
        cluster = client.get_cluster(name=cluster_path)

        # A zone is "<region>-<letter>" (e.g. "us-central1-a") and therefore has one
        # more hyphen than its region ("us-central1"). Region names always contain a
        # hyphen themselves, so simply testing for '-' labels every cluster as zonal.
        location_type = "Zonal" if location.count("-") >= 2 else "Regional"

        if cluster.status == container_v1.Cluster.Status.RUNNING:
            status = "Running"
        else:
            status = cluster.status.name

        # Format the response
        details = [
            f"Name: {cluster.name}",
            f"Description: {cluster.description or 'None'}",
            f"Location: {location}",
            f"Location Type: {location_type}",
            f"Status: {status}",
            f"Kubernetes Version: {cluster.current_master_version}",
            f"Network: {cluster.network}",
            f"Subnetwork: {cluster.subnetwork}",
            f"Cluster CIDR: {cluster.cluster_ipv4_cidr}",
            f"Services CIDR: {cluster.services_ipv4_cidr}",
            f"Endpoint: {cluster.endpoint}",
        ]

        # Add Node Pools information
        node_pools = []
        for pool in cluster.node_pools:
            node_pools.append(
                f"  - {pool.name} (Machine: {pool.config.machine_type}, "
                f"Disk: {pool.config.disk_size_gb}GB, Initial Nodes: {pool.initial_node_count})"
            )
            # Min/max bounds are only reported while autoscaling is switched on
            if pool.autoscaling and pool.autoscaling.enabled:
                node_pools.append(
                    f"    Autoscaling: Enabled (Min: {pool.autoscaling.min_node_count}, "
                    f"Max: {pool.autoscaling.max_node_count})"
                )

        if node_pools:
            details.append(f"Node Pools ({len(cluster.node_pools)}):\n" + "\n".join(node_pools))

        # Add Addons information
        addons = []
        if cluster.addons_config:
            config = cluster.addons_config
            addons.append(f"  - HTTP Load Balancing: {'Enabled' if not config.http_load_balancing or not config.http_load_balancing.disabled else 'Disabled'}")
            addons.append(f"  - Horizontal Pod Autoscaling: {'Enabled' if not config.horizontal_pod_autoscaling or not config.horizontal_pod_autoscaling.disabled else 'Disabled'}")
            addons.append(f"  - Kubernetes Dashboard: {'Enabled' if not config.kubernetes_dashboard or not config.kubernetes_dashboard.disabled else 'Disabled'}")
            # Network policy is off unless the addon block is present. An enabled addon
            # is a message with disabled=False, i.e. no non-default field, which is
            # falsy -- so presence has to be tested with `in` rather than truthiness.
            # NOTE(review): assumes the API returns the (empty) addon block when the
            # addon is enabled -- confirm against a live cluster.
            network_policy_on = (
                "network_policy_config" in config
                and not config.network_policy_config.disabled
            )
            addons.append(f"  - Network Policy: {'Enabled' if network_policy_on else 'Disabled'}")

        if addons:
            details.append("Addons:\n" + "\n".join(addons))

        details_str = "\n".join(details)

        return f"""
GKE Cluster Details:
{details_str}
"""
    except Exception as e:
        return f"Error getting cluster details: {str(e)}"
176 | 177 | Args: 178 | project_id: The ID of the GCP project 179 | cluster_name: The name of the GKE cluster 180 | location: The location (region or zone) of the cluster 181 | 182 | Returns: 183 | List of node pools in the specified GKE cluster 184 | """ 185 | try: 186 | from google.cloud import container_v1 187 | 188 | # Initialize the GKE client 189 | client = container_v1.ClusterManagerClient() 190 | 191 | # List node pools 192 | cluster_path = f"projects/{project_id}/locations/{location}/clusters/{cluster_name}" 193 | node_pools = client.list_node_pools(parent=cluster_path) 194 | 195 | # Format the response 196 | pools_list = [] 197 | for pool in node_pools.node_pools: 198 | machine_type = pool.config.machine_type 199 | disk_size_gb = pool.config.disk_size_gb 200 | autoscaling = "Enabled" if pool.autoscaling and pool.autoscaling.enabled else "Disabled" 201 | min_nodes = pool.autoscaling.min_node_count if pool.autoscaling and pool.autoscaling.enabled else "N/A" 202 | max_nodes = pool.autoscaling.max_node_count if pool.autoscaling and pool.autoscaling.enabled else "N/A" 203 | initial_nodes = pool.initial_node_count 204 | 205 | pool_info = [ 206 | f"- {pool.name}:", 207 | f" Machine Type: {machine_type}", 208 | f" Disk Size: {disk_size_gb}GB", 209 | f" Initial Node Count: {initial_nodes}", 210 | f" Autoscaling: {autoscaling}" 211 | ] 212 | 213 | if autoscaling == "Enabled": 214 | pool_info.append(f" Min Nodes: {min_nodes}") 215 | pool_info.append(f" Max Nodes: {max_nodes}") 216 | 217 | if pool.config.labels: 218 | labels = [f"{k}: {v}" for k, v in pool.config.labels.items()] 219 | pool_info.append(f" Labels: {', '.join(labels)}") 220 | 221 | pools_list.append("\n".join(pool_info)) 222 | 223 | if not pools_list: 224 | return f"No node pools found in GKE cluster {cluster_name} in location {location}." 
@mcp.tool()
def resize_node_pool(project_id: str, cluster_name: str, location: str, node_pool_name: str, node_count: int) -> str:
    """
    Resize a node pool in a GKE cluster.

    The pool is fetched first; if autoscaling is enabled on it, no resize is
    attempted and an explanatory message with the autoscaling bounds is returned.

    Args:
        project_id: The ID of the GCP project
        cluster_name: The name of the GKE cluster
        location: The location (region or zone) of the cluster
        node_pool_name: The name of the node pool to resize
        node_count: The new node count for the pool

    Returns:
        Result of the node pool resize operation
    """
    try:
        from google.cloud import container_v1

        gke = container_v1.ClusterManagerClient()

        # Fully-qualified resource name of the pool being resized
        pool_name = f"projects/{project_id}/locations/{location}/clusters/{cluster_name}/nodePools/{node_pool_name}"

        pool = gke.get_node_pool(name=pool_name)
        # Reported below as the "current" count; this is the pool's initial_node_count
        previous_count = pool.initial_node_count

        scaling = pool.autoscaling
        if scaling and scaling.enabled:
            # Manual sizing is refused while the autoscaler owns the pool size
            return f"""
Cannot resize node pool {node_pool_name} because autoscaling is enabled.
To manually set the node count, you must first disable autoscaling for this node pool.
Current autoscaling settings:
- Min nodes: {scaling.min_node_count}
- Max nodes: {scaling.max_node_count}
"""

        operation = gke.set_node_pool_size(
            request=container_v1.SetNodePoolSizeRequest(
                name=pool_name,
                node_count=node_count,
            )
        )

        # Prefer the enum member's name; fall back to the raw value if it has none
        status_text = getattr(operation.status, "name", operation.status)

        return f"""
Node pool resize operation initiated:
- Cluster: {cluster_name}
- Location: {location}
- Node Pool: {node_pool_name}
- Current Node Count: {previous_count}
- New Node Count: {node_count}

Operation ID: {operation.name}
Status: {status_text}
"""
    except Exception as err:
        return f"Error resizing node pool: {err!s}"
@mcp.tool()
def get_monitoring_alerts(project_id: str) -> str:
    """
    Get active monitoring alerts for a GCP project.

    An enabled alert policy is reported as active when its
    ``monitoring.googleapis.com/alert_policy/incident_count`` time series
    contains a non-zero point within the last hour.

    Args:
        project_id: The ID of the GCP project to get alerts for

    Returns:
        Active alerts for the specified GCP project. Policies whose incident
        status could not be queried are named in a trailing note instead of
        being dropped silently.
    """
    try:
        import time

        from google.cloud import monitoring_v3
        from google.protobuf import timestamp_pb2

        # Initialize the clients: one for policies, one for their metric data
        alert_client = monitoring_v3.AlertPolicyServiceClient()
        metric_client = monitoring_v3.MetricServiceClient()

        # Format the project name
        project_name = f"projects/{project_id}"

        # One look-back window shared by every policy. time.time() is already
        # seconds since the Unix epoch; the previous utcnow().timestamp() treated
        # the naive UTC value as local time and shifted the window by the host's
        # UTC offset.
        end_seconds = int(time.time())
        interval = monitoring_v3.TimeInterval(
            start_time=timestamp_pb2.Timestamp(seconds=end_seconds - 3600),
            end_time=timestamp_pb2.Timestamp(seconds=end_seconds),
        )

        # List all alert policies
        alert_policies = alert_client.list_alert_policies(
            request=monitoring_v3.ListAlertPoliciesRequest(name=project_name)
        )

        active_alerts = []
        unchecked = []  # enabled policies whose incident query failed

        for policy in alert_policies:
            # Disabled policies cannot have open incidents
            if not policy.enabled:
                continue

            policy_id = policy.name.split("/")[-1]
            filter_str = (
                'resource.type="alerting_policy" AND '
                'metric.type="monitoring.googleapis.com/alert_policy/incident_count" AND '
                f'metric.label.policy_name="{policy_id}"'
            )

            try:
                time_series = metric_client.list_time_series(
                    request=monitoring_v3.ListTimeSeriesRequest(
                        name=project_name,
                        filter=filter_str,
                        interval=interval,
                        view=monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
                    )
                )
                # Any non-zero point in the window means an incident was open
                is_active = any(
                    point.value.int64_value > 0
                    for series in time_series
                    for point in series.points
                )
            except Exception:
                # Best effort: keep going, but remember the policy so the caller
                # is told the result is incomplete.
                unchecked.append(policy.display_name or policy_id)
                continue

            if not is_active:
                continue

            # Format conditions
            conditions = [
                f"    - {condition.display_name}: {condition.condition_threshold.filter}"
                for condition in policy.conditions
                if condition.display_name
            ]

            rate_limit = policy.alert_strategy.notification_rate_limit
            if rate_limit:
                # This value is the notification rate limit, not a severity
                rate_line = f"  Notification Rate Limit: {rate_limit.period.seconds}s between notifications"
            else:
                rate_line = "  No rate limiting"

            alert_info = [
                f"- {policy.display_name} (ID: {policy_id})",
                f"  Description: {policy.documentation.content if policy.documentation else 'No description'}",
                rate_line,
            ]

            if conditions:
                alert_info.append("  Conditions:")
                alert_info.extend(conditions)

            active_alerts.append("\n".join(alert_info))

        note = ""
        if unchecked:
            note = (
                f"\nNote: incident status could not be checked for {len(unchecked)} "
                f"enabled policies: {', '.join(unchecked)}"
            )

        if not active_alerts:
            return f"No active alerts found for project {project_id}.{note}"

        alerts_str = "\n".join(active_alerts)

        return f"""
Active Monitoring Alerts in GCP Project {project_id}:
{alerts_str}{note}
"""
    except Exception as e:
        return f"Error getting monitoring alerts: {str(e)}"
(default: COMPARISON_GT) 202 | notification_channels: Optional list of notification channel IDs 203 | 204 | Returns: 205 | Result of the alert policy creation 206 | """ 207 | try: 208 | from google.cloud import monitoring_v3 209 | from google.protobuf import duration_pb2 210 | 211 | # Initialize the Alert Policy Service client 212 | client = monitoring_v3.AlertPolicyServiceClient() 213 | 214 | # Format the project name 215 | project_name = f"projects/{project_id}" 216 | 217 | # Create a duration object 218 | duration = duration_pb2.Duration(seconds=duration_seconds) 219 | 220 | # Create the alert condition 221 | condition = monitoring_v3.AlertPolicy.Condition( 222 | display_name=f"Condition for {display_name}", 223 | condition_threshold=monitoring_v3.AlertPolicy.Condition.MetricThreshold( 224 | filter=filter_str, 225 | comparison=getattr(monitoring_v3.ComparisonType, comparison), 226 | threshold_value=threshold_value, 227 | duration=duration, 228 | trigger=monitoring_v3.AlertPolicy.Condition.Trigger( 229 | count=1 230 | ), 231 | aggregations=[ 232 | monitoring_v3.Aggregation( 233 | alignment_period=duration_pb2.Duration(seconds=60), 234 | per_series_aligner=monitoring_v3.Aggregation.Aligner.ALIGN_MEAN, 235 | cross_series_reducer=monitoring_v3.Aggregation.Reducer.REDUCE_MEAN 236 | ) 237 | ] 238 | ) 239 | ) 240 | 241 | # Create the alert policy 242 | alert_policy = monitoring_v3.AlertPolicy( 243 | display_name=display_name, 244 | conditions=[condition], 245 | combiner=monitoring_v3.AlertPolicy.ConditionCombinerType.OR 246 | ) 247 | 248 | # Add notification channels if provided 249 | if notification_channels: 250 | alert_policy.notification_channels = [ 251 | f"projects/{project_id}/notificationChannels/{channel_id}" 252 | for channel_id in notification_channels 253 | ] 254 | 255 | # Create the policy 256 | policy = client.create_alert_policy(name=project_name, alert_policy=alert_policy) 257 | 258 | # Format response 259 | conditions_str = "\n".join([ 260 | f"- 
@mcp.tool()
def list_uptime_checks(project_id: str) -> str:
    """
    List Uptime checks in a GCP project.

    Args:
        project_id: The ID of the GCP project to list Uptime checks for

    Returns:
        List of Uptime checks in the specified GCP project
    """
    try:
        from google.cloud import monitoring_v3

        # Initialize the Uptime Check Service client
        client = monitoring_v3.UptimeCheckServiceClient()

        # Format the project name
        project_name = f"projects/{project_id}"

        # List uptime checks
        uptime_checks = client.list_uptime_check_configs(
            request=monitoring_v3.ListUptimeCheckConfigsRequest(parent=project_name)
        )

        # Format the response
        checks_list = []

        for check in uptime_checks:
            check_id = check.name.split('/')[-1]
            display_name = check.display_name
            period_seconds = check.period.seconds
            timeout_seconds = check.timeout.seconds

            # The client returns proto-plus messages: presence of a message/oneof
            # field is tested with `in`, not with the raw protobuf HasField().
            # "resource" is only the oneof's name, so the member field
            # (monitored_resource) is what has to be inspected.
            # NOTE(review): host is taken from the monitored resource labels because
            # the check-specific messages are assumed to carry only path/port.
            host = None
            if "monitored_resource" in check:
                host = check.monitored_resource.labels.get('host', 'Unknown')

            # Get check type and details
            check_details = []
            if "http_check" in check:
                check_type = "HTTP"
                path = check.http_check.path
                url = f"{host}{path}" if host else path
                check_details.append(f"URL: {url}")
                check_details.append(f"Port: {check.http_check.port}")
            elif "tcp_check" in check:
                check_type = "TCP"
                check_details.append(f"Host: {host or 'Unknown'}")
                check_details.append(f"Port: {check.tcp_check.port}")
            else:
                check_type = "Unknown"

            checks_list.append(f"- {display_name} (ID: {check_id}, Type: {check_type})")
            checks_list.append(f"  Frequency: {period_seconds}s, Timeout: {timeout_seconds}s")
            checks_list.extend([f"  {detail}" for detail in check_details])

        if not checks_list:
            return f"No Uptime checks found for project {project_id}."

        checks_str = "\n".join(checks_list)

        return f"""
Uptime Checks in GCP Project {project_id}:
{checks_str}
"""
    except Exception as e:
        return f"Error listing Uptime checks: {str(e)}"
@mcp.tool()
def get_vpc_details(project_id: str, network_name: str) -> str:
    """
    Get detailed information about a specific VPC network.

    Args:
        project_id: The ID of the GCP project
        network_name: The name of the VPC network

    Returns:
        Detailed information about the specified VPC network, including its
        subnets (custom-mode networks only) and peerings
    """
    try:
        from google.cloud import compute_v1

        # Initialize the Compute Engine clients for networks and subnets
        network_client = compute_v1.NetworksClient()
        subnet_client = compute_v1.SubnetworksClient()

        # Get network details
        network = network_client.get(project=project_id, network=network_name)

        # Format the response
        details = [
            f"Name: {network.name}",
            f"ID: {network.id}",
            f"Description: {network.description or 'None'}",
            f"Self Link: {network.self_link}",
            f"Creation Time: {network.creation_timestamp}",
            f"Subnet Mode: {'Auto' if network.auto_create_subnetworks else 'Custom'}",
            f"Routing Mode: {network.routing_config.routing_mode if network.routing_config else 'Unknown'}",
            f"MTU: {network.mtu}",
        ]

        # If it's a custom subnet mode network, get all subnets
        if not network.auto_create_subnetworks:
            # Subnets are regional resources, so a project-wide view needs the
            # aggregated listing; the per-region list call cannot be issued
            # without a region.
            request = compute_v1.AggregatedListSubnetworksRequest(project=project_id)
            subnets = []

            for _scope, scoped_list in subnet_client.aggregated_list(request=request):
                for item in scoped_list.subnetworks:
                    # Compare the final URL segment exactly: a substring test would
                    # also match e.g. "default-2" when looking for "default".
                    if item.network.split('/')[-1] != network.name:
                        continue
                    cidr = item.ip_cidr_range
                    region = item.region.split('/')[-1]
                    purpose = f", Purpose: {item.purpose}" if item.purpose else ""
                    private_ip = ", Private Google Access: Enabled" if item.private_ip_google_access else ""
                    subnets.append(f"  - {item.name} (Region: {region}, CIDR: {cidr}{purpose}{private_ip})")

            if subnets:
                details.append(f"Subnets ({len(subnets)}):\n" + "\n".join(subnets))

        # List peering connections if any
        if network.peerings:
            # Separate local name so the network_name argument is not overwritten
            peerings = [
                f"  - {peering.network.split('/')[-1]} (State: {peering.state})"
                for peering in network.peerings
            ]

            if peerings:
                details.append(f"Peerings ({len(peerings)}):\n" + "\n".join(peerings))

        details_str = "\n".join(details)

        return f"""
VPC Network Details:
{details_str}
"""
    except Exception as e:
        return f"Error getting VPC network details: {str(e)}"
@mcp.tool()
def create_firewall_rule(project_id: str, name: str, network: str, direction: str, priority: int,
                         source_ranges: Optional[List[str]] = None, destination_ranges: Optional[List[str]] = None,
                         allowed_protocols: Optional[List[Dict[str, Any]]] = None, denied_protocols: Optional[List[Dict[str, Any]]] = None,
                         target_tags: Optional[List[str]] = None, source_tags: Optional[List[str]] = None,
                         description: Optional[str] = None) -> str:
    """
    Create a firewall rule in a GCP project.

    Args:
        project_id: The ID of the GCP project
        name: The name of the firewall rule
        network: The name of the network to create the firewall rule for
        direction: The direction of traffic to match ('INGRESS' or 'EGRESS', case-insensitive)
        priority: The priority of the rule (lower number = higher priority, 0-65535)
        source_ranges: Optional list of source IP ranges (for INGRESS)
        destination_ranges: Optional list of destination IP ranges (for EGRESS)
        allowed_protocols: Optional list of allowed protocols, e.g. [{"IPProtocol": "tcp", "ports": ["80", "443"]}]
        denied_protocols: Optional list of denied protocols, e.g. [{"IPProtocol": "tcp", "ports": ["22"]}]
        target_tags: Optional list of target instance tags
        source_tags: Optional list of source instance tags (for INGRESS)
        description: Optional description for the firewall rule

    Returns:
        Result of the firewall rule creation, or an
        "Error creating firewall rule: ..." message when the arguments are
        invalid or the API call fails
    """
    try:
        from google.cloud import compute_v1

        # Normalise before the comparisons below: with "ingress" the equality
        # checks against "INGRESS" failed and source ranges/tags were silently
        # left off the rule.
        direction = direction.upper()
        if direction not in ("INGRESS", "EGRESS"):
            return f"Error creating firewall rule: direction must be 'INGRESS' or 'EGRESS', got '{direction}'"

        if not 0 <= priority <= 65535:
            return f"Error creating firewall rule: priority must be between 0 and 65535, got {priority}"

        def _build_rules(specs, rule_cls):
            """Turn [{"IPProtocol": ..., "ports": [...]}] dicts into rule_cls messages."""
            rules = []
            for spec in specs:
                if "IPProtocol" not in spec:
                    # Raised here so the outer handler reports a readable message
                    # instead of a bare KeyError
                    raise ValueError(
                        f'each protocol entry needs an "IPProtocol" key, got {spec!r}'
                    )
                rule = rule_cls()
                rule.I_p_protocol = spec["IPProtocol"]
                if "ports" in spec:
                    rule.ports = spec["ports"]
                rules.append(rule)
            return rules

        # Initialize the Compute Engine client for firewall
        client = compute_v1.FirewallsClient()

        # Create the firewall resource
        firewall = compute_v1.Firewall()
        firewall.name = name
        firewall.network = f"projects/{project_id}/global/networks/{network}"
        firewall.direction = direction
        firewall.priority = priority

        if description:
            firewall.description = description

        # Set source/destination ranges based on direction
        if direction == "INGRESS" and source_ranges:
            firewall.source_ranges = source_ranges
        elif direction == "EGRESS" and destination_ranges:
            firewall.destination_ranges = destination_ranges

        # Set allowed / denied protocols
        if allowed_protocols:
            firewall.allowed = _build_rules(allowed_protocols, compute_v1.Allowed)
        if denied_protocols:
            firewall.denied = _build_rules(denied_protocols, compute_v1.Denied)

        # Set target tags
        if target_tags:
            firewall.target_tags = target_tags

        # Source tags are only applied to ingress rules
        if source_tags and direction == "INGRESS":
            firewall.source_tags = source_tags

        # Create the firewall rule
        operation = client.insert(project=project_id, firewall_resource=firewall)

        return f"""
Firewall rule creation initiated:
- Name: {name}
- Network: {network}
- Direction: {direction}
- Priority: {priority}
- Description: {description or 'None'}
- Source Ranges: {source_ranges or 'None'}
- Destination Ranges: {destination_ranges or 'None'}
- Allowed Protocols: {allowed_protocols or 'None'}
- Denied Protocols: {denied_protocols or 'None'}
- Target Tags: {target_tags or 'None'}
- Source Tags: {source_tags or 'None'}

Operation ID: {operation.id}
Status: {operation.status}
"""
    except Exception as e:
        return f"Error creating firewall rule: {str(e)}"
allowed.append(f"{allow.I_p_protocol}{ports}") 315 | 316 | denied = [] 317 | for deny in firewall.denied: 318 | ports = f":{','.join(deny.ports)}" if deny.ports else "" 319 | denied.append(f"{deny.I_p_protocol}{ports}") 320 | 321 | # Format sources/destinations based on direction 322 | if firewall.direction == "INGRESS": 323 | sources = firewall.source_ranges or firewall.source_tags or ["Any"] 324 | destinations = ["Any"] 325 | else: # EGRESS 326 | sources = ["Any"] 327 | destinations = firewall.destination_ranges or ["Any"] 328 | 329 | # Create the firewall rule info string 330 | rule_info = [ 331 | f"- {firewall.name}", 332 | f" Network: {network_name}", 333 | f" Direction: {firewall.direction}", 334 | f" Priority: {firewall.priority}", 335 | f" Action: {'Allow' if firewall.allowed else 'Deny'}" 336 | ] 337 | 338 | if allowed: 339 | rule_info.append(f" Allowed: {', '.join(allowed)}") 340 | if denied: 341 | rule_info.append(f" Denied: {', '.join(denied)}") 342 | 343 | rule_info.append(f" Sources: {', '.join(sources)}") 344 | rule_info.append(f" Destinations: {', '.join(destinations)}") 345 | 346 | if firewall.target_tags: 347 | rule_info.append(f" Target Tags: {', '.join(firewall.target_tags)}") 348 | 349 | firewalls_list.append("\n".join(rule_info)) 350 | 351 | if not firewalls_list: 352 | return f"No firewall rules found in project {project_id}{' for network ' + network if network else ''}." 353 | 354 | firewalls_str = "\n".join(firewalls_list) 355 | 356 | return f""" 357 | Firewall Rules in GCP Project {project_id}{' for network ' + network if network else ''}: 358 | {firewalls_str} 359 | """ 360 | except Exception as e: 361 | return f"Error listing firewall rules: {str(e)}" 362 | 363 | @mcp.tool() 364 | def list_gcp_services(project_id: str) -> str: 365 | """ 366 | List enabled services/APIs in a GCP project. 
@mcp.tool()
def list_gcp_services(project_id: str) -> str:
    """
    List enabled services/APIs in a GCP project.

    Args:
        project_id: The ID of the GCP project to list services for

    Returns:
        List of enabled services in the specified GCP project
    """
    def _describe(svc) -> str:
        # Service names are resource paths; only the last component is shown.
        short_name = svc.name.split('/')[-1] if svc.name else "Unknown"
        title = svc.config.title if svc.config else "Unknown"
        return f"- {short_name}: {title}"

    try:
        # The client library is optional; report a readable hint when absent.
        try:
            from google.cloud import service_usage
        except ImportError:
            return "Error: The Google Cloud service usage library is not installed. Please install it with 'pip install google-cloud-service-usage'."

        usage_client = service_usage.ServiceUsageClient()
        enabled_only = service_usage.ListServicesRequest(
            parent=f"projects/{project_id}",
            filter="state:ENABLED"
        )

        services_list = [_describe(svc) for svc in usage_client.list_services(request=enabled_only)]
        if not services_list:
            return f"No services are enabled in project {project_id}."

        services_str = "\n".join(services_list)
        return f"""
Enabled Services in GCP Project {project_id}:
{services_str}
"""
    except Exception as e:
        return f"Error listing GCP services: {str(e)}"
import os
import sys
import json
import configparser
from typing import Dict, Any, Optional, List
import webbrowser
import tempfile
from pathlib import Path

# OAuth scope requested by the interactive login flow.
_CLOUD_PLATFORM_SCOPE = 'https://www.googleapis.com/auth/cloud-platform'

# NOTE(review): an installed-application OAuth client is embedded in source.
# Presumably this is a public "installed app" client; confirm it is meant to
# be distributed with the package before shipping.
_OAUTH_CLIENT_CONFIG = {
    "installed": {
        "client_id": "764086051850-6qr4p6gpi6hn506pt8ejuq83di341hur.apps.googleusercontent.com",
        "project_id": "gcp-mcp",
        "auth_uri": "https://accounts.google.com/o/oauth2/auth",
        "token_uri": "https://oauth2.googleapis.com/token",
        "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
        "client_secret": "d-FL95Q19q7MQmFpd7hHD0Ty",
        "redirect_uris": ["http://localhost", "urn:ietf:wg:oauth:2.0:oob"]
    }
}


# Helper functions
def _get_adc_path() -> str:
    """Get the path to the application default credentials file."""
    if os.name == 'nt':  # Windows
        return os.path.join(os.environ.get('APPDATA', ''), 'gcloud', 'application_default_credentials.json')
    # Linux/Mac
    return os.path.expanduser('~/.config/gcloud/application_default_credentials.json')


def _get_config_path() -> str:
    """Get the path to the configuration directory."""
    if os.name == 'nt':  # Windows
        return os.path.join(os.environ.get('APPDATA', ''), 'gcloud')
    # Linux/Mac
    return os.path.expanduser('~/.config/gcloud')


def _get_config_file() -> str:
    """Get the path to the default configuration file."""
    return os.path.join(_get_config_path(), 'configurations', 'config_default')


def _new_config_parser() -> configparser.ConfigParser:
    """Create a parser that keeps values verbatim and tolerates duplicates."""
    return configparser.ConfigParser(interpolation=None, strict=False)


def _load_config(config_file: str) -> configparser.ConfigParser:
    """
    Load the INI-style configuration file.

    A missing or unreadable file yields an empty parser. A legacy flat file
    (``key = value`` lines without any section header, as earlier versions of
    this module wrote) is read as if every key belonged to ``[core]``.
    """
    parser = _new_config_parser()
    try:
        with open(config_file, 'r') as f:
            text = f.read()
    except (OSError, ValueError):
        return parser

    try:
        parser.read_string(text)
    except configparser.MissingSectionHeaderError:
        parser = _new_config_parser()
        try:
            parser.read_string("[core]\n" + text)
        except configparser.Error:
            parser = _new_config_parser()
    except configparser.Error:
        # Unparseable content: start from an empty configuration.
        parser = _new_config_parser()
    return parser


def _set_project_id_in_config(project_id: str) -> None:
    """Set ``project`` in the ``[core]`` section, preserving other settings."""
    config_file = _get_config_file()
    os.makedirs(os.path.dirname(config_file), exist_ok=True)

    parser = _load_config(config_file)
    if not parser.has_section('core'):
        parser.add_section('core')
    parser.set('core', 'project', project_id)

    with open(config_file, 'w') as f:
        parser.write(f)


def _get_project_id_from_config() -> Optional[str]:
    """Get the project ID from the configuration file, or None if unset."""
    parser = _load_config(_get_config_file())
    return parser.get('core', 'project', fallback=None) or None


def _write_private_json(path: str, payload: Dict[str, Any]) -> None:
    """Write ``payload`` as JSON to ``path``, readable by the owner only."""
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, 'w') as f:
        json.dump(payload, f)
    try:
        # The mode passed to os.open is ignored when the file already exists.
        os.chmod(path, 0o600)
    except OSError:
        pass


def register_tools(mcp):
    """Register all authentication tools with the MCP server."""

    # Project most recently selected through these tools; shared by the
    # nested tool functions through the closure.
    _current_project_id = None

    @mcp.tool()
    def auth_login(project_id: str = "") -> str:
        """
        Authenticate with Google Cloud Platform using browser-based OAuth flow.

        Args:
            project_id: Optional project ID to set as default after login

        Returns:
            Status message indicating whether authentication was successful
        """
        nonlocal _current_project_id

        try:
            from google.auth.transport.requests import Request
            from google_auth_oauthlib.flow import InstalledAppFlow
            import google.auth

            # First, attempt to use existing credentials
            try:
                credentials, project = google.auth.default()

                # Refreshing proves the credentials are still valid
                if hasattr(credentials, 'refresh'):
                    credentials.refresh(Request())

                if project_id:
                    _current_project_id = project_id
                    _set_project_id_in_config(project_id)
                    return f"Using existing credentials. Project set to {project_id}."
                return f"Using existing credentials. Current project: {project or 'Not set'}"
            except Exception:
                # Any failure here means we need an interactive login
                pass

            # stdout may carry the MCP protocol stream, so status text goes to stderr.
            # NOTE(review): run_local_server may print its own prompt to
            # stdout depending on the library version; confirm for stdio use.
            print("Opening browser for authentication...", file=sys.stderr)

            # Build the flow from the in-memory client config; nothing is
            # written to disk.
            flow = InstalledAppFlow.from_client_config(
                _OAUTH_CLIENT_CONFIG,
                scopes=[_CLOUD_PLATFORM_SCOPE]
            )
            creds = flow.run_local_server(port=0)

            # Save the credentials as application default credentials
            adc_path = _get_adc_path()
            os.makedirs(os.path.dirname(adc_path), exist_ok=True)
            _write_private_json(adc_path, {
                "client_id": creds.client_id,
                "client_secret": creds.client_secret,
                "refresh_token": creds.refresh_token,
                "type": "authorized_user"
            })

            success_msg = "Authentication successful!"

            if project_id:
                _current_project_id = project_id
                _set_project_id_in_config(project_id)
                success_msg += f" Default project set to {project_id}."

            # Best-effort check: list the projects the new credentials can see
            try:
                from google.cloud import resourcemanager_v3

                credentials, _ = google.auth.default()
                client = resourcemanager_v3.ProjectsClient(credentials=credentials)
                # search_projects needs no parent, unlike list_projects
                request = resourcemanager_v3.SearchProjectsRequest()
                projects = list(client.search_projects(request=request))

                project_count = len(projects)
                if project_count > 0:
                    project_list = "\n".join(
                        f"- {item.display_name} (ID: {item.project_id})" for item in projects[:5]
                    )
                    if project_count > 5:
                        project_list += f"\n... and {project_count - 5} more"

                    success_msg += f"\n\nFound {project_count} accessible projects:\n{project_list}"
            except Exception:
                # Don't fail the login if we can't list projects
                pass

            return success_msg

        except Exception as e:
            return f"Authentication error: {str(e)}"

    @mcp.tool()
    def auth_list() -> str:
        """
        List active Google Cloud credentials.

        Returns:
            List of active credentials and the current default account
        """
        try:
            import google.auth

            # Check application default credentials
            try:
                credentials, project = google.auth.default()

                # Try to get a label for the account behind the credentials
                email = None
                if hasattr(credentials, 'service_account_email'):
                    email = credentials.service_account_email
                elif hasattr(credentials, 'refresh_token') and credentials.refresh_token:
                    # User credential: the email is not directly available
                    adc_path = _get_adc_path()
                    if os.path.exists(adc_path):
                        try:
                            with open(adc_path, 'r') as f:
                                data = json.load(f)
                            if 'refresh_token' in data:
                                email = "User account (ADC)"
                        except (OSError, ValueError, TypeError):
                            pass

                credential_type = type(credentials).__name__

                output = "Active Credentials:\n"
                if email:
                    output += f"- {email} (Application Default Credentials, type: {credential_type})\n"
                else:
                    output += f"- Application Default Credentials (type: {credential_type})\n"

                if project:
                    output += f"\nCurrent Project: {project}\n"
                else:
                    output += "\nNo project set in default credentials.\n"

                # Check for other credentials in well-known locations
                credentials_dir = os.path.expanduser("~/.config/gcloud/credentials")
                if os.path.isdir(credentials_dir):
                    cred_files = [name for name in os.listdir(credentials_dir) if name.endswith('.json')]
                    if cred_files:
                        output += "\nOther available credentials:\n"
                        for cred_file in cred_files:
                            try:
                                with open(os.path.join(credentials_dir, cred_file), 'r') as f:
                                    data = json.load(f)
                                if 'client_id' in data:
                                    output += f"- User account ({cred_file})\n"
                                elif 'private_key_id' in data:
                                    output += f"- Service account: {data.get('client_email', 'Unknown')} ({cred_file})\n"
                            except Exception:
                                output += f"- Unknown credential type ({cred_file})\n"

                return output
            except Exception as e:
                return f"No active credentials found. Please run auth_login() to authenticate.\nError: {str(e)}"

        except Exception as e:
            return f"Error listing credentials: {str(e)}"

    @mcp.tool()
    def auth_revoke() -> str:
        """
        Revoke Google Cloud credentials.

        Returns:
            Status message indicating whether the credentials were revoked
        """
        try:
            import google.auth
            from google.auth.transport.requests import Request

            try:
                credentials, _ = google.auth.default()

                # Not every credential type offers revoke(); use it if present
                if hasattr(credentials, 'revoke'):
                    credentials.revoke(Request())

                # Remove the application default credentials file
                adc_path = _get_adc_path()
                if os.path.exists(adc_path):
                    os.remove(adc_path)
                    return "Application default credentials have been revoked and removed."
                return "No application default credentials file found to remove."

            except Exception as e:
                return f"No active credentials found or failed to revoke: {str(e)}"

        except Exception as e:
            return f"Error revoking credentials: {str(e)}"

    @mcp.tool()
    def config_set_project(project_id: str) -> str:
        """
        Set the default Google Cloud project.

        Args:
            project_id: The ID of the project to set as default

        Returns:
            Status message indicating whether the project was set
        """
        nonlocal _current_project_id

        try:
            if not project_id or not project_id.strip():
                return "Error setting project: project_id must not be empty."

            # Record the choice first; verification below is best-effort only
            _current_project_id = project_id
            _set_project_id_in_config(project_id)

            try:
                from google.cloud import resourcemanager_v3
                import google.auth

                credentials, _ = google.auth.default()
                client = resourcemanager_v3.ProjectsClient(credentials=credentials)
            except Exception:
                # Don't fail if we can't verify the project
                return f"Default project set to {project_id}."

            try:
                project = client.get_project(name=f"projects/{project_id}")
                return f"Default project set to {project_id} ({project.display_name})."
            except Exception:
                # Project might not exist or user might not have access
                return f"Default project set to {project_id}. Note: Could not verify if this project exists or if you have access to it."

        except Exception as e:
            return f"Error setting project: {str(e)}"

    @mcp.tool()
    def config_list() -> str:
        """
        List the current Google Cloud configuration.

        Returns:
            Current configuration settings
        """
        try:
            # The in-session selection wins over the value stored on disk
            project_id = _current_project_id or _get_project_id_from_config()

            output = "Current Configuration:\n"
            if project_id:
                output += f"- Project ID: {project_id}\n"
            else:
                output += "- Project ID: Not set\n"

            # Check if we have active credentials
            try:
                import google.auth

                credentials, default_project = google.auth.default()

                if hasattr(credentials, 'service_account_email'):
                    output += f"- Authenticated as: {credentials.service_account_email} (Service Account)\n"
                else:
                    output += "- Authenticated as: User Account\n"

                if default_project and default_project != project_id:
                    output += f"- Default Project in Credentials: {default_project}\n"
            except Exception:
                output += "- Authentication: Not authenticated or credentials not found\n"

            # Get additional configuration
            config_file = _get_config_file()
            if os.path.exists(config_file):
                try:
                    with open(config_file, 'r') as f:
                        config_lines = f.readlines()

                    if config_lines:
                        output += "\nAdditional Configuration Settings:\n"
                        for line in config_lines:
                            line = line.strip()
                            # Section headers and comments carry no '=' setting
                            if line and not line.startswith('#') and '=' in line:
                                key, value = line.split('=', 1)
                                key = key.strip()
                                value = value.strip()

                                # Skip project since we already displayed it
                                if key != 'project':
                                    output += f"- {key}: {value}\n"
                except (OSError, ValueError):
                    pass

            return output

        except Exception as e:
            return f"Error listing configuration: {str(e)}"
@mcp.tool()
def list_compute_instances(project_id: str, zone: str = "") -> str:
    """
    List Compute Engine instances in a GCP project.

    Args:
        project_id: The ID of the GCP project to list instances for
        zone: Optional zone to filter instances (e.g., "us-central1-a")

    Returns:
        List of Compute Engine instances in the specified GCP project
    """
    def _describe(instance, zone_name: str) -> str:
        # One summary line per instance; only the first interface is reported.
        machine_type = instance.machine_type.split('/')[-1] if instance.machine_type else "Unknown"
        int_ip = "None"
        ext_ip = "None"
        if instance.network_interfaces:
            primary = instance.network_interfaces[0]
            int_ip = primary.network_i_p or "None"
            if primary.access_configs:
                ext_ip = primary.access_configs[0].nat_i_p or "None"
        return (
            f"- {instance.name} (Zone: {zone_name}, Type: {machine_type}, "
            f"Internal IP: {int_ip}, External IP: {ext_ip}, Status: {instance.status})"
        )

    try:
        from google.cloud import compute_v1

        # Initialize the Compute Engine client
        client = compute_v1.InstancesClient()

        if zone:
            # List instances in the specified zone
            request = compute_v1.ListInstancesRequest(project=project_id, zone=zone)
            instances_list = [_describe(instance, zone) for instance in client.list(request=request)]
        else:
            # One aggregated call covers every zone, instead of one request
            # per zone with failures silently skipped.
            request = compute_v1.AggregatedListInstancesRequest(project=project_id)
            instances_list = []
            for scope, scoped_list in client.aggregated_list(request=request):
                # Scope keys look like "zones/us-central1-a"
                zone_name = scope.split('/')[-1]
                for instance in scoped_list.instances:
                    instances_list.append(_describe(instance, zone_name))

        zone_msg = f" in zone {zone}" if zone else ""
        if not instances_list:
            return f"No Compute Engine instances found{zone_msg} for project {project_id}."

        instances_str = "\n".join(instances_list)

        return f"""
Compute Engine Instances{zone_msg} in GCP Project {project_id}:
{instances_str}
"""
    except Exception as e:
        return f"Error listing Compute Engine instances: {str(e)}"
@mcp.tool()
def get_instance_details(project_id: str, zone: str, instance_name: str) -> str:
    """
    Get detailed information about a specific Compute Engine instance.

    Args:
        project_id: The ID of the GCP project
        zone: The zone where the instance is located (e.g., "us-central1-a")
        instance_name: The name of the instance to get details for

    Returns:
        Detailed information about the specified Compute Engine instance
    """
    def _last_segment(url: str) -> str:
        # Resource references are URLs; show the trailing name or "Unknown".
        return url.split('/')[-1] if url else "Unknown"

    def _joined(lines: list) -> str:
        # An empty section is rendered as " None".
        return "\n".join(lines) if lines else " None"

    try:
        from google.cloud import compute_v1

        instance = compute_v1.InstancesClient().get(
            project=project_id, zone=zone, instance=instance_name
        )

        machine_type = _last_segment(instance.machine_type)
        creation_timestamp = instance.creation_timestamp or "Unknown"

        # Boot disk: the first attached disk flagged as boot, if any
        boot_entry = next((entry for entry in instance.disks if entry.boot), None)
        boot_disk = "None" if boot_entry is None else _last_segment(boot_entry.source)

        # Network interfaces, with the first access config as external IP
        interface_blocks = []
        for idx, iface in enumerate(instance.network_interfaces):
            net_name = _last_segment(iface.network)
            subnet_name = _last_segment(iface.subnetwork)
            private_ip = iface.network_i_p or "None"
            public_ip = "None"
            if iface.access_configs:
                public_ip = iface.access_configs[0].nat_i_p or "None"
            interface_blocks.append(
                f" Interface {idx}:\n Network: {net_name}\n Subnetwork: {subnet_name}\n Internal IP: {private_ip}\n External IP: {public_ip}"
            )
        networks_str = _joined(interface_blocks)

        # Attached disks
        disk_blocks = []
        for idx, entry in enumerate(instance.disks):
            kind = "Boot" if entry.boot else "Data"
            removal = "Yes" if entry.auto_delete else "No"
            access_mode = entry.mode or "Unknown"
            disk_blocks.append(
                f" Disk {idx}:\n Name: {_last_segment(entry.source)}\n Type: {kind}\n Mode: {access_mode}\n Auto-delete: {removal}"
            )
        disks_str = _joined(disk_blocks)

        # Labels
        label_lines = []
        if instance.labels:
            label_lines = [f" {key}: {value}" for key, value in instance.labels.items()]
        labels_str = _joined(label_lines)

        # Metadata key/value pairs
        metadata_lines = []
        if instance.metadata and instance.metadata.items:
            metadata_lines = [f" {item.key}: {item.value}" for item in instance.metadata.items]
        metadata_str = _joined(metadata_lines)

        return f"""
Compute Engine Instance Details for {instance_name}:

Project: {project_id}
Zone: {zone}
Machine Type: {machine_type}
Status: {instance.status}
Creation Time: {creation_timestamp}
CPU Platform: {instance.cpu_platform}
Boot Disk: {boot_disk}

Network Interfaces:
{networks_str}

Disks:
{disks_str}

Labels:
{labels_str}

Metadata:
{metadata_str}

Service Accounts: {"Yes" if instance.service_accounts else "None"}
"""
    except Exception as e:
        return f"Error getting instance details: {str(e)}"
@mcp.tool()
def start_instance(project_id: str, zone: str, instance_name: str) -> str:
    """
    Start a Compute Engine instance.

    Args:
        project_id: The ID of the GCP project
        zone: The zone where the instance is located (e.g., "us-central1-a")
        instance_name: The name of the instance to start

    Returns:
        Status message indicating whether the instance was started successfully
    """
    # Upper bound on polling so a stuck operation cannot block the server forever.
    wait_limit_seconds = 300

    try:
        import time
        from google.cloud import compute_v1

        # Initialize the Compute Engine client
        client = compute_v1.InstancesClient()

        # Start the instance
        operation = client.start(project=project_id, zone=zone, instance=instance_name)

        # Poll the zone operation until it completes or the deadline passes
        operation_client = compute_v1.ZoneOperationsClient()
        deadline = time.monotonic() + wait_limit_seconds
        while operation.status != compute_v1.Operation.Status.DONE:
            if time.monotonic() >= deadline:
                return (
                    f"Timed out after {wait_limit_seconds} seconds waiting for instance "
                    f"{instance_name} to start. The operation may still complete."
                )
            time.sleep(1)
            operation = operation_client.get(
                project=project_id, zone=zone, operation=operation.name.split('/')[-1]
            )

        if operation.error:
            errors = operation.error.errors
            detail = errors[0].message if errors else str(operation.error)
            return f"Error starting instance {instance_name}: {detail}"

        return f"Instance {instance_name} in zone {zone} started successfully."
    except Exception as e:
        return f"Error starting instance: {str(e)}"
@mcp.tool()
def stop_instance(project_id: str, zone: str, instance_name: str) -> str:
    """
    Stop a Compute Engine instance.

    Args:
        project_id: The ID of the GCP project
        zone: The zone where the instance is located (e.g., "us-central1-a")
        instance_name: The name of the instance to stop

    Returns:
        Status message indicating whether the instance was stopped successfully
    """
    # Upper bound on polling so a stuck operation cannot block the server forever.
    wait_limit_seconds = 300

    try:
        import time
        from google.cloud import compute_v1

        # Initialize the Compute Engine client
        client = compute_v1.InstancesClient()

        # Stop the instance
        operation = client.stop(project=project_id, zone=zone, instance=instance_name)

        # Poll the zone operation until it completes or the deadline passes
        operation_client = compute_v1.ZoneOperationsClient()
        deadline = time.monotonic() + wait_limit_seconds
        while operation.status != compute_v1.Operation.Status.DONE:
            if time.monotonic() >= deadline:
                return (
                    f"Timed out after {wait_limit_seconds} seconds waiting for instance "
                    f"{instance_name} to stop. The operation may still complete."
                )
            time.sleep(1)
            operation = operation_client.get(
                project=project_id, zone=zone, operation=operation.name.split('/')[-1]
            )

        if operation.error:
            errors = operation.error.errors
            detail = errors[0].message if errors else str(operation.error)
            return f"Error stopping instance {instance_name}: {detail}"

        return f"Instance {instance_name} in zone {zone} stopped successfully."
    except Exception as e:
        return f"Error stopping instance: {str(e)}"
@mcp.tool()
def list_machine_types(project_id: str, zone: str) -> str:
    """
    List available machine types in a specific zone.

    Args:
        project_id: The ID of the GCP project
        zone: The zone to check machine types in (e.g., "us-central1-a")

    Returns:
        List of available machine types in the specified zone
    """
    try:
        from google.cloud import compute_v1

        types_client = compute_v1.MachineTypesClient()
        listing_request = compute_v1.ListMachineTypesRequest(project=project_id, zone=zone)

        # Bucket the formatted entries by series prefix (e.g., e2, n1, c2)
        by_series = {}
        for mtype in types_client.list(request=listing_request):
            type_name = mtype.name
            if type_name.startswith("custom"):
                family = "custom"
            else:
                family = type_name.split("-")[0]

            ram_gb = mtype.memory_mb / 1024  # Convert MB to GB
            entry = f" {type_name}: {mtype.guest_cpus} vCPUs, {ram_gb:.1f} GB RAM"
            by_series.setdefault(family, []).append(entry)

        # Emit a header per series followed by its sorted entries
        report_lines = []
        for family in sorted(by_series):
            report_lines.append(f" {family} series:")
            report_lines.extend(sorted(by_series[family]))

        if not report_lines:
            return f"No machine types found in zone {zone} for project {project_id}."

        types_str = "\n".join(report_lines)
        return f"""
Available Machine Types in Zone {zone} for Project {project_id}:
{types_str}
"""
    except Exception as e:
        return f"Error listing machine types: {str(e)}"
@mcp.tool()
def list_disks(project_id: str, zone: str = "") -> str:
    """
    List Compute Engine persistent disks in a GCP project.

    Args:
        project_id: The ID of the GCP project to list disks for
        zone: Optional zone to filter disks (e.g., "us-central1-a")

    Returns:
        List of persistent disks in the specified GCP project
    """
    def _describe(disk, zone_name: str) -> str:
        # NOTE(review): the client library appears to expose the disk type as
        # `type_` (trailing underscore); read either spelling so a wrong
        # attribute name cannot abort the listing. Confirm against the library.
        type_url = getattr(disk, "type_", None) or getattr(disk, "type", "")
        disk_type = type_url.split('/')[-1] if type_url else "Unknown"
        users = len(disk.users) if disk.users else 0
        users_str = f"Attached to {users} instance(s)" if users > 0 else "Not attached"
        return (
            f"- {disk.name} (Zone: {zone_name}, Type: {disk_type}, "
            f"Size: {disk.size_gb} GB, Status: {disk.status}, {users_str})"
        )

    try:
        from google.cloud import compute_v1

        # Initialize the Disks client
        client = compute_v1.DisksClient()

        if zone:
            # List disks in the specified zone
            request = compute_v1.ListDisksRequest(project=project_id, zone=zone)
            disks_list = [_describe(disk, zone) for disk in client.list(request=request)]
        else:
            # One aggregated call covers every zone, instead of one request
            # per zone with failures silently skipped.
            request = compute_v1.AggregatedListDisksRequest(project=project_id)
            disks_list = []
            for scope, scoped_list in client.aggregated_list(request=request):
                # Scope keys look like "zones/us-central1-a"
                zone_name = scope.split('/')[-1]
                for disk in scoped_list.disks:
                    disks_list.append(_describe(disk, zone_name))

        zone_msg = f" in zone {zone}" if zone else ""
        if not disks_list:
            return f"No persistent disks found{zone_msg} for project {project_id}."

        disks_str = "\n".join(disks_list)

        return f"""
Persistent Disks{zone_msg} in GCP Project {project_id}:
{disks_str}
"""
    except Exception as e:
        return f"Error listing persistent disks: {str(e)}"
@mcp.tool()
def create_instance(project_id: str, zone: str, instance_name: str, machine_type: str,
                    source_image: str, boot_disk_size_gb: int = 10,
                    network: str = "default", subnet: str = "",
                    external_ip: bool = True) -> str:
    """
    Create a new Compute Engine instance.

    Args:
        project_id: The ID of the GCP project
        zone: The zone to create the instance in (e.g., "us-central1-a")
        instance_name: The name for the new instance
        machine_type: The machine type (e.g., "e2-medium")
        source_image: The source image for the boot disk (e.g., "projects/debian-cloud/global/images/family/debian-11")
        boot_disk_size_gb: The size of the boot disk in GB (default: 10)
        network: The network to connect to (default: "default")
        subnet: The subnetwork to connect to (optional)
        external_ip: Whether to allocate an external IP (default: True)

    Returns:
        Status message indicating whether the instance was created successfully
    """
    # Upper bound on polling so a stuck operation cannot block the server forever.
    wait_limit_seconds = 300

    try:
        import time
        from google.cloud import compute_v1

        # Initialize the clients
        instances_client = compute_v1.InstancesClient()

        # Format the machine type
        machine_type_url = f"projects/{project_id}/zones/{zone}/machineTypes/{machine_type}"

        # Create the disk configuration
        initialize_params = compute_v1.AttachedDiskInitializeParams()
        initialize_params.source_image = source_image
        initialize_params.disk_size_gb = boot_disk_size_gb

        boot_disk = compute_v1.AttachedDisk()
        boot_disk.boot = True
        boot_disk.initialize_params = initialize_params
        boot_disk.auto_delete = True

        # Create the network configuration; short names are expanded to
        # project-relative resource paths, full paths are used as given.
        network_interface = compute_v1.NetworkInterface()
        if network.startswith("projects/"):
            network_interface.network = network
        else:
            network_interface.network = f"projects/{project_id}/global/networks/{network}"

        if subnet:
            if subnet.startswith("projects/"):
                network_interface.subnetwork = subnet
            else:
                # The region is the zone name without its trailing "-<letter>"
                region = zone.rsplit('-', 1)[0]
                network_interface.subnetwork = f"projects/{project_id}/regions/{region}/subnetworks/{subnet}"

        if external_ip:
            access_config = compute_v1.AccessConfig()
            access_config.name = "External NAT"
            access_config.type_ = "ONE_TO_ONE_NAT"
            access_config.network_tier = "PREMIUM"
            network_interface.access_configs = [access_config]

        # Create the instance
        instance = compute_v1.Instance()
        instance.name = instance_name
        instance.machine_type = machine_type_url
        instance.disks = [boot_disk]
        instance.network_interfaces = [network_interface]

        # Attach the default service account to the instance
        service_account = compute_v1.ServiceAccount()
        service_account.email = "default"
        service_account.scopes = ["https://www.googleapis.com/auth/cloud-platform"]
        instance.service_accounts = [service_account]

        operation = instances_client.insert(
            project=project_id,
            zone=zone,
            instance_resource=instance
        )

        # Poll the zone operation until it completes or the deadline passes
        operation_client = compute_v1.ZoneOperationsClient()
        deadline = time.monotonic() + wait_limit_seconds
        while operation.status != compute_v1.Operation.Status.DONE:
            if time.monotonic() >= deadline:
                return (
                    f"Timed out after {wait_limit_seconds} seconds waiting for instance "
                    f"{instance_name} to be created. The operation may still complete."
                )
            time.sleep(1)
            operation = operation_client.get(
                project=project_id, zone=zone, operation=operation.name.split('/')[-1]
            )

        if operation.error:
            errors = operation.error.errors
            detail = errors[0].message if errors else str(operation.error)
            return f"Error creating instance {instance_name}: {detail}"

        # Get the created instance to return its details
        created_instance = instances_client.get(project=project_id, zone=zone, instance=instance_name)

        # Separate locals so the boolean `external_ip` argument is not overwritten
        assigned_internal_ip = "None"
        assigned_external_ip = "None"

        if created_instance.network_interfaces:
            primary = created_instance.network_interfaces[0]
            assigned_internal_ip = primary.network_i_p or "None"
            if primary.access_configs:
                assigned_external_ip = primary.access_configs[0].nat_i_p or "None"

        return f"""
Instance {instance_name} created successfully in zone {zone}.

Details:
- Machine Type: {machine_type}
- Internal IP: {assigned_internal_ip}
- External IP: {assigned_external_ip}
- Status: {created_instance.status}
"""
    except Exception as e:
        return f"Error creating instance: {str(e)}"
@mcp.tool()
def delete_instance(project_id: str, zone: str, instance_name: str) -> str:
    """
    Delete a Compute Engine instance and wait for the operation to finish.

    Args:
        project_id: The ID of the GCP project
        zone: The zone where the instance is located (e.g., "us-central1-a")
        instance_name: The name of the instance to delete

    Returns:
        A status message: confirmation on success, a notice if the operation
        is still running when the wait limit is reached, or an error
        description. Failures are reported in the returned text rather than
        raised.
    """
    try:
        import time

        from google.cloud import compute_v1

        # Upper bound on how long this tool blocks waiting for the operation.
        max_wait_seconds = 300
        poll_interval_seconds = 1

        client = compute_v1.InstancesClient()
        operation = client.delete(project=project_id, zone=zone, instance=instance_name)

        # Poll the zonal operation until it is DONE. The sleep comes before
        # each refresh so no time is wasted once completion has been observed,
        # and the deadline keeps a stuck operation from blocking forever.
        operation_client = compute_v1.ZoneOperationsClient()
        operation_id = operation.name.split('/')[-1]
        deadline = time.monotonic() + max_wait_seconds
        while operation.status != compute_v1.Operation.Status.DONE:
            if time.monotonic() >= deadline:
                return (
                    f"Timed out after {max_wait_seconds} seconds waiting for instance "
                    f"{instance_name} to be deleted; operation {operation_id} is still in progress."
                )
            time.sleep(poll_interval_seconds)
            operation = operation_client.get(project=project_id, zone=zone, operation=operation_id)

        if operation.error:
            return f"Error deleting instance {instance_name}: {operation.error.errors[0].message}"

        return f"Instance {instance_name} in zone {zone} deleted successfully."
    except Exception as e:
        return f"Error deleting instance: {str(e)}"
@mcp.tool()
def create_snapshot(project_id: str, zone: str, disk_name: str, snapshot_name: str, description: str = "") -> str:
    """
    Create a snapshot of a Compute Engine disk and wait for the operation to finish.

    Args:
        project_id: The ID of the GCP project
        zone: The zone where the disk is located (e.g., "us-central1-a")
        disk_name: The name of the disk to snapshot
        snapshot_name: The name for the new snapshot
        description: Optional description for the snapshot; left unset when empty

    Returns:
        A status message: confirmation on success, a notice if the operation
        is still running when the wait limit is reached, or an error
        description. Failures are reported in the returned text rather than
        raised.
    """
    try:
        import time

        from google.cloud import compute_v1

        # Upper bound on how long this tool blocks waiting for the operation.
        max_wait_seconds = 300
        poll_interval_seconds = 1

        disks_client = compute_v1.DisksClient()

        snapshot = compute_v1.Snapshot()
        snapshot.name = snapshot_name
        if description:
            snapshot.description = description

        operation = disks_client.create_snapshot(
            project=project_id,
            zone=zone,
            disk=disk_name,
            snapshot_resource=snapshot
        )

        # Poll the zonal operation until it is DONE. The sleep comes before
        # each refresh so no time is wasted once completion has been observed,
        # and the deadline keeps a stuck operation from blocking forever.
        operation_client = compute_v1.ZoneOperationsClient()
        operation_id = operation.name.split('/')[-1]
        deadline = time.monotonic() + max_wait_seconds
        while operation.status != compute_v1.Operation.Status.DONE:
            if time.monotonic() >= deadline:
                return (
                    f"Timed out after {max_wait_seconds} seconds waiting for snapshot "
                    f"{snapshot_name} to be created; operation {operation_id} is still in progress."
                )
            time.sleep(poll_interval_seconds)
            operation = operation_client.get(project=project_id, zone=zone, operation=operation_id)

        if operation.error:
            return f"Error creating snapshot {snapshot_name}: {operation.error.errors[0].message}"

        return f"Snapshot {snapshot_name} of disk {disk_name} in zone {zone} created successfully."
    except Exception as e:
        return f"Error creating snapshot: {str(e)}"
@mcp.tool()
def list_snapshots(project_id: str) -> str:
    """
    Report the disk snapshots that exist in a GCP project.

    Args:
        project_id: The ID of the GCP project to list snapshots for

    Returns:
        A text listing with one line per snapshot (name, source disk, size,
        status, creation time), a "no snapshots" message when the project has
        none, or an error description if the listing fails.
    """
    try:
        from google.cloud import compute_v1

        snapshots_client = compute_v1.SnapshotsClient()
        listing = snapshots_client.list(
            request=compute_v1.ListSnapshotsRequest(project=project_id)
        )

        def describe(snap) -> str:
            # Only the last path segment of the source disk is shown.
            disk = snap.source_disk.split('/')[-1] if snap.source_disk else "Unknown"
            created = snap.creation_timestamp or "Unknown"
            return f"- {snap.name} (Source: {disk}, Size: {snap.disk_size_gb} GB, Status: {snap.status}, Created: {created})"

        rows = [describe(snap) for snap in listing]

        if rows:
            snapshots_str = "\n".join(rows)
            return f"""
Disk Snapshots in GCP Project {project_id}:
{snapshots_str}
"""

        return f"No snapshots found for project {project_id}."
    except Exception as e:
        return f"Error listing snapshots: {str(e)}"