# Directory Structure
```
├── .gitignore
├── .python-version
├── pyproject.toml
├── pytest.ini
├── README.md
├── src
│   └── gcp_mcp
│       ├── __init__.py
│       ├── __main__.py
│       ├── gcp_modules
│       │   ├── __init__.py
│       │   ├── auth
│       │   │   ├── __init__.py
│       │   │   └── tools.py
│       │   ├── billing
│       │   │   ├── __init__.py
│       │   │   └── tools.py
│       │   ├── compute
│       │   │   ├── __init__.py
│       │   │   └── tools.py
│       │   ├── databases
│       │   │   ├── __init__.py
│       │   │   └── tools.py
│       │   ├── deployment
│       │   │   ├── __init__.py
│       │   │   └── tools.py
│       │   ├── iam
│       │   │   ├── __init__.py
│       │   │   └── tools.py
│       │   ├── kubernetes
│       │   │   ├── __init__.py
│       │   │   └── tools.py
│       │   ├── monitoring
│       │   │   ├── __init__.py
│       │   │   └── tools.py
│       │   ├── networking
│       │   │   ├── __init__.py
│       │   │   └── tools.py
│       │   ├── resource_management
│       │   │   ├── __init__.py
│       │   │   └── tools.py
│       │   └── storage
│       │       ├── __init__.py
│       │       └── tools.py
│       └── server.py
├── tests
│   ├── __init__.py
│   ├── conftest.py
│   ├── mock_gcp.py
│   └── unit
│       ├── __init__.py
│       └── test_gcp_functions.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------
```
3.11.8
```
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info
# Virtual environments
.venv
```
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
# GCP MCP Application
## Claude Desktop Integration
To enable GCP management capabilities in Claude Desktop, add the following to your Claude Desktop MCP configuration:
```json
{
  "gcp-mcp": {
    "command": "uvx",
    "args": [
      "gcp-mcp"
    ]
  }
}
```
That's it! No additional setup or credential manipulation is required. When you first ask Claude to interact with your GCP resources, a browser window will automatically open for you to authenticate and grant access. Once you approve the access, Claude will be able to manage your GCP resources through natural language commands.
Here are some example requests you can make:
Basic Operations:
- "Could you list my GCP projects?"
- "Show me my compute instances"
- "What storage buckets do I have?"
Resource Creation:
- "Please create a compute instance with 2GB RAM and 10GB storage, name it MCP-engine"
- "Create a new storage bucket called my-backup-bucket in us-central1"
- "Set up a new VPC network named prod-network with subnet 10.0.0.0/24"
Resource Management:
- "Stop all compute instances in the dev project"
- "Show me all instances that have been running for more than 24 hours"
- "What's the current CPU usage of my instance named backend-server?"
- "Create a snapshot of my database disk"
Monitoring and Alerts:
- "Set up an alert for when CPU usage goes above 80%"
- "Show me all critical alerts from the last 24 hours"
- "What's the current status of my GKE clusters?"
## Features
The application provides comprehensive coverage of GCP services:
### Resource Management
- Projects and quotas management
- Asset inventory
- IAM permissions
### Compute & Infrastructure
- Compute Engine instances
- Storage buckets and disks
- VPC networks and firewall rules
- Kubernetes Engine (GKE) clusters
### Databases & Storage
- Cloud SQL instances
- Firestore databases
- Cloud Storage
- Database backups
### Monitoring & Billing
- Metrics and alerts
- Billing information
- Uptime monitoring
- Resource usage tracking
### Coming Soon
- Deployment manager and infrastructure as code
## Installation
```bash
pip install gcp-mcp
```
## License
[MIT License](LICENSE)
Contributions and issues are welcome!
```
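The Claude Desktop snippet in the README above is plain JSON; as a quick sanity check, the standard library can confirm that it parses and has the shape Claude Desktop expects (command plus argument list):

```python
import json

# The exact snippet from the README above.
config_text = '''
{
  "gcp-mcp": {
    "command": "uvx",
    "args": ["gcp-mcp"]
  }
}
'''

config = json.loads(config_text)
entry = config["gcp-mcp"]
assert entry["command"] == "uvx"
assert entry["args"] == ["gcp-mcp"]
print("config OK")
```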
--------------------------------------------------------------------------------
/src/gcp_mcp/gcp_modules/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/src/gcp_mcp/gcp_modules/billing/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/src/gcp_mcp/gcp_modules/compute/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/src/gcp_mcp/gcp_modules/databases/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/src/gcp_mcp/gcp_modules/deployment/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/src/gcp_mcp/gcp_modules/iam/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/src/gcp_mcp/gcp_modules/kubernetes/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/src/gcp_mcp/gcp_modules/monitoring/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/src/gcp_mcp/gcp_modules/networking/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/src/gcp_mcp/gcp_modules/resource_management/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/src/gcp_mcp/gcp_modules/storage/__init__.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/src/gcp_mcp/gcp_modules/auth/__init__.py:
--------------------------------------------------------------------------------
```python
# Authentication module for Google Cloud Platform
```
--------------------------------------------------------------------------------
/tests/__init__.py:
--------------------------------------------------------------------------------
```python
# This file is intentionally left empty to make the directory a proper Python package.
```
--------------------------------------------------------------------------------
/tests/unit/__init__.py:
--------------------------------------------------------------------------------
```python
# This file is intentionally left empty to make the directory a proper Python package.
```
--------------------------------------------------------------------------------
/src/gcp_mcp/__init__.py:
--------------------------------------------------------------------------------
```python
from . import server
import asyncio
import os
import sys
def main():
    server.mcp.run(transport='stdio')
__all__ = ['main', 'server']
```
--------------------------------------------------------------------------------
/src/gcp_mcp/__main__.py:
--------------------------------------------------------------------------------
```python
from . import main
if __name__ == "__main__":
    print("Starting gcp_mcp")
    main()
else:
    print("Starting gcp_mcp from import")
    main()
```
--------------------------------------------------------------------------------
/pytest.ini:
--------------------------------------------------------------------------------
```
[pytest]
testpaths = tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*
markers =
    asyncio: mark a test as an asyncio test
addopts = -v
```
--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------
```toml
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "gcp-mcp"
version = "0.1.0"
description = "MCP interface for Google Cloud Platform"
readme = "README.md"
requires-python = ">=3.11.8"
dependencies = [
    "httpx>=0.28.0",
    "mcp>=1.3.0",
    "google-cloud-resource-manager>=1.14.0",
    "google-cloud-compute>=1.26.0",
    "google-cloud-storage>=3.1.0",
    "google-cloud-service-usage>=1.13.0",
    "google-cloud-billing>=1.16.0",
    "google-api-python-client>=2.163.0",
    "google-cloud-monitoring>=2.22.0",
    "google-cloud-logging>=3.9.0",
    "google-cloud-container>=2.35.0",
    "google-cloud-firestore>=2.16.0",
    "google-cloud-bigtable>=2.19.0",
    "google-cloud-spanner>=3.39.0",
    "google-cloud-iam>=2.17.0",
    "google-cloud-vpc-access>=1.4.0",
    "google-cloud-asset>=3.25.0",
    "google-auth>=2.16.0",
    "google-auth-oauthlib>=1.2.0",
    "google-auth-httplib2>=0.1.0"
]
[project.scripts]
gcp-mcp = "gcp_mcp:main"
gcp = "gcp_mcp:main"
```
--------------------------------------------------------------------------------
/tests/conftest.py:
--------------------------------------------------------------------------------
```python
import pytest
import sys
from unittest.mock import MagicMock
# Mock the Google Cloud libraries
MOCK_MODULES = [
    'google.cloud.resourcemanager_v3',
    'google.cloud.service_usage',
    'google.cloud.compute_v1',
    'google.cloud.storage',
    'google.cloud.billing.v1',
    'google.auth',
    'google.iam.v1',
    'google.auth.exceptions',
    'googleapiclient',
    'googleapiclient.discovery'
]

for mod_name in MOCK_MODULES:
    sys.modules[mod_name] = MagicMock()
# Mock specific classes
sys.modules['google.cloud.resourcemanager_v3'].ProjectsClient = MagicMock
sys.modules['google.cloud.service_usage'].ServiceUsageClient = MagicMock
sys.modules['google.cloud.compute_v1'].InstancesClient = MagicMock
sys.modules['google.cloud.compute_v1'].ZonesClient = MagicMock
sys.modules['google.cloud.storage'].Client = MagicMock
sys.modules['google.cloud.billing.v1'].CloudBillingClient = MagicMock
sys.modules['google.cloud.billing.v1'].CloudCatalogClient = MagicMock
sys.modules['google.auth'].default = MagicMock
sys.modules['google.iam.v1'].iam_policy_pb2 = MagicMock
sys.modules['googleapiclient.discovery'].build = MagicMock
```
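The conftest above works by planting `MagicMock` objects in `sys.modules` before any test imports the Google Cloud libraries, so the tests never need those packages installed. A self-contained sketch of the same technique (using one of the module names from `MOCK_MODULES`; the stubbed return value is made up for illustration):

```python
import sys
from unittest.mock import MagicMock

# Stub the whole package chain before anything imports it. MagicMock
# child attributes are memoized, so the sys.modules entries and the
# attribute chain refer to the same objects.
mock_google = MagicMock()
sys.modules['google'] = mock_google
sys.modules['google.cloud'] = mock_google.cloud
sys.modules['google.cloud.resourcemanager_v3'] = mock_google.cloud.resourcemanager_v3

# Code under test can now import the stub as if the library were installed.
from google.cloud import resourcemanager_v3

client = resourcemanager_v3.ProjectsClient()
client.search_projects.return_value = []
print(list(client.search_projects()))  # []
```

Because the stubs go in before the first import, `from google.cloud import resourcemanager_v3` anywhere in the code under test resolves to the mock instead of the real client library.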
--------------------------------------------------------------------------------
/src/gcp_mcp/gcp_modules/deployment/tools.py:
--------------------------------------------------------------------------------
```python
"""
Google Cloud Platform Deployment tools.
"""
from typing import List, Dict, Any, Optional
def register_tools(mcp):
    """Register all deployment tools with the MCP server."""

    @mcp.tool()
    def list_deployment_manager_deployments(project_id: str) -> str:
        """
        List Deployment Manager deployments in a GCP project.

        Args:
            project_id: The ID of the GCP project to list deployments for

        Returns:
            List of Deployment Manager deployments in the specified GCP project
        """
        # TODO: Implement this function
        return f"Not yet implemented: listing deployments for project {project_id}"

    @mcp.tool()
    def get_deployment_details(project_id: str, deployment_name: str) -> str:
        """
        Get details of a specific Deployment Manager deployment.

        Args:
            project_id: The ID of the GCP project
            deployment_name: The name of the deployment to get details for

        Returns:
            Details of the specified deployment
        """
        # TODO: Implement this function
        return f"Not yet implemented: getting details for deployment {deployment_name} in project {project_id}"

    @mcp.tool()
    def list_cloud_build_triggers(project_id: str) -> str:
        """
        List Cloud Build triggers in a GCP project.

        Args:
            project_id: The ID of the GCP project to list build triggers for

        Returns:
            List of Cloud Build triggers in the specified GCP project
        """
        # TODO: Implement this function
        return f"Not yet implemented: listing Cloud Build triggers for project {project_id}"
```
--------------------------------------------------------------------------------
/src/gcp_mcp/server.py:
--------------------------------------------------------------------------------
```python
from typing import Any
import httpx
from mcp.server.fastmcp import FastMCP
# Import all modules
from .gcp_modules.resource_management import tools as resource_tools
from .gcp_modules.iam import tools as iam_tools
from .gcp_modules.compute import tools as compute_tools
from .gcp_modules.storage import tools as storage_tools
from .gcp_modules.billing import tools as billing_tools
from .gcp_modules.networking import tools as networking_tools
from .gcp_modules.kubernetes import tools as kubernetes_tools
from .gcp_modules.monitoring import tools as monitoring_tools
from .gcp_modules.databases import tools as databases_tools
from .gcp_modules.deployment import tools as deployment_tools
from .gcp_modules.auth import tools as auth_tools
# Initialize FastMCP server
mcp = FastMCP("gcp")
# A simple test function
@mcp.tool()
async def say_hello(name: str) -> str:
    """Say hello to a person."""
    return f"Hello, {name}!"

def register_tools():
    """Register every module's tools with the shared MCP server."""
    # Authentication tools are registered first for visibility
    auth_tools.register_tools(mcp)
    resource_tools.register_tools(mcp)
    iam_tools.register_tools(mcp)
    compute_tools.register_tools(mcp)
    storage_tools.register_tools(mcp)
    billing_tools.register_tools(mcp)
    networking_tools.register_tools(mcp)
    kubernetes_tools.register_tools(mcp)
    monitoring_tools.register_tools(mcp)
    databases_tools.register_tools(mcp)
    deployment_tools.register_tools(mcp)

# Register all tools at import time
register_tools()

# if __name__ == "__main__":
#     mcp.run(transport='stdio')
```
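Every `gcp_modules` package exposes the same `register_tools(mcp)` hook, and `server.py` calls each one against a single shared `FastMCP` instance. A minimal stand-in sketch shows how that decorator wiring composes (here `DummyMCP` is a hypothetical registry used in place of the real FastMCP class, so the example runs without the `mcp` package):

```python
class DummyMCP:
    """Hypothetical stand-in for FastMCP: records tool functions by name."""
    def __init__(self):
        self.tools = {}

    def tool(self):
        # Same decorator-factory shape as FastMCP's @mcp.tool()
        def decorator(fn):
            self.tools[fn.__name__] = fn
            return fn
        return decorator

def register_tools(mcp):
    """Mirrors the per-module hook: attach this module's tools to the server."""
    @mcp.tool()
    def say_hello(name: str) -> str:
        return f"Hello, {name}!"

mcp = DummyMCP()
register_tools(mcp)
print(mcp.tools['say_hello']('GCP'))  # Hello, GCP!
```

The design keeps each GCP service self-contained: adding a new service means adding one package with a `register_tools` function and one call in `server.py`.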
--------------------------------------------------------------------------------
/src/gcp_mcp/gcp_modules/billing/tools.py:
--------------------------------------------------------------------------------
```python
"""
Google Cloud Platform Billing tools.
"""
from typing import List, Dict, Any, Optional
def register_tools(mcp):
    """Register all billing tools with the MCP server."""

    @mcp.tool()
    def get_billing_info(project_id: str) -> str:
        """
        Get billing information for a GCP project.

        Args:
            project_id: The ID of the GCP project to get billing information for

        Returns:
            Billing information for the specified GCP project
        """
        try:
            try:
                from google.cloud import billing_v1
            except ImportError:
                return "Error: The Google Cloud billing library is not installed. Please install it with 'pip install google-cloud-billing'."

            # Initialize the Cloud Billing client
            billing_client = billing_v1.CloudBillingClient()

            # Get the billing account for the project
            project_name = f"projects/{project_id}"
            billing_info = billing_client.get_project_billing_info(name=project_name)

            # If billing is enabled, get more details about the billing account
            if billing_info.billing_account_name:
                billing_account = billing_client.get_billing_account(
                    name=billing_info.billing_account_name
                )

                # Initialize the Cloud Catalog client to get pricing information
                catalog_client = billing_v1.CloudCatalogClient()

                # Format the response
                return f"""
Billing Information for GCP Project {project_id}:
Billing Enabled: {billing_info.billing_enabled}
Billing Account: {billing_info.billing_account_name}
Display Name: {billing_account.display_name}
Open: {billing_account.open}
"""
            else:
                return f"Billing is not enabled for project {project_id}."
        except Exception as e:
            return f"Error getting billing information: {str(e)}"
```
--------------------------------------------------------------------------------
/tests/mock_gcp.py:
--------------------------------------------------------------------------------
```python
"""
Mock functions for testing GCP functionality.
"""
def list_gcp_projects():
    """Mock function to list GCP projects."""
    return ["test-project-1", "test-project-2", "test-project-3"]


def get_gcp_project_details(project_id):
    """Mock function to get details of a GCP project."""
    return f"""
Project ID: {project_id}
Name: Test Project
Created: 2023-01-01T00:00:00Z
Status: ACTIVE
Labels:
- env: test
- department: engineering
"""


def list_gcp_services(project_id):
    """Mock function to list enabled services in a GCP project."""
    return f"""
Enabled services in project {project_id}:
- compute.googleapis.com: Compute Engine API
- storage.googleapis.com: Cloud Storage API
- iam.googleapis.com: Identity and Access Management (IAM) API
"""


def list_compute_instances(project_id, zone=None):
    """Mock function to list Compute Engine instances."""
    zone_str = f" in zone {zone}" if zone else ""
    return f"""
Compute Engine instances in project {project_id}{zone_str}:
- instance-1 (n1-standard-1): RUNNING
  Zone: us-central1-a
  Created: 2023-01-01 00:00:00 UTC
- instance-2 (n1-standard-2): STOPPED
  Zone: us-central1-a
  Created: 2023-02-01 00:00:00 UTC
"""


def check_iam_permissions(project_id):
    """Mock function to check IAM permissions in a GCP project."""
    return f"""
IAM permissions in project {project_id}:
- roles/viewer: [email protected]
- roles/editor: [email protected]
"""


def list_storage_buckets(project_id):
    """Mock function to list Cloud Storage buckets in a GCP project."""
    return f"""
Cloud Storage buckets in project {project_id}:
- test-bucket-1
  Location: us-central1
  Storage class: STANDARD
  Created: 2023-01-01 00:00:00 UTC
- test-bucket-2
  Location: us-east1
  Storage class: NEARLINE
  Created: 2023-02-01 00:00:00 UTC
"""


def get_billing_info(project_id):
    """Mock function to get billing information for a GCP project."""
    return f"""
Billing information for project {project_id}:
- Billing account: 123456-ABCDEF-123456
- Billing account name: My Billing Account
- Billing account status: Open
- Billing enabled: Yes
- Currency: USD
"""
```
--------------------------------------------------------------------------------
/tests/unit/test_gcp_functions.py:
--------------------------------------------------------------------------------
```python
import pytest
from unittest.mock import patch, MagicMock
import sys
import os
# Add the parent directory to the Python path so we can import the mock modules
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../..')))
# Import the functions to test from the mock modules
from tests.mock_gcp import (
    list_gcp_projects,
    get_gcp_project_details,
    list_gcp_services,
    list_compute_instances,
    check_iam_permissions,
    list_storage_buckets,
    get_billing_info
)


class TestGCPFunctions:
    """Test class for GCP-related functions."""

    def test_list_gcp_projects(self):
        """Test the list_gcp_projects function."""
        result = list_gcp_projects()

        assert isinstance(result, list)
        assert "test-project-1" in result
        assert "test-project-2" in result
        assert "test-project-3" in result
        assert len(result) == 3

    def test_get_gcp_project_details(self):
        """Test the get_gcp_project_details function."""
        result = get_gcp_project_details("test-project-id")

        assert isinstance(result, str)
        assert "Test Project" in result
        assert "2023-01-01T00:00:00Z" in result
        assert "ACTIVE" in result
        assert "env: test" in result
        assert "department: engineering" in result

    def test_list_gcp_services(self):
        """Test the list_gcp_services function."""
        result = list_gcp_services("test-project")

        assert isinstance(result, str)
        assert "compute.googleapis.com: Compute Engine API" in result
        assert "storage.googleapis.com: Cloud Storage API" in result
        assert "iam.googleapis.com: Identity and Access Management (IAM) API" in result

    def test_list_compute_instances_with_zone(self):
        """Test the list_compute_instances function with a specified zone."""
        result = list_compute_instances("test-project", "us-central1-a")

        assert isinstance(result, str)
        assert "instance-1" in result
        assert "instance-2" in result
        assert "n1-standard-1" in result
        assert "n1-standard-2" in result
        assert "RUNNING" in result
        assert "STOPPED" in result
        assert "us-central1-a" in result

    def test_check_iam_permissions(self):
        """Test the check_iam_permissions function."""
        result = check_iam_permissions("test-project")

        assert isinstance(result, str)
        assert "roles/viewer" in result
        assert "roles/editor" in result
        assert "[email protected]" in result

    def test_list_storage_buckets(self):
        """Test the list_storage_buckets function."""
        result = list_storage_buckets("test-project")

        assert isinstance(result, str)
        assert "test-bucket-1" in result
        assert "test-bucket-2" in result
        assert "us-central1" in result
        assert "us-east1" in result
        assert "STANDARD" in result
        assert "NEARLINE" in result
        assert "2023-01-01 00:00:00 UTC" in result
        assert "2023-02-01 00:00:00 UTC" in result

    def test_get_billing_info(self):
        """Test the get_billing_info function."""
        result = get_billing_info("test-project")

        assert isinstance(result, str)
        assert "123456-ABCDEF-123456" in result
        assert "My Billing Account" in result
        assert "Open" in result
        assert "Yes" in result  # billing_enabled
        assert "USD" in result
```
--------------------------------------------------------------------------------
/src/gcp_mcp/gcp_modules/resource_management/tools.py:
--------------------------------------------------------------------------------
```python
"""
Google Cloud Platform Resource Management tools.
"""
from typing import List, Dict, Any, Optional
def register_tools(mcp):
    """Register all resource management tools with the MCP server."""

    @mcp.tool()
    def list_gcp_projects():
        """
        List all available GCP projects for the authenticated user.

        Returns:
            List of project IDs
        """
        try:
            from google.cloud import resourcemanager_v3
            client = resourcemanager_v3.ProjectsClient()
            request = resourcemanager_v3.SearchProjectsRequest()
            response = client.search_projects(request=request)
            return [project.project_id for project in response]
        except Exception as e:
            return [f"Error listing GCP projects: {str(e)}"]

    @mcp.tool()
    def get_gcp_project_details(project_id: str) -> str:
        """
        Get detailed information about a specific GCP project.

        Args:
            project_id: The ID of the GCP project to get details for

        Returns:
            Detailed information about the specified GCP project
        """
        try:
            from google.cloud import resourcemanager_v3

            # Initialize the Resource Manager client
            client = resourcemanager_v3.ProjectsClient()

            # Get the project details
            name = f"projects/{project_id}"
            project = client.get_project(name=name)

            # Format the response
            project_number = project.name.split('/')[-1] if project.name else "N/A"
            display_name = project.display_name or "N/A"
            create_time = project.create_time.isoformat() if project.create_time else "N/A"
            state = project.state.name if project.state else "N/A"
            labels = dict(project.labels) if project.labels else {}
            labels_str = "\n".join([f"  {k}: {v}" for k, v in labels.items()]) if labels else "  None"

            return f"""
GCP Project Details for {project_id}:
Project Number: {project_number}
Name: {display_name}
Creation Time: {create_time}
State: {state}
Labels:
{labels_str}
"""
        except Exception as e:
            return f"Error getting GCP project details: {str(e)}"

    @mcp.tool()
    def list_assets(project_id: str, asset_types: Optional[List[str]] = None, page_size: int = 50) -> str:
        """
        List assets in a GCP project using the Cloud Asset Inventory API.

        Args:
            project_id: The ID of the GCP project to list assets for
            asset_types: Optional list of asset types to filter by (e.g., ["compute.googleapis.com/Instance"])
            page_size: Number of assets to return per page (default: 50, max: 1000)

        Returns:
            List of assets in the specified GCP project
        """
        try:
            try:
                from google.cloud import asset_v1
            except ImportError:
                return "Error: The Google Cloud Asset Inventory library is not installed. Please install it with 'pip install google-cloud-asset'."

            # Initialize the Asset client
            client = asset_v1.AssetServiceClient()

            # Format the parent resource
            parent = f"projects/{project_id}"

            # Create the request
            request = asset_v1.ListAssetsRequest(
                parent=parent,
                content_type=asset_v1.ContentType.RESOURCE,
                page_size=min(page_size, 1000)  # API limit is 1000
            )

            # Add asset types filter if provided
            if asset_types:
                request.asset_types = asset_types

            # List assets
            response = client.list_assets(request=request)

            # Format the response
            assets_list = []
            for asset in response:
                asset_type = asset.asset_type
                name = asset.name
                display_name = asset.display_name if hasattr(asset, 'display_name') and asset.display_name else name.split('/')[-1]

                # Extract location if available
                location = "global"
                if hasattr(asset.resource, 'location') and asset.resource.location:
                    location = asset.resource.location

                assets_list.append(f"- {display_name} ({asset_type})\n  Location: {location}\n  Name: {name}")

            if not assets_list:
                filter_msg = f" with types {asset_types}" if asset_types else ""
                return f"No assets found{filter_msg} in project {project_id}."

            # Add pagination info if there's a next page token
            pagination_info = ""
            if hasattr(response, 'next_page_token') and response.next_page_token:
                pagination_info = "\n\nMore assets are available. Refine your search or increase page_size to see more."

            return f"Assets in GCP Project {project_id}:\n\n" + "\n\n".join(assets_list) + pagination_info
        except Exception as e:
            return f"Error listing assets: {str(e)}"

    @mcp.tool()
    def set_quota_project(project_id: str) -> str:
        """
        Set a quota project for Google Cloud API requests.

        This helps resolve the warning: "Your application has authenticated using end user credentials
        from Google Cloud SDK without a quota project."

        Args:
            project_id: The ID of the GCP project to use for quota attribution

        Returns:
            Confirmation message if successful, error message otherwise
        """
        try:
            try:
                import google.auth
                from google.auth import exceptions
                import os
            except ImportError:
                return "Error: Required libraries not installed. Please install with 'pip install google-auth'."

            # Set the quota project in the environment variable
            os.environ["GOOGLE_CLOUD_QUOTA_PROJECT"] = project_id

            # Try to get credentials with the quota project
            try:
                # Get the current credentials
                credentials, project = google.auth.default()

                # Set the quota project on the credentials if supported
                if hasattr(credentials, "with_quota_project"):
                    credentials = credentials.with_quota_project(project_id)

                    # Save the credentials back (this depends on the credential type).
                    # This is a best-effort approach.
                    try:
                        if hasattr(google.auth, "_default_credentials"):
                            google.auth._default_credentials = credentials
                    except Exception:
                        pass

                    return f"Successfully set quota project to '{project_id}'. New API requests will use this project for quota attribution."
                else:
                    return f"Set environment variable GOOGLE_CLOUD_QUOTA_PROJECT={project_id}, but your credential type doesn't support quota projects directly."
            except exceptions.GoogleAuthError as e:
                return f"Error setting quota project: {str(e)}"
        except Exception as e:
            return f"Error setting quota project: {str(e)}"
```
--------------------------------------------------------------------------------
/src/gcp_mcp/gcp_modules/storage/tools.py:
--------------------------------------------------------------------------------
```python
"""
Google Cloud Platform Storage tools.
"""
from typing import List, Dict, Any, Optional
def register_tools(mcp):
    """Register all storage tools with the MCP server."""

    @mcp.tool()
    def list_storage_buckets(project_id: str) -> str:
        """
        List Cloud Storage buckets in a GCP project.

        Args:
            project_id: The ID of the GCP project to list buckets for

        Returns:
            List of Cloud Storage buckets in the specified GCP project
        """
        try:
            from google.cloud import storage

            # Initialize the Storage client
            client = storage.Client(project=project_id)

            # List buckets
            buckets = client.list_buckets()

            # Format the response
            buckets_list = []
            for bucket in buckets:
                location = bucket.location or "Unknown"
                storage_class = bucket.storage_class or "Unknown"
                created = bucket.time_created.strftime("%Y-%m-%d %H:%M:%S UTC") if bucket.time_created else "Unknown"
                buckets_list.append(f"- {bucket.name} (Location: {location}, Class: {storage_class}, Created: {created})")

            if not buckets_list:
                return f"No Cloud Storage buckets found in project {project_id}."

            buckets_str = "\n".join(buckets_list)
            return f"""
Cloud Storage Buckets in GCP Project {project_id}:
{buckets_str}
"""
        except Exception as e:
            return f"Error listing Cloud Storage buckets: {str(e)}"

    @mcp.tool()
    def get_bucket_details(project_id: str, bucket_name: str) -> str:
        """
        Get detailed information about a specific Cloud Storage bucket.

        Args:
            project_id: The ID of the GCP project
            bucket_name: The name of the bucket to get details for

        Returns:
            Detailed information about the specified Cloud Storage bucket
        """
        try:
            from google.cloud import storage

            # Initialize the Storage client
            client = storage.Client(project=project_id)

            # Get the bucket
            bucket = client.get_bucket(bucket_name)

            # Format the response (lifecycle_rules is a generator, so
            # materialize it before counting)
            details = []
            details.append(f"Name: {bucket.name}")
            details.append(f"Project: {project_id}")
            details.append(f"Location: {bucket.location or 'Unknown'}")
            details.append(f"Storage Class: {bucket.storage_class or 'Unknown'}")
            details.append(f"Created: {bucket.time_created.strftime('%Y-%m-%d %H:%M:%S UTC') if bucket.time_created else 'Unknown'}")
            details.append(f"Versioning Enabled: {bucket.versioning_enabled}")
            details.append(f"Requester Pays: {bucket.requester_pays}")
            details.append(f"Lifecycle Rules: {len(list(bucket.lifecycle_rules))} rules defined")
            details.append(f"Labels: {bucket.labels or 'None'}")
            details.append(f"CORS: {bucket.cors or 'None'}")

            details_str = "\n".join(details)
            return f"""
Cloud Storage Bucket Details:
{details_str}
"""
        except Exception as e:
            return f"Error getting bucket details: {str(e)}"

    @mcp.tool()
    def list_objects(project_id: str, bucket_name: str, prefix: Optional[str] = None, limit: int = 100) -> str:
        """
        List objects in a Cloud Storage bucket.

        Args:
            project_id: The ID of the GCP project
            bucket_name: The name of the bucket to list objects from
            prefix: Optional prefix to filter objects by
            limit: Maximum number of objects to list (default: 100)

        Returns:
            List of objects in the specified Cloud Storage bucket
        """
        try:
            from google.cloud import storage

            # Initialize the Storage client
            client = storage.Client(project=project_id)

            # Get the bucket
            bucket = client.get_bucket(bucket_name)

            # List blobs
            blobs = bucket.list_blobs(prefix=prefix, max_results=limit)

            # Format the response
            objects_list = []
            for blob in blobs:
                size_mb = blob.size / (1024 * 1024)
                updated = blob.updated.strftime("%Y-%m-%d %H:%M:%S UTC") if blob.updated else "Unknown"
                objects_list.append(f"- {blob.name} (Size: {size_mb:.2f} MB, Updated: {updated}, Content-Type: {blob.content_type})")

            if not objects_list:
                return f"No objects found in bucket {bucket_name}{' with prefix ' + prefix if prefix else ''}."

            objects_str = "\n".join(objects_list)
            return f"""
Objects in Cloud Storage Bucket {bucket_name}{' with prefix ' + prefix if prefix else ''}:
{objects_str}
"""
        except Exception as e:
            return f"Error listing objects: {str(e)}"

    @mcp.tool()
    def upload_object(project_id: str, bucket_name: str, source_file_path: str, destination_blob_name: Optional[str] = None, content_type: Optional[str] = None) -> str:
        """
        Upload a file to a Cloud Storage bucket.

        Args:
            project_id: The ID of the GCP project
            bucket_name: The name of the bucket to upload to
            source_file_path: The local file path to upload
            destination_blob_name: The name to give the file in GCS (default: filename from source)
            content_type: The content type of the file (default: auto-detect)

        Returns:
            Result of the upload operation
        """
        try:
            import os
            from google.cloud import storage

            # Initialize the Storage client
            client = storage.Client(project=project_id)

            # Get the bucket
            bucket = client.get_bucket(bucket_name)

            # If no destination name is provided, use the source filename
            if not destination_blob_name:
                destination_blob_name = os.path.basename(source_file_path)

            # Create a blob object
            blob = bucket.blob(destination_blob_name)

            # Upload the file
            blob.upload_from_filename(source_file_path, content_type=content_type)

            return f"""
File successfully uploaded:
- Source: {source_file_path}
- Destination: gs://{bucket_name}/{destination_blob_name}
- Size: {blob.size / (1024 * 1024):.2f} MB
- Content-Type: {blob.content_type}
"""
        except Exception as e:
            return f"Error uploading file: {str(e)}"

    @mcp.tool()
    def download_object(project_id: str, bucket_name: str, source_blob_name: str, destination_file_path: str) -> str:
        """
        Download a file from a Cloud Storage bucket.

        Args:
            project_id: The ID of the GCP project
            bucket_name: The name of the bucket to download from
            source_blob_name: The name of the file in the bucket
            destination_file_path: The local path to save the file to

        Returns:
            Result of the download operation
        """
        try:
            from google.cloud import storage

            # Initialize the Storage client
            client = storage.Client(project=project_id)

            # Get the bucket
            bucket = client.get_bucket(bucket_name)

            # Get the blob
            blob = bucket.blob(source_blob_name)

            # Download the file
            blob.download_to_filename(destination_file_path)

            return f"""
File successfully downloaded:
- Source: gs://{bucket_name}/{source_blob_name}
- Destination: {destination_file_path}
- Size: {blob.size / (1024 * 1024):.2f} MB
- Content-Type: {blob.content_type}
"""
        except Exception as e:
            return f"Error downloading file: {str(e)}"

    @mcp.tool()
    def delete_object(project_id: str, bucket_name: str, blob_name: str) -> str:
        """
        Delete an object from a Cloud Storage bucket.
Args:
project_id: The ID of the GCP project
bucket_name: The name of the bucket to delete from
blob_name: The name of the file to delete
Returns:
Result of the delete operation
"""
try:
from google.cloud import storage
# Initialize the Storage client
client = storage.Client(project=project_id)
# Get the bucket
bucket = client.get_bucket(bucket_name)
# Delete the blob
blob = bucket.blob(blob_name)
blob.delete()
return f"Object gs://{bucket_name}/{blob_name} has been successfully deleted."
except Exception as e:
return f"Error deleting object: {str(e)}"
```
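Every `tools.py` module above follows the same contract: a `register_tools(mcp)` entry point that attaches plain functions with `@mcp.tool()`. A minimal sketch of that pattern against a fake registry (`FakeMCP` is a hypothetical stand-in, not the MCP SDK) shows how the modules can be exercised without a live server or GCP credentials:

```python
class FakeMCP:
    """Illustrative stand-in (not part of the MCP SDK): records tools
    registered through the same @mcp.tool() decorator the modules use."""
    def __init__(self):
        self.tools = {}

    def tool(self):
        def decorator(func):
            # Capture the decorated function by name, as a real server would.
            self.tools[func.__name__] = func
            return func
        return decorator


def register_tools(mcp):
    """Same shape as the module-level register_tools functions above."""
    @mcp.tool()
    def list_buckets(project_id: str) -> str:
        # Stub body; the real tool calls storage.Client(project=project_id).
        return f"Buckets in {project_id}: (none)"


mcp = FakeMCP()
register_tools(mcp)
print(sorted(mcp.tools))  # the registered tool names
```

Because the tools are ordinary closures until registered, a test suite can swap in a recording registry like this and invoke each tool directly.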
--------------------------------------------------------------------------------
/src/gcp_mcp/gcp_modules/iam/tools.py:
--------------------------------------------------------------------------------
```python
"""
Google Cloud Platform IAM tools.
"""
from typing import List, Dict, Any, Optional
def register_tools(mcp):
"""Register all IAM tools with the MCP server."""
@mcp.tool()
def check_iam_permissions(project_id: str) -> str:
"""
Check IAM permissions for the current user in a GCP project.
Args:
project_id: The ID of the GCP project to check permissions for
Returns:
List of IAM permissions for the current user in the specified GCP project
"""
try:
from google.cloud import resourcemanager_v3
from google.iam.v1 import iam_policy_pb2
# Initialize the Resource Manager client
client = resourcemanager_v3.ProjectsClient()
# Get the IAM policy for the project
request = iam_policy_pb2.GetIamPolicyRequest(
resource=f"projects/{project_id}"
)
policy = client.get_iam_policy(request=request)
# Get the current user
import google.auth
credentials, _ = google.auth.default()
user = credentials.service_account_email if hasattr(credentials, 'service_account_email') else "current user"
# Check which roles the user has
user_bindings = []
for binding in policy.bindings:
role = binding.role
members = binding.members
# Check if the current user is in the members list
for member in members:
if member in (f"user:{user}", f"serviceAccount:{user}", "allUsers", "allAuthenticatedUsers"):
user_bindings.append(f"- {role}")
break
if not user_bindings:
return f"No explicit IAM permissions found for {user} in project {project_id}."
user_bindings_str = "\n".join(user_bindings)
return f"""
IAM Permissions for {user} in GCP Project {project_id}:
{user_bindings_str}
"""
except Exception as e:
return f"Error checking IAM permissions: {str(e)}"
@mcp.tool()
def list_roles(project_id: Optional[str] = None) -> str:
"""
List IAM roles (predefined or custom).
Args:
project_id: Optional project ID for listing custom roles. If not provided, lists predefined roles.
Returns:
List of IAM roles
"""
try:
from google.cloud import iam_admin_v1 as iam_v1  # role/service-account admin lives in iam_admin_v1
# Initialize the IAM client
client = iam_v1.IAMClient()
roles_list = []
if project_id:
# List custom roles for the project
request = iam_v1.ListRolesRequest(
parent=f"projects/{project_id}",
view=iam_v1.RoleView.FULL
)
roles = client.list_roles(request=request)
for role in roles:
description = role.description or "No description"
roles_list.append(f"- {role.name} - {description}")
if not roles_list:
return f"No custom IAM roles found in project {project_id}."
return f"""
Custom IAM Roles in GCP Project {project_id}:
{chr(10).join(roles_list)}
"""
else:
# List predefined roles
request = iam_v1.ListRolesRequest(
view=iam_v1.RoleView.BASIC
)
roles = client.list_roles(request=request)
for role in roles:
if role.name.startswith("roles/"):
description = role.description or "No description"
roles_list.append(f"- {role.name} - {description}")
if not roles_list:
return "No predefined IAM roles found."
return f"""
Predefined IAM Roles in GCP:
{chr(10).join(roles_list[:100])}
(Limited to 100 roles. To see more specific roles, narrow your search criteria.)
"""
except Exception as e:
return f"Error listing IAM roles: {str(e)}"
@mcp.tool()
def get_role_permissions(role_name: str, project_id: Optional[str] = None) -> str:
"""
Get detailed information about an IAM role, including its permissions.
Args:
role_name: The name of the role (e.g., "roles/compute.admin" or "projects/my-project/roles/myCustomRole")
project_id: Optional project ID for custom roles. Not needed if role_name is fully qualified.
Returns:
Detailed information about the IAM role
"""
try:
from google.cloud import iam_admin_v1 as iam_v1  # role/service-account admin lives in iam_admin_v1
# Initialize the IAM client
client = iam_v1.IAMClient()
# If project_id is provided and role_name doesn't include it, create fully qualified role name
if project_id and not role_name.startswith("projects/") and not role_name.startswith("roles/"):
role_name = f"projects/{project_id}/roles/{role_name}"
elif not role_name.startswith("projects/") and not role_name.startswith("roles/"):
role_name = f"roles/{role_name}"
# Get role details
request = iam_v1.GetRoleRequest(name=role_name)
role = client.get_role(request=request)
details = []
details.append(f"Name: {role.name}")
details.append(f"Title: {role.title}")
details.append(f"Description: {role.description or 'No description'}")
if role.included_permissions:
permissions_str = "\n".join([f"- {permission}" for permission in role.included_permissions])
details.append(f"Permissions ({len(role.included_permissions)}):\n{permissions_str}")
else:
details.append("Permissions: None")
if hasattr(role, 'stage'):
details.append(f"Stage: {role.stage}")
if hasattr(role, 'etag'):
details.append(f"ETag: {role.etag}")
return f"""
IAM Role Details for {role.name}:
{chr(10).join(details)}
"""
except Exception as e:
return f"Error getting role permissions: {str(e)}"
@mcp.tool()
def list_service_accounts(project_id: str) -> str:
"""
List service accounts in a GCP project.
Args:
project_id: The ID of the GCP project
Returns:
List of service accounts in the project
"""
try:
from google.cloud import iam_admin_v1 as iam_v1  # role/service-account admin lives in iam_admin_v1
# Initialize the IAM client
client = iam_v1.IAMClient()
# List service accounts
request = iam_v1.ListServiceAccountsRequest(
name=f"projects/{project_id}"
)
service_accounts = client.list_service_accounts(request=request)
accounts_list = []
for account in service_accounts:
display_name = account.display_name or "No display name"
accounts_list.append(f"- {account.email} ({display_name})")
if not accounts_list:
return f"No service accounts found in project {project_id}."
accounts_str = "\n".join(accounts_list)
return f"""
Service Accounts in GCP Project {project_id}:
{accounts_str}
"""
except Exception as e:
return f"Error listing service accounts: {str(e)}"
@mcp.tool()
def create_service_account(project_id: str, account_id: str, display_name: str, description: Optional[str] = None) -> str:
"""
Create a new service account in a GCP project.
Args:
project_id: The ID of the GCP project
account_id: The ID for the service account (must be between 6 and 30 characters)
display_name: A user-friendly name for the service account
description: Optional description for the service account
Returns:
Result of the service account creation
"""
try:
from google.cloud import iam_admin_v1 as iam_v1  # role/service-account admin lives in iam_admin_v1
# Initialize the IAM client
client = iam_v1.IAMClient()
# Create service account
request = iam_v1.CreateServiceAccountRequest(
name=f"projects/{project_id}",
account_id=account_id,
service_account=iam_v1.ServiceAccount(
display_name=display_name,
description=description
)
)
service_account = client.create_service_account(request=request)
return f"""
Service Account created successfully:
- Email: {service_account.email}
- Name: {service_account.name}
- Display Name: {service_account.display_name}
- Description: {service_account.description or 'None'}
"""
except Exception as e:
return f"Error creating service account: {str(e)}"
@mcp.tool()
def add_iam_policy_binding(project_id: str, role: str, member: str) -> str:
"""
Add an IAM policy binding to a GCP project.
Args:
project_id: The ID of the GCP project
role: The role to grant (e.g., "roles/compute.admin")
member: The member to grant the role to (e.g., "user:[email protected]", "serviceAccount:[email protected]")
Returns:
Result of the policy binding operation
"""
try:
from google.cloud import resourcemanager_v3
from google.iam.v1 import iam_policy_pb2, policy_pb2
# Initialize the Resource Manager client
client = resourcemanager_v3.ProjectsClient()
# Get the current IAM policy
get_request = iam_policy_pb2.GetIamPolicyRequest(
resource=f"projects/{project_id}"
)
policy = client.get_iam_policy(request=get_request)
# Check if the binding already exists
binding_exists = False
for binding in policy.bindings:
if binding.role == role and member in binding.members:
binding_exists = True
break
if binding_exists:
return f"IAM policy binding already exists: {member} already has role {role} in project {project_id}."
# Add the new binding
binding = policy_pb2.Binding()
binding.role = role
binding.members.append(member)
policy.bindings.append(binding)
# Set the updated IAM policy
set_request = iam_policy_pb2.SetIamPolicyRequest(
resource=f"projects/{project_id}",
policy=policy
)
client.set_iam_policy(request=set_request)
return f"""
IAM policy binding added successfully:
- Project: {project_id}
- Role: {role}
- Member: {member}
"""
except Exception as e:
return f"Error adding IAM policy binding: {str(e)}"
```
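`get_role_permissions` qualifies bare role names before calling the API; the branching is easier to verify in isolation as a pure helper (`qualify_role_name` is an illustrative name, not part of the module):

```python
def qualify_role_name(role_name: str, project_id: str = None) -> str:
    """Return a fully qualified IAM role name, mirroring the logic in
    get_role_permissions:
    - already-qualified names ("roles/..." or "projects/...") pass through;
    - bare names become project custom roles when project_id is given,
      otherwise predefined roles.
    """
    if role_name.startswith(("projects/", "roles/")):
        return role_name
    if project_id:
        return f"projects/{project_id}/roles/{role_name}"
    return f"roles/{role_name}"
```

Factoring the normalization out like this lets the three branches be asserted without touching the IAM API.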
--------------------------------------------------------------------------------
/src/gcp_mcp/gcp_modules/databases/tools.py:
--------------------------------------------------------------------------------
```python
"""
Google Cloud Platform Database tools.
"""
from typing import List, Dict, Any, Optional
def register_tools(mcp):
"""Register all database tools with the MCP server."""
@mcp.tool()
def list_cloud_sql_instances(project_id: str) -> str:
"""
List Cloud SQL instances in a GCP project.
Args:
project_id: The ID of the GCP project to list Cloud SQL instances for
Returns:
List of Cloud SQL instances in the specified GCP project
"""
try:
from googleapiclient import discovery
# Initialize the Cloud SQL Admin API client
service = discovery.build('sqladmin', 'v1')
# List SQL instances
request = service.instances().list(project=project_id)
response = request.execute()
# Format the response
instances_list = []
if 'items' in response:
for instance in response['items']:
name = instance.get('name', 'Unknown')
db_version = instance.get('databaseVersion', 'Unknown')
state = instance.get('state', 'Unknown')
region = instance.get('region', 'Unknown')
tier = instance.get('settings', {}).get('tier', 'Unknown')
storage_size = instance.get('settings', {}).get('dataDiskSizeGb', 'Unknown')
instances_list.append(f"- {name} (Type: {db_version}, Region: {region}, Tier: {tier}, Storage: {storage_size}GB, State: {state})")
if not instances_list:
return f"No Cloud SQL instances found in project {project_id}."
instances_str = "\n".join(instances_list)
return f"""
Cloud SQL Instances in GCP Project {project_id}:
{instances_str}
"""
except Exception as e:
return f"Error listing Cloud SQL instances: {str(e)}"
@mcp.tool()
def get_sql_instance_details(project_id: str, instance_id: str) -> str:
"""
Get detailed information about a specific Cloud SQL instance.
Args:
project_id: The ID of the GCP project
instance_id: The ID of the Cloud SQL instance
Returns:
Detailed information about the specified Cloud SQL instance
"""
try:
from googleapiclient import discovery
# Initialize the Cloud SQL Admin API client
service = discovery.build('sqladmin', 'v1')
# Get instance details
request = service.instances().get(project=project_id, instance=instance_id)
instance = request.execute()
# Format the response
details = []
details.append(f"Name: {instance.get('name', 'Unknown')}")
details.append(f"Self Link: {instance.get('selfLink', 'Unknown')}")
details.append(f"Database Version: {instance.get('databaseVersion', 'Unknown')}")
details.append(f"State: {instance.get('state', 'Unknown')}")
details.append(f"Region: {instance.get('region', 'Unknown')}")
# Settings information
if 'settings' in instance:
settings = instance['settings']
details.append(f"Tier: {settings.get('tier', 'Unknown')}")
details.append(f"Storage Size: {settings.get('dataDiskSizeGb', 'Unknown')}GB")
details.append(f"Availability Type: {settings.get('availabilityType', 'Unknown')}")
# Backup configuration
if 'backupConfiguration' in settings:
backup = settings['backupConfiguration']
enabled = backup.get('enabled', False)
details.append(f"Automated Backups: {'Enabled' if enabled else 'Disabled'}")
if enabled:
details.append(f"Backup Start Time: {backup.get('startTime', 'Unknown')}")
details.append(f"Binary Log Enabled: {backup.get('binaryLogEnabled', False)}")
# IP configuration
if 'ipConfiguration' in settings:
ip_config = settings['ipConfiguration']
public_ip = "Enabled" if ip_config.get('ipv4Enabled', True) else "Disabled"
details.append(f"Public IP: {public_ip}")
if 'authorizedNetworks' in ip_config:
networks = []
for network in ip_config['authorizedNetworks']:
name = network.get('name', 'Unnamed')
value = network.get('value', 'Unknown')
networks.append(f" - {name}: {value}")
if networks:
details.append("Authorized Networks:")
details.extend(networks)
# IP Addresses
if 'ipAddresses' in instance:
ip_addresses = []
for ip in instance['ipAddresses']:
ip_type = ip.get('type', 'Unknown')
ip_address = ip.get('ipAddress', 'Unknown')
ip_addresses.append(f" - {ip_type}: {ip_address}")
if ip_addresses:
details.append("IP Addresses:")
details.extend(ip_addresses)
details_str = "\n".join(details)
return f"""
Cloud SQL Instance Details:
{details_str}
"""
except Exception as e:
return f"Error getting SQL instance details: {str(e)}"
@mcp.tool()
def list_databases(project_id: str, instance_id: str) -> str:
"""
List databases in a Cloud SQL instance.
Args:
project_id: The ID of the GCP project
instance_id: The ID of the Cloud SQL instance
Returns:
List of databases in the specified Cloud SQL instance
"""
try:
from googleapiclient import discovery
# Initialize the Cloud SQL Admin API client
service = discovery.build('sqladmin', 'v1')
# List databases
request = service.databases().list(project=project_id, instance=instance_id)
response = request.execute()
# Format the response
databases_list = []
if 'items' in response:
for database in response['items']:
name = database.get('name', 'Unknown')
charset = database.get('charset', 'Unknown')
collation = database.get('collation', 'Unknown')
databases_list.append(f"- {name} (Charset: {charset}, Collation: {collation})")
if not databases_list:
return f"No databases found in Cloud SQL instance {instance_id}."
databases_str = "\n".join(databases_list)
return f"""
Databases in Cloud SQL Instance {instance_id}:
{databases_str}
"""
except Exception as e:
return f"Error listing databases: {str(e)}"
@mcp.tool()
def create_backup(project_id: str, instance_id: str, description: Optional[str] = None) -> str:
"""
Create a backup for a Cloud SQL instance.
Args:
project_id: The ID of the GCP project
instance_id: The ID of the Cloud SQL instance
description: Optional description for the backup
Returns:
Result of the backup operation
"""
try:
from googleapiclient import discovery
# Initialize the Cloud SQL Admin API client
service = discovery.build('sqladmin', 'v1')
# Create backup
backup_run_body = {}
if description:
backup_run_body['description'] = description
request = service.backupRuns().insert(project=project_id, instance=instance_id, body=backup_run_body)
operation = request.execute()
# Get operation ID and status
operation_id = operation.get('name', 'Unknown')
status = operation.get('status', 'Unknown')
return f"""
Backup operation initiated:
- Instance: {instance_id}
- Project: {project_id}
- Description: {description or 'None provided'}
Operation ID: {operation_id}
Status: {status}
The backup process may take some time to complete. You can check the status of the backup using the Cloud SQL Admin API or Google Cloud Console.
"""
except Exception as e:
return f"Error creating backup: {str(e)}"
@mcp.tool()
def list_firestore_databases(project_id: str) -> str:
"""
List Firestore databases in a GCP project.
Args:
project_id: The ID of the GCP project to list Firestore databases for
Returns:
List of Firestore databases in the specified GCP project
"""
try:
from google.cloud import firestore_admin_v1
# Initialize the Firestore Admin client
client = firestore_admin_v1.FirestoreAdminClient()
# List databases
parent = f"projects/{project_id}"
databases = client.list_databases(parent=parent)
# Format the response
databases_list = []
for database in databases:
name = database.name.split('/')[-1]
db_type = "Firestore Native" if database.type_ == firestore_admin_v1.Database.DatabaseType.FIRESTORE_NATIVE else "Datastore Mode"
location = database.location_id
databases_list.append(f"- {name} (Type: {db_type}, Location: {location})")
if not databases_list:
return f"No Firestore databases found in project {project_id}."
databases_str = "\n".join(databases_list)
return f"""
Firestore Databases in GCP Project {project_id}:
{databases_str}
"""
except Exception as e:
return f"Error listing Firestore databases: {str(e)}"
@mcp.tool()
def list_firestore_collections(project_id: str, database_id: str = "(default)") -> str:
"""
List collections in a Firestore database.
Args:
project_id: The ID of the GCP project
database_id: The ID of the Firestore database (default is "(default)")
Returns:
List of collections in the specified Firestore database
"""
try:
from google.cloud import firestore
# Initialize the Firestore client
client = firestore.Client(project=project_id, database=database_id)
# List collections
collections = client.collections()
# Format the response
collections_list = []
for collection in collections:
collections_list.append(f"- {collection.id}")
if not collections_list:
return f"No collections found in Firestore database {database_id}."
collections_str = "\n".join(collections_list)
return f"""
Collections in Firestore Database {database_id} (Project: {project_id}):
{collections_str}
"""
except Exception as e:
return f"Error listing Firestore collections: {str(e)}"
```
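`list_cloud_sql_instances` builds one summary line per `instances.list` item; that formatting can be pulled into a pure function over a plain dict, which keeps it testable without the Cloud SQL Admin API (`format_sql_instance` is an illustrative name, not part of the module):

```python
def format_sql_instance(instance: dict) -> str:
    """Mirror the summary line list_cloud_sql_instances builds from one
    Cloud SQL Admin API `instances.list` item (plain dict, no API calls)."""
    name = instance.get('name', 'Unknown')
    db_version = instance.get('databaseVersion', 'Unknown')
    state = instance.get('state', 'Unknown')
    region = instance.get('region', 'Unknown')
    settings = instance.get('settings', {})
    tier = settings.get('tier', 'Unknown')
    storage_size = settings.get('dataDiskSizeGb', 'Unknown')
    return (f"- {name} (Type: {db_version}, Region: {region}, "
            f"Tier: {tier}, Storage: {storage_size}GB, State: {state})")
```

Every `.get(..., 'Unknown')` default matters here: the API omits fields it has no value for, so defensive access keeps a partial response from raising `KeyError`.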
--------------------------------------------------------------------------------
/src/gcp_mcp/gcp_modules/kubernetes/tools.py:
--------------------------------------------------------------------------------
```python
"""
Google Cloud Platform Kubernetes Engine (GKE) tools.
"""
from typing import List, Dict, Any, Optional
def register_tools(mcp):
"""Register all kubernetes tools with the MCP server."""
@mcp.tool()
def list_gke_clusters(project_id: str, region: str = "") -> str:
"""
List Google Kubernetes Engine (GKE) clusters in a GCP project.
Args:
project_id: The ID of the GCP project to list GKE clusters for
region: Optional region to filter clusters (e.g., "us-central1")
Returns:
List of GKE clusters in the specified GCP project
"""
try:
from google.cloud import container_v1
# Initialize the GKE client
client = container_v1.ClusterManagerClient()
clusters_list = []
if region:
# List clusters in the specified region
parent = f"projects/{project_id}/locations/{region}"
response = client.list_clusters(parent=parent)
for cluster in response.clusters:
version = cluster.current_master_version
node_count = sum(pool.initial_node_count for pool in cluster.node_pools)
status = "Running" if cluster.status == container_v1.Cluster.Status.RUNNING else cluster.status.name
clusters_list.append(f"- {cluster.name} (Region: {region}, Version: {version}, Nodes: {node_count}, Status: {status})")
else:
# List clusters in all regions
from google.cloud import compute_v1
# Get all regions
regions_client = compute_v1.RegionsClient()
regions_request = compute_v1.ListRegionsRequest(project=project_id)
regions = regions_client.list(request=regions_request)
for region_item in regions:
region_name = region_item.name
parent = f"projects/{project_id}/locations/{region_name}"
try:
response = client.list_clusters(parent=parent)
for cluster in response.clusters:
version = cluster.current_master_version
node_count = sum(pool.initial_node_count for pool in cluster.node_pools)
status = "Running" if cluster.status == container_v1.Cluster.Status.RUNNING else cluster.status.name
clusters_list.append(f"- {cluster.name} (Region: {region_name}, Version: {version}, Nodes: {node_count}, Status: {status})")
except Exception:
# Skip regions where we can't list clusters
continue
# Also check zonal clusters
zones_client = compute_v1.ZonesClient()
zones_request = compute_v1.ListZonesRequest(project=project_id)
zones = zones_client.list(request=zones_request)
for zone_item in zones:
zone_name = zone_item.name
parent = f"projects/{project_id}/locations/{zone_name}"
try:
response = client.list_clusters(parent=parent)
for cluster in response.clusters:
version = cluster.current_master_version
node_count = sum(pool.initial_node_count for pool in cluster.node_pools)
status = "Running" if cluster.status == container_v1.Cluster.Status.RUNNING else cluster.status.name
clusters_list.append(f"- {cluster.name} (Zone: {zone_name}, Version: {version}, Nodes: {node_count}, Status: {status})")
except Exception:
# Skip zones where we can't list clusters
continue
if not clusters_list:
region_msg = f" in region {region}" if region else ""
return f"No GKE clusters found{region_msg} for project {project_id}."
clusters_str = "\n".join(clusters_list)
region_msg = f" in region {region}" if region else ""
return f"""
Google Kubernetes Engine (GKE) Clusters{region_msg} in GCP Project {project_id}:
{clusters_str}
"""
except Exception as e:
return f"Error listing GKE clusters: {str(e)}"
@mcp.tool()
def get_cluster_details(project_id: str, cluster_name: str, location: str) -> str:
"""
Get detailed information about a specific GKE cluster.
Args:
project_id: The ID of the GCP project
cluster_name: The name of the GKE cluster
location: The location (region or zone) of the cluster
Returns:
Detailed information about the specified GKE cluster
"""
try:
from google.cloud import container_v1
# Initialize the GKE client
client = container_v1.ClusterManagerClient()
# Get cluster details
cluster_path = f"projects/{project_id}/locations/{location}/clusters/{cluster_name}"
cluster = client.get_cluster(name=cluster_path)
# Format the response
details = []
details.append(f"Name: {cluster.name}")
details.append(f"Description: {cluster.description or 'None'}")
details.append(f"Location: {location}")
details.append(f"Location Type: {'Zonal' if location.count('-') == 2 else 'Regional'}")
details.append(f"Status: {'Running' if cluster.status == container_v1.Cluster.Status.RUNNING else cluster.status.name}")
details.append(f"Kubernetes Version: {cluster.current_master_version}")
details.append(f"Network: {cluster.network}")
details.append(f"Subnetwork: {cluster.subnetwork}")
details.append(f"Cluster CIDR: {cluster.cluster_ipv4_cidr}")
details.append(f"Services CIDR: {cluster.services_ipv4_cidr}")
details.append(f"Endpoint: {cluster.endpoint}")
# Add Node Pools information
node_pools = []
for pool in cluster.node_pools:
machine_type = pool.config.machine_type
disk_size_gb = pool.config.disk_size_gb
autoscaling = "Enabled" if pool.autoscaling and pool.autoscaling.enabled else "Disabled"
min_nodes = pool.autoscaling.min_node_count if pool.autoscaling and pool.autoscaling.enabled else "N/A"
max_nodes = pool.autoscaling.max_node_count if pool.autoscaling and pool.autoscaling.enabled else "N/A"
initial_nodes = pool.initial_node_count
node_pools.append(f" - {pool.name} (Machine: {machine_type}, Disk: {disk_size_gb}GB, Initial Nodes: {initial_nodes})")
if autoscaling == "Enabled":
node_pools.append(f" Autoscaling: {autoscaling} (Min: {min_nodes}, Max: {max_nodes})")
if node_pools:
details.append(f"Node Pools ({len(cluster.node_pools)}):\n" + "\n".join(node_pools))
# Add Addons information
addons = []
if cluster.addons_config:
config = cluster.addons_config
addons.append(f" - HTTP Load Balancing: {'Enabled' if not config.http_load_balancing or not config.http_load_balancing.disabled else 'Disabled'}")
addons.append(f" - Horizontal Pod Autoscaling: {'Enabled' if not config.horizontal_pod_autoscaling or not config.horizontal_pod_autoscaling.disabled else 'Disabled'}")
addons.append(f" - Kubernetes Dashboard: {'Enabled' if not config.kubernetes_dashboard or not config.kubernetes_dashboard.disabled else 'Disabled'}")
addons.append(f" - Network Policy: {'Enabled' if config.network_policy_config and not config.network_policy_config.disabled else 'Disabled'}")
if addons:
details.append(f"Addons:\n" + "\n".join(addons))
details_str = "\n".join(details)
return f"""
GKE Cluster Details:
{details_str}
"""
except Exception as e:
return f"Error getting cluster details: {str(e)}"
@mcp.tool()
def list_node_pools(project_id: str, cluster_name: str, location: str) -> str:
"""
List node pools in a GKE cluster.
Args:
project_id: The ID of the GCP project
cluster_name: The name of the GKE cluster
location: The location (region or zone) of the cluster
Returns:
List of node pools in the specified GKE cluster
"""
try:
from google.cloud import container_v1
# Initialize the GKE client
client = container_v1.ClusterManagerClient()
# List node pools
cluster_path = f"projects/{project_id}/locations/{location}/clusters/{cluster_name}"
node_pools = client.list_node_pools(parent=cluster_path)
# Format the response
pools_list = []
for pool in node_pools.node_pools:
machine_type = pool.config.machine_type
disk_size_gb = pool.config.disk_size_gb
autoscaling = "Enabled" if pool.autoscaling and pool.autoscaling.enabled else "Disabled"
min_nodes = pool.autoscaling.min_node_count if pool.autoscaling and pool.autoscaling.enabled else "N/A"
max_nodes = pool.autoscaling.max_node_count if pool.autoscaling and pool.autoscaling.enabled else "N/A"
initial_nodes = pool.initial_node_count
pool_info = [
f"- {pool.name}:",
f" Machine Type: {machine_type}",
f" Disk Size: {disk_size_gb}GB",
f" Initial Node Count: {initial_nodes}",
f" Autoscaling: {autoscaling}"
]
if autoscaling == "Enabled":
pool_info.append(f" Min Nodes: {min_nodes}")
pool_info.append(f" Max Nodes: {max_nodes}")
if pool.config.labels:
labels = [f"{k}: {v}" for k, v in pool.config.labels.items()]
pool_info.append(f" Labels: {', '.join(labels)}")
pools_list.append("\n".join(pool_info))
if not pools_list:
return f"No node pools found in GKE cluster {cluster_name} in location {location}."
pools_str = "\n".join(pools_list)
return f"""
Node Pools in GKE Cluster {cluster_name} (Location: {location}):
{pools_str}
"""
except Exception as e:
return f"Error listing node pools: {str(e)}"
@mcp.tool()
def resize_node_pool(project_id: str, cluster_name: str, location: str, node_pool_name: str, node_count: int) -> str:
"""
Resize a node pool in a GKE cluster.
Args:
project_id: The ID of the GCP project
cluster_name: The name of the GKE cluster
location: The location (region or zone) of the cluster
node_pool_name: The name of the node pool to resize
node_count: The new node count for the pool
Returns:
Result of the node pool resize operation
"""
try:
from google.cloud import container_v1
# Initialize the GKE client
client = container_v1.ClusterManagerClient()
# Create the node pool path
node_pool_path = f"projects/{project_id}/locations/{location}/clusters/{cluster_name}/nodePools/{node_pool_name}"
# Get the current node pool
node_pool = client.get_node_pool(name=node_pool_path)
current_node_count = node_pool.initial_node_count
# Check if autoscaling is enabled
if node_pool.autoscaling and node_pool.autoscaling.enabled:
return f"""
Cannot resize node pool {node_pool_name} because autoscaling is enabled.
To manually set the node count, you must first disable autoscaling for this node pool.
Current autoscaling settings:
- Min nodes: {node_pool.autoscaling.min_node_count}
- Max nodes: {node_pool.autoscaling.max_node_count}
"""
# Resize the node pool
request = container_v1.SetNodePoolSizeRequest(
name=node_pool_path,
node_count=node_count
)
operation = client.set_node_pool_size(request=request)
return f"""
Node pool resize operation initiated:
- Cluster: {cluster_name}
- Location: {location}
- Node Pool: {node_pool_name}
- Current Node Count: {current_node_count}
- New Node Count: {node_count}
Operation ID: {operation.name}
Status: {operation.status.name if hasattr(operation.status, 'name') else operation.status}
"""
except Exception as e:
return f"Error resizing node pool: {str(e)}"
```
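`get_cluster_details` classifies the cluster location as regional or zonal. GKE location strings encode this: regions like `us-central1` carry one hyphen, while zones append a letter suffix (`us-central1-a`) and carry two. A minimal helper capturing that heuristic (`location_type` is an illustrative name, not part of the module):

```python
def location_type(location: str) -> str:
    """Classify a GKE location string.

    Zones are region names plus a "-<letter>" suffix, e.g. "us-central1-a"
    vs the region "us-central1", so zonal locations contain two hyphens
    and regional locations contain one.
    """
    return "Zonal" if location.count("-") == 2 else "Regional"
```

A simple substring check like `'-' in location` would misclassify every region, since region names themselves contain a hyphen; counting hyphens avoids that.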
--------------------------------------------------------------------------------
/src/gcp_mcp/gcp_modules/monitoring/tools.py:
--------------------------------------------------------------------------------
```python
"""
Google Cloud Platform Monitoring tools.
"""
from typing import List, Dict, Any, Optional
def register_tools(mcp):
"""Register all monitoring tools with the MCP server."""
@mcp.tool()
def list_monitoring_metrics(project_id: str, filter_str: str = "") -> str:
"""
List available monitoring metrics for a GCP project.
Args:
project_id: The ID of the GCP project to list metrics for
filter_str: Optional filter string to narrow down the metrics
Returns:
List of available monitoring metrics in the specified GCP project
"""
try:
try:
from google.cloud import monitoring_v3
except ImportError:
return "Error: The Google Cloud monitoring library is not installed. Please install it with 'pip install google-cloud-monitoring'."
# Initialize the Monitoring client
client = monitoring_v3.MetricServiceClient()
# Format the project name
project_name = f"projects/{project_id}"
# Create the request object with the filter
request = monitoring_v3.ListMetricDescriptorsRequest(
name=project_name
)
# Add filter if provided
if filter_str:
request.filter = filter_str
# List metric descriptors with optional filter
descriptors = client.list_metric_descriptors(request=request)
# Format the response
metrics_list = []
for descriptor in descriptors:
metric_type = descriptor.type
display_name = descriptor.display_name or metric_type.split('/')[-1]
description = descriptor.description or "No description"
metrics_list.append(f"- {display_name}: {metric_type}\n {description}")
if not metrics_list:
filter_msg = f" with filter '{filter_str}'" if filter_str else ""
return f"No metrics found{filter_msg} for project {project_id}."
# Limit to 50 metrics to avoid overwhelming response
if len(metrics_list) > 50:
metrics_str = "\n".join(metrics_list[:50])
return f"Found {len(metrics_list)} metrics for project {project_id}. Showing first 50:\n\n{metrics_str}\n\nUse a filter to narrow down results."
else:
metrics_str = "\n".join(metrics_list)
return f"Found {len(metrics_list)} metrics for project {project_id}:\n\n{metrics_str}"
except Exception as e:
return f"Error listing monitoring metrics: {str(e)}"
@mcp.tool()
def get_monitoring_alerts(project_id: str) -> str:
"""
Get active monitoring alerts for a GCP project.
Args:
project_id: The ID of the GCP project to get alerts for
Returns:
Active alerts for the specified GCP project
"""
try:
            from google.cloud import monitoring_v3
# Initialize the Alert Policy Service client
alert_client = monitoring_v3.AlertPolicyServiceClient()
# Format the project name
project_name = f"projects/{project_id}"
# Create the request object
request = monitoring_v3.ListAlertPoliciesRequest(
name=project_name
)
# List all alert policies
alert_policies = alert_client.list_alert_policies(request=request)
# Initialize the Metric Service client for metric data
metric_client = monitoring_v3.MetricServiceClient()
# Format the response
active_alerts = []
for policy in alert_policies:
# Check if the policy is enabled
if not policy.enabled:
continue
# Check for active incidents
filter_str = f'resource.type="alerting_policy" AND metric.type="monitoring.googleapis.com/alert_policy/incident_count" AND metric.label.policy_name="{policy.name.split("/")[-1]}"'
# Create a time interval for the last hour
import datetime
from google.protobuf import timestamp_pb2
                now = datetime.datetime.now(datetime.timezone.utc)
                end_time = timestamp_pb2.Timestamp(seconds=int(now.timestamp()))
                start_time = now - datetime.timedelta(hours=1)
                start_time_proto = timestamp_pb2.Timestamp(seconds=int(start_time.timestamp()))
# Create the time interval
interval = monitoring_v3.TimeInterval(
start_time=start_time_proto,
end_time=end_time
)
# List the time series
try:
# Create the request object
request = monitoring_v3.ListTimeSeriesRequest(
name=project_name,
filter=filter_str,
interval=interval,
view=monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL
)
# List the time series
time_series = metric_client.list_time_series(request=request)
is_active = False
for series in time_series:
# Check if there's a non-zero value in the time series
for point in series.points:
if point.value.int64_value > 0:
is_active = True
break
if is_active:
break
if is_active:
# Format conditions
conditions = []
for condition in policy.conditions:
if condition.display_name:
conditions.append(f" - {condition.display_name}: {condition.condition_threshold.filter}")
# Add to active alerts
alert_info = [
f"- {policy.display_name} (ID: {policy.name.split('/')[-1]})",
f" Description: {policy.documentation.content if policy.documentation else 'No description'}",
                            f" Notification rate limit: {policy.alert_strategy.notification_rate_limit.period.seconds}s between notifications" if policy.alert_strategy.notification_rate_limit else " No rate limiting"
]
if conditions:
alert_info.append(" Conditions:")
alert_info.extend(conditions)
active_alerts.append("\n".join(alert_info))
except Exception:
# Skip if we can't check for active incidents
continue
if not active_alerts:
return f"No active alerts found for project {project_id}."
alerts_str = "\n".join(active_alerts)
return f"""
Active Monitoring Alerts in GCP Project {project_id}:
{alerts_str}
"""
except Exception as e:
return f"Error getting monitoring alerts: {str(e)}"
@mcp.tool()
def create_alert_policy(project_id: str, display_name: str, metric_type: str,
filter_str: str, duration_seconds: int = 60,
threshold_value: float = 0.0, comparison: str = "COMPARISON_GT",
notification_channels: Optional[List[str]] = None) -> str:
"""
Create a new alert policy in a GCP project.
Args:
project_id: The ID of the GCP project
display_name: The display name for the alert policy
metric_type: The metric type to monitor (e.g., "compute.googleapis.com/instance/cpu/utilization")
filter_str: The filter for the metric data
duration_seconds: The duration in seconds over which to evaluate the condition (default: 60)
threshold_value: The threshold value for the condition (default: 0.0)
comparison: The comparison type (COMPARISON_GT, COMPARISON_LT, etc.) (default: COMPARISON_GT)
notification_channels: Optional list of notification channel IDs
Returns:
Result of the alert policy creation
"""
try:
from google.cloud import monitoring_v3
from google.protobuf import duration_pb2
# Initialize the Alert Policy Service client
client = monitoring_v3.AlertPolicyServiceClient()
# Format the project name
project_name = f"projects/{project_id}"
# Create a duration object
duration = duration_pb2.Duration(seconds=duration_seconds)
# Create the alert condition
condition = monitoring_v3.AlertPolicy.Condition(
display_name=f"Condition for {display_name}",
condition_threshold=monitoring_v3.AlertPolicy.Condition.MetricThreshold(
filter=filter_str,
comparison=getattr(monitoring_v3.ComparisonType, comparison),
threshold_value=threshold_value,
duration=duration,
trigger=monitoring_v3.AlertPolicy.Condition.Trigger(
count=1
),
aggregations=[
monitoring_v3.Aggregation(
alignment_period=duration_pb2.Duration(seconds=60),
per_series_aligner=monitoring_v3.Aggregation.Aligner.ALIGN_MEAN,
cross_series_reducer=monitoring_v3.Aggregation.Reducer.REDUCE_MEAN
)
]
)
)
# Create the alert policy
alert_policy = monitoring_v3.AlertPolicy(
display_name=display_name,
conditions=[condition],
combiner=monitoring_v3.AlertPolicy.ConditionCombinerType.OR
)
# Add notification channels if provided
if notification_channels:
alert_policy.notification_channels = [
f"projects/{project_id}/notificationChannels/{channel_id}"
for channel_id in notification_channels
]
# Create the policy
policy = client.create_alert_policy(name=project_name, alert_policy=alert_policy)
# Format response
conditions_str = "\n".join([
f"- {c.display_name}: {c.condition_threshold.filter}"
for c in policy.conditions
])
notifications_str = "None"
if policy.notification_channels:
notifications_str = "\n".join([
f"- {channel.split('/')[-1]}"
for channel in policy.notification_channels
])
return f"""
Alert Policy created successfully:
- Name: {policy.display_name}
- Policy ID: {policy.name.split('/')[-1]}
- Combiner: {policy.combiner.name}
Conditions:
{conditions_str}
Notification Channels:
{notifications_str}
"""
except Exception as e:
return f"Error creating alert policy: {str(e)}"
@mcp.tool()
def list_uptime_checks(project_id: str) -> str:
"""
List Uptime checks in a GCP project.
Args:
project_id: The ID of the GCP project to list Uptime checks for
Returns:
List of Uptime checks in the specified GCP project
"""
try:
from google.cloud import monitoring_v3
# Initialize the Uptime Check Service client
client = monitoring_v3.UptimeCheckServiceClient()
# Format the project name
project_name = f"projects/{project_id}"
# Create the request object
request = monitoring_v3.ListUptimeCheckConfigsRequest(
parent=project_name
)
# List uptime checks
uptime_checks = client.list_uptime_check_configs(request=request)
# Format the response
checks_list = []
for check in uptime_checks:
check_id = check.name.split('/')[-1]
display_name = check.display_name
period_seconds = check.period.seconds
timeout_seconds = check.timeout.seconds
# Get check type and details
check_details = []
                # proto-plus messages report oneof/field presence via "name in message"
                if "http_check" in check:
                    check_type = "HTTP"
                    url = check.http_check.path
                    if "monitored_resource" in check:
                        host = check.monitored_resource.labels.get('host', 'Unknown')
                        url = f"{host}{url}"
                    elif check.http_check.host:
                        url = f"{check.http_check.host}{url}"
                    check_details.append(f"URL: {url}")
                    check_details.append(f"Port: {check.http_check.port}")
                elif "tcp_check" in check:
                    check_type = "TCP"
                    if "monitored_resource" in check:
                        host = check.monitored_resource.labels.get('host', 'Unknown')
                    else:
                        host = check.tcp_check.host
check_details.append(f"Host: {host}")
check_details.append(f"Port: {check.tcp_check.port}")
else:
check_type = "Unknown"
checks_list.append(f"- {display_name} (ID: {check_id}, Type: {check_type})")
checks_list.append(f" Frequency: {period_seconds}s, Timeout: {timeout_seconds}s")
checks_list.extend([f" {detail}" for detail in check_details])
if not checks_list:
return f"No Uptime checks found for project {project_id}."
checks_str = "\n".join(checks_list)
return f"""
Uptime Checks in GCP Project {project_id}:
{checks_str}
"""
except Exception as e:
return f"Error listing Uptime checks: {str(e)}"
```
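The `filter_str` argument of `list_monitoring_metrics` takes a Cloud Monitoring filter expression. A minimal sketch of how a caller might assemble one — `build_metric_filter` is an illustrative helper, not part of the module above:

```python
def build_metric_filter(prefix: str = "", resource_type: str = "") -> str:
    """Assemble a metric-descriptor filter from optional clauses."""
    clauses = []
    if prefix:
        # Match every metric type under a service prefix
        clauses.append(f'metric.type = starts_with("{prefix}")')
    if resource_type:
        clauses.append(f'resource.type = "{resource_type}"')
    return " AND ".join(clauses)

print(build_metric_filter(prefix="compute.googleapis.com/"))
```

Passing the result as `filter_str` narrows the descriptor listing to a single service's metrics, which keeps the response under the tool's 50-entry cap.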
--------------------------------------------------------------------------------
/src/gcp_mcp/gcp_modules/networking/tools.py:
--------------------------------------------------------------------------------
```python
"""
Google Cloud Platform Networking tools.
"""
from typing import List, Dict, Any, Optional
def register_tools(mcp):
"""Register all networking tools with the MCP server."""
@mcp.tool()
def list_vpc_networks(project_id: str) -> str:
"""
List Virtual Private Cloud (VPC) networks in a GCP project.
Args:
project_id: The ID of the GCP project to list VPC networks for
Returns:
List of VPC networks in the specified GCP project
"""
try:
from google.cloud import compute_v1
# Initialize the Compute Engine client for networks
client = compute_v1.NetworksClient()
# List networks
request = compute_v1.ListNetworksRequest(project=project_id)
networks = client.list(request=request)
# Format the response
networks_list = []
for network in networks:
subnet_mode = "Auto" if network.auto_create_subnetworks else "Custom"
creation_time = network.creation_timestamp if network.creation_timestamp else "Unknown"
# Get subnet information if available
subnets = []
if not network.auto_create_subnetworks and network.subnetworks:
for subnet_url in network.subnetworks:
subnet_name = subnet_url.split('/')[-1]
subnet_region = subnet_url.split('/')[-3]
subnets.append(f" - {subnet_name} (Region: {subnet_region})")
network_info = f"- {network.name} (Mode: {subnet_mode}, Created: {creation_time})"
if subnets:
network_info += "\n Subnets:\n" + "\n".join(subnets)
networks_list.append(network_info)
if not networks_list:
return f"No VPC networks found in project {project_id}."
networks_str = "\n".join(networks_list)
return f"""
VPC Networks in GCP Project {project_id}:
{networks_str}
"""
except Exception as e:
return f"Error listing VPC networks: {str(e)}"
@mcp.tool()
def get_vpc_details(project_id: str, network_name: str) -> str:
"""
Get detailed information about a specific VPC network.
Args:
project_id: The ID of the GCP project
network_name: The name of the VPC network
Returns:
Detailed information about the specified VPC network
"""
try:
from google.cloud import compute_v1
# Initialize the Compute Engine client for networks
network_client = compute_v1.NetworksClient()
subnet_client = compute_v1.SubnetworksClient()
# Get network details
network = network_client.get(project=project_id, network=network_name)
# Format the response
details = []
details.append(f"Name: {network.name}")
details.append(f"ID: {network.id}")
details.append(f"Description: {network.description or 'None'}")
details.append(f"Self Link: {network.self_link}")
details.append(f"Creation Time: {network.creation_timestamp}")
details.append(f"Subnet Mode: {'Auto' if network.auto_create_subnetworks else 'Custom'}")
details.append(f"Routing Mode: {network.routing_config.routing_mode if network.routing_config else 'Unknown'}")
details.append(f"MTU: {network.mtu}")
# If it's a custom subnet mode network, get all subnets
if not network.auto_create_subnetworks:
                # List all subnets in this network; subnetworks.list is regional,
                # so aggregate across all regions
                agg_request = compute_v1.AggregatedListSubnetworksRequest(project=project_id)
                subnets = []
                for _, scoped in subnet_client.aggregated_list(request=agg_request):
                    for item in scoped.subnetworks:
                        # Keep only subnets that belong to this network
                        if item.network.split('/')[-1] != network.name:
                            continue
                        cidr = item.ip_cidr_range
                        region = item.region.split('/')[-1]
                        purpose = f", Purpose: {item.purpose}" if item.purpose else ""
                        private_ip = ", Private Google Access: Enabled" if item.private_ip_google_access else ""
                        subnets.append(f" - {item.name} (Region: {region}, CIDR: {cidr}{purpose}{private_ip})")
if subnets:
details.append(f"Subnets ({len(subnets)}):\n" + "\n".join(subnets))
# List peering connections if any
if network.peerings:
peerings = []
for peering in network.peerings:
state = peering.state
                    peer_network = peering.network.split('/')[-1]
                    peerings.append(f" - {peer_network} (State: {state})")
if peerings:
details.append(f"Peerings ({len(peerings)}):\n" + "\n".join(peerings))
details_str = "\n".join(details)
return f"""
VPC Network Details:
{details_str}
"""
except Exception as e:
return f"Error getting VPC network details: {str(e)}"
@mcp.tool()
def list_subnets(project_id: str, region: Optional[str] = None) -> str:
"""
List subnets in a GCP project, optionally filtered by region.
Args:
project_id: The ID of the GCP project
region: Optional region to filter subnets by
Returns:
List of subnets in the specified GCP project
"""
try:
from google.cloud import compute_v1
# Initialize the Compute Engine client for subnets
client = compute_v1.SubnetworksClient()
            # subnetworks.list is regional, so aggregate across regions when none is given
            if region:
                request = compute_v1.ListSubnetworksRequest(project=project_id, region=region)
                subnets = client.list(request=request)
            else:
                agg_request = compute_v1.AggregatedListSubnetworksRequest(project=project_id)
                subnets = [s for _, scoped in client.aggregated_list(request=agg_request)
                           for s in scoped.subnetworks]
# Format the response
subnets_list = []
for subnet in subnets:
network_name = subnet.network.split('/')[-1]
region_name = subnet.region.split('/')[-1]
cidr = subnet.ip_cidr_range
purpose = f", Purpose: {subnet.purpose}" if subnet.purpose else ""
private_ip = ", Private Google Access: Enabled" if subnet.private_ip_google_access else ""
subnet_info = f"- {subnet.name} (Network: {network_name}, Region: {region_name}, CIDR: {cidr}{purpose}{private_ip})"
subnets_list.append(subnet_info)
if not subnets_list:
return f"No subnets found in project {project_id}{' for region ' + region if region else ''}."
subnets_str = "\n".join(subnets_list)
return f"""
Subnets in GCP Project {project_id}{' for region ' + region if region else ''}:
{subnets_str}
"""
except Exception as e:
return f"Error listing subnets: {str(e)}"
@mcp.tool()
def create_firewall_rule(project_id: str, name: str, network: str, direction: str, priority: int,
source_ranges: Optional[List[str]] = None, destination_ranges: Optional[List[str]] = None,
allowed_protocols: Optional[List[Dict[str, Any]]] = None, denied_protocols: Optional[List[Dict[str, Any]]] = None,
target_tags: Optional[List[str]] = None, source_tags: Optional[List[str]] = None,
description: Optional[str] = None) -> str:
"""
Create a firewall rule in a GCP project.
Args:
project_id: The ID of the GCP project
name: The name of the firewall rule
network: The name of the network to create the firewall rule for
direction: The direction of traffic to match ('INGRESS' or 'EGRESS')
priority: The priority of the rule (lower number = higher priority, 0-65535)
source_ranges: Optional list of source IP ranges (for INGRESS)
destination_ranges: Optional list of destination IP ranges (for EGRESS)
allowed_protocols: Optional list of allowed protocols, e.g. [{"IPProtocol": "tcp", "ports": ["80", "443"]}]
denied_protocols: Optional list of denied protocols, e.g. [{"IPProtocol": "tcp", "ports": ["22"]}]
target_tags: Optional list of target instance tags
source_tags: Optional list of source instance tags (for INGRESS)
description: Optional description for the firewall rule
Returns:
Result of the firewall rule creation
"""
try:
from google.cloud import compute_v1
# Initialize the Compute Engine client for firewall
client = compute_v1.FirewallsClient()
# Create the firewall resource
firewall = compute_v1.Firewall()
firewall.name = name
firewall.network = f"projects/{project_id}/global/networks/{network}"
firewall.direction = direction
firewall.priority = priority
if description:
firewall.description = description
# Set source/destination ranges based on direction
if direction == "INGRESS" and source_ranges:
firewall.source_ranges = source_ranges
elif direction == "EGRESS" and destination_ranges:
firewall.destination_ranges = destination_ranges
# Set allowed protocols
if allowed_protocols:
firewall.allowed = []
for protocol in allowed_protocols:
allowed = compute_v1.Allowed()
allowed.I_p_protocol = protocol["IPProtocol"]
if "ports" in protocol:
allowed.ports = protocol["ports"]
firewall.allowed.append(allowed)
# Set denied protocols
if denied_protocols:
firewall.denied = []
for protocol in denied_protocols:
denied = compute_v1.Denied()
denied.I_p_protocol = protocol["IPProtocol"]
if "ports" in protocol:
denied.ports = protocol["ports"]
firewall.denied.append(denied)
# Set target tags
if target_tags:
firewall.target_tags = target_tags
# Set source tags
if source_tags and direction == "INGRESS":
firewall.source_tags = source_tags
# Create the firewall rule
operation = client.insert(project=project_id, firewall_resource=firewall)
return f"""
Firewall rule creation initiated:
- Name: {name}
- Network: {network}
- Direction: {direction}
- Priority: {priority}
- Description: {description or 'None'}
- Source Ranges: {source_ranges or 'None'}
- Destination Ranges: {destination_ranges or 'None'}
- Allowed Protocols: {allowed_protocols or 'None'}
- Denied Protocols: {denied_protocols or 'None'}
- Target Tags: {target_tags or 'None'}
- Source Tags: {source_tags or 'None'}
Operation: {operation.name}
Status: {operation.status.name if hasattr(operation.status, 'name') else operation.status}
"""
except Exception as e:
return f"Error creating firewall rule: {str(e)}"
@mcp.tool()
def list_firewall_rules(project_id: str, network: Optional[str] = None) -> str:
"""
List firewall rules in a GCP project, optionally filtered by network.
Args:
project_id: The ID of the GCP project
network: Optional network name to filter firewall rules by
Returns:
List of firewall rules in the specified GCP project
"""
try:
from google.cloud import compute_v1
# Initialize the Compute Engine client for firewall
client = compute_v1.FirewallsClient()
# List firewall rules
request = compute_v1.ListFirewallsRequest(project=project_id)
firewalls = client.list(request=request)
# Format the response
firewalls_list = []
for firewall in firewalls:
                # If a network filter is applied, skip rules whose network name doesn't match
                if network and firewall.network.split('/')[-1] != network:
                    continue
# Get network name from the full URL
network_name = firewall.network.split('/')[-1]
# Get allowed/denied protocols
allowed = []
for allow in firewall.allowed:
ports = f":{','.join(allow.ports)}" if allow.ports else ""
allowed.append(f"{allow.I_p_protocol}{ports}")
denied = []
for deny in firewall.denied:
ports = f":{','.join(deny.ports)}" if deny.ports else ""
denied.append(f"{deny.I_p_protocol}{ports}")
# Format sources/destinations based on direction
if firewall.direction == "INGRESS":
sources = firewall.source_ranges or firewall.source_tags or ["Any"]
destinations = ["Any"]
else: # EGRESS
sources = ["Any"]
destinations = firewall.destination_ranges or ["Any"]
# Create the firewall rule info string
rule_info = [
f"- {firewall.name}",
f" Network: {network_name}",
f" Direction: {firewall.direction}",
f" Priority: {firewall.priority}",
f" Action: {'Allow' if firewall.allowed else 'Deny'}"
]
if allowed:
rule_info.append(f" Allowed: {', '.join(allowed)}")
if denied:
rule_info.append(f" Denied: {', '.join(denied)}")
rule_info.append(f" Sources: {', '.join(sources)}")
rule_info.append(f" Destinations: {', '.join(destinations)}")
if firewall.target_tags:
rule_info.append(f" Target Tags: {', '.join(firewall.target_tags)}")
firewalls_list.append("\n".join(rule_info))
if not firewalls_list:
return f"No firewall rules found in project {project_id}{' for network ' + network if network else ''}."
firewalls_str = "\n".join(firewalls_list)
return f"""
Firewall Rules in GCP Project {project_id}{' for network ' + network if network else ''}:
{firewalls_str}
"""
except Exception as e:
return f"Error listing firewall rules: {str(e)}"
@mcp.tool()
def list_gcp_services(project_id: str) -> str:
"""
List enabled services/APIs in a GCP project.
Args:
project_id: The ID of the GCP project to list services for
Returns:
List of enabled services in the specified GCP project
"""
try:
try:
from google.cloud import service_usage
except ImportError:
return "Error: The Google Cloud service usage library is not installed. Please install it with 'pip install google-cloud-service-usage'."
# Initialize the Service Usage client
client = service_usage.ServiceUsageClient()
# Create the request
request = service_usage.ListServicesRequest(
parent=f"projects/{project_id}",
filter="state:ENABLED"
)
# List enabled services
services = client.list_services(request=request)
# Format the response
services_list = []
for service in services:
name = service.name.split('/')[-1] if service.name else "Unknown"
title = service.config.title if service.config else "Unknown"
services_list.append(f"- {name}: {title}")
if not services_list:
return f"No services are enabled in project {project_id}."
services_str = "\n".join(services_list)
return f"""
Enabled Services in GCP Project {project_id}:
{services_str}
"""
except Exception as e:
return f"Error listing GCP services: {str(e)}"
```
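The `allowed_protocols`/`denied_protocols` arguments of `create_firewall_rule` expect a list of dicts mirroring the Compute API's `Allowed`/`Denied` shape. A sketch of how a caller might build and sanity-check that payload — `make_protocol_entry` and the `VALID_PROTOCOLS` set are illustrative, not part of the module above:

```python
from typing import Any, Dict, List, Optional

# Illustrative subset of IP protocols the firewall API accepts by name
VALID_PROTOCOLS = {"tcp", "udp", "icmp", "esp", "ah", "sctp", "ipip", "all"}

def make_protocol_entry(protocol: str, ports: Optional[List[str]] = None) -> Dict[str, Any]:
    """Build one entry for allowed_protocols / denied_protocols."""
    if protocol not in VALID_PROTOCOLS:
        raise ValueError(f"Unsupported IP protocol: {protocol}")
    entry: Dict[str, Any] = {"IPProtocol": protocol}
    if ports:
        # Ports only make sense for connection-oriented protocols
        if protocol not in {"tcp", "udp", "sctp"}:
            raise ValueError(f"Ports cannot be set for protocol {protocol}")
        entry["ports"] = ports
    return entry

allowed = [make_protocol_entry("tcp", ["80", "443"])]
# allowed == [{"IPProtocol": "tcp", "ports": ["80", "443"]}]
```

Each entry's `IPProtocol` key is read verbatim by `create_firewall_rule` when it populates `compute_v1.Allowed`/`compute_v1.Denied`.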
--------------------------------------------------------------------------------
/src/gcp_mcp/gcp_modules/auth/tools.py:
--------------------------------------------------------------------------------
```python
"""
Google Cloud Platform Authentication tools.
"""
import os
import json
import tempfile
from typing import Optional
def register_tools(mcp):
"""Register all authentication tools with the MCP server."""
# Global variable to store the current project ID
_current_project_id = None
@mcp.tool()
def auth_login(project_id: str = "") -> str:
"""
Authenticate with Google Cloud Platform using browser-based OAuth flow.
Args:
project_id: Optional project ID to set as default after login
Returns:
Status message indicating whether authentication was successful
"""
nonlocal _current_project_id
try:
from google.auth.transport.requests import Request
from google.auth.exceptions import DefaultCredentialsError
from google_auth_oauthlib.flow import InstalledAppFlow
import google.auth
# First, attempt to use existing credentials to see if we're already authenticated
try:
credentials, project = google.auth.default()
# Test if credentials are valid
if hasattr(credentials, 'refresh'):
credentials.refresh(Request())
# If we get here, credentials are valid
if project_id:
# Update global project ID
_current_project_id = project_id
# Create a credential configuration file for the project
_set_project_id_in_config(project_id)
return f"Using existing credentials. Project set to {project_id}."
else:
return f"Using existing credentials. Current project: {project or 'Not set'}"
            except Exception:
                # Existing credentials are missing or invalid; fall back to the login flow
                pass
# Set up the OAuth flow
print("Opening browser for authentication...")
# Create a temporary client_secrets.json file for OAuth flow
client_secrets = {
"installed": {
"client_id": "764086051850-6qr4p6gpi6hn506pt8ejuq83di341hur.apps.googleusercontent.com",
"project_id": "gcp-mcp",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_secret": "d-FL95Q19q7MQmFpd7hHD0Ty",
"redirect_uris": ["http://localhost", "urn:ietf:wg:oauth:2.0:oob"]
}
}
with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as temp:
temp_client_secrets_path = temp.name
json.dump(client_secrets, temp)
try:
# Create the OAuth flow
flow = InstalledAppFlow.from_client_secrets_file(
temp_client_secrets_path,
scopes=['https://www.googleapis.com/auth/cloud-platform']
)
# Run the flow
creds = flow.run_local_server(port=0)
# Save the credentials as application default credentials
adc_path = _get_adc_path()
os.makedirs(os.path.dirname(adc_path), exist_ok=True)
# Write credentials to ADC file
with open(adc_path, 'w') as f:
creds_data = {
"client_id": creds.client_id,
"client_secret": creds.client_secret,
"refresh_token": creds.refresh_token,
"type": "authorized_user"
}
json.dump(creds_data, f)
# Set project if specified
if project_id:
_current_project_id = project_id
_set_project_id_in_config(project_id)
success_msg = "Authentication successful!"
if project_id:
success_msg += f" Default project set to {project_id}."
# Test by listing accessible projects
try:
from google.cloud import resourcemanager_v3
# Get fresh credentials after login
credentials, _ = google.auth.default()
client = resourcemanager_v3.ProjectsClient(credentials=credentials)
                # projects.list requires a parent; search returns all accessible projects
                request = resourcemanager_v3.SearchProjectsRequest()
                projects = list(client.search_projects(request=request))
project_count = len(projects)
if project_count > 0:
project_list = "\n".join([f"- {project.display_name} (ID: {project.project_id})" for project in projects[:5]])
if project_count > 5:
project_list += f"\n... and {project_count - 5} more"
success_msg += f"\n\nFound {project_count} accessible projects:\n{project_list}"
except Exception as e:
# Don't fail if we can't list projects
pass
return success_msg
finally:
# Clean up the temporary file
            try:
                os.unlink(temp_client_secrets_path)
            except OSError:
                pass
except Exception as e:
return f"Authentication error: {str(e)}"
@mcp.tool()
def auth_list() -> str:
"""
List active Google Cloud credentials.
Returns:
List of active credentials and the current default account
"""
try:
import google.auth
# Check application default credentials
try:
credentials, project = google.auth.default()
# Try to get email from credentials
email = None
if hasattr(credentials, 'service_account_email'):
email = credentials.service_account_email
elif hasattr(credentials, 'refresh_token') and credentials.refresh_token:
# This is a user credential
adc_path = _get_adc_path()
if os.path.exists(adc_path):
try:
with open(adc_path, 'r') as f:
data = json.load(f)
if 'refresh_token' in data:
# This is a user auth, but we can't get the email directly
email = "User account (ADC)"
                    except (OSError, ValueError):
                        pass
credential_type = type(credentials).__name__
output = "Active Credentials:\n"
if email:
output += f"- {email} (Application Default Credentials, type: {credential_type})\n"
else:
output += f"- Application Default Credentials (type: {credential_type})\n"
if project:
output += f"\nCurrent Project: {project}\n"
else:
output += "\nNo project set in default credentials.\n"
# Check for other credentials in well-known locations
credentials_dir = os.path.expanduser("~/.config/gcloud/credentials")
if os.path.isdir(credentials_dir):
cred_files = [f for f in os.listdir(credentials_dir) if f.endswith('.json')]
if cred_files:
output += "\nOther available credentials:\n"
for cred_file in cred_files:
try:
with open(os.path.join(credentials_dir, cred_file), 'r') as f:
data = json.load(f)
if 'client_id' in data:
output += f"- User account ({cred_file})\n"
elif 'private_key_id' in data:
output += f"- Service account: {data.get('client_email', 'Unknown')} ({cred_file})\n"
                        except (OSError, ValueError):
output += f"- Unknown credential type ({cred_file})\n"
return output
except Exception as e:
return f"No active credentials found. Please run auth_login() to authenticate.\nError: {str(e)}"
except Exception as e:
return f"Error listing credentials: {str(e)}"
@mcp.tool()
def auth_revoke() -> str:
"""
Revoke Google Cloud credentials.
Returns:
Status message indicating whether the credentials were revoked
"""
try:
import google.auth
from google.auth.transport.requests import Request
# Check if we have application default credentials
try:
credentials, _ = google.auth.default()
# If credentials have a revoke method, use it
if hasattr(credentials, 'revoke'):
credentials.revoke(Request())
# Remove the application default credentials file
adc_path = _get_adc_path()
if os.path.exists(adc_path):
os.remove(adc_path)
return "Application default credentials have been revoked and removed."
else:
return "No application default credentials file found to remove."
except Exception as e:
return f"No active credentials found or failed to revoke: {str(e)}"
except Exception as e:
return f"Error revoking credentials: {str(e)}"
@mcp.tool()
def config_set_project(project_id: str) -> str:
"""
Set the default Google Cloud project.
Args:
project_id: The ID of the project to set as default
Returns:
Status message indicating whether the project was set
"""
nonlocal _current_project_id
try:
# Update global project ID
_current_project_id = project_id
# Create or update the config file
_set_project_id_in_config(project_id)
# Verify the project exists
try:
from google.cloud import resourcemanager_v3
import google.auth
credentials, _ = google.auth.default()
client = resourcemanager_v3.ProjectsClient(credentials=credentials)
name = f"projects/{project_id}"
try:
project = client.get_project(name=name)
return f"Default project set to {project_id} ({project.display_name})."
except Exception:
# Project might not exist or user might not have access
return f"Default project set to {project_id}. Note: Could not verify if this project exists or if you have access to it."
except Exception as e:
# Don't fail if we can't verify the project
return f"Default project set to {project_id}."
except Exception as e:
return f"Error setting project: {str(e)}"
@mcp.tool()
def config_list() -> str:
"""
List the current Google Cloud configuration.
Returns:
Current configuration settings
"""
try:
# Get project ID from config
project_id = _get_project_id_from_config()
# Get project ID from global variable if set
if _current_project_id:
project_id = _current_project_id
output = "Current Configuration:\n"
if project_id:
output += f"- Project ID: {project_id}\n"
else:
output += "- Project ID: Not set\n"
# Check if we have active credentials
try:
import google.auth
credentials, default_project = google.auth.default()
if hasattr(credentials, 'service_account_email'):
output += f"- Authenticated as: {credentials.service_account_email} (Service Account)\n"
else:
output += "- Authenticated as: User Account\n"
if default_project and default_project != project_id:
output += f"- Default Project in Credentials: {default_project}\n"
except Exception:
output += "- Authentication: Not authenticated or credentials not found\n"
# Get additional configuration
config_file = os.path.join(_get_config_path(), 'configurations', 'config_default')
if os.path.exists(config_file):
try:
with open(config_file, 'r') as f:
config_lines = f.readlines()
if config_lines:
output += "\nAdditional Configuration Settings:\n"
for line in config_lines:
line = line.strip()
if line and not line.startswith('#') and '=' in line:
key, value = line.split('=', 1)
key = key.strip()
value = value.strip()
# Skip project since we already displayed it
if key != 'project':
output += f"- {key}: {value}\n"
                except (OSError, ValueError):
                    pass
return output
except Exception as e:
return f"Error listing configuration: {str(e)}"
# Helper functions
def _get_adc_path() -> str:
"""Get the path to the application default credentials file."""
# Standard ADC paths by platform
if os.name == 'nt': # Windows
return os.path.join(os.environ.get('APPDATA', ''), 'gcloud', 'application_default_credentials.json')
else: # Linux/Mac
return os.path.expanduser('~/.config/gcloud/application_default_credentials.json')
def _get_config_path() -> str:
"""Get the path to the configuration directory."""
if os.name == 'nt': # Windows
return os.path.join(os.environ.get('APPDATA', ''), 'gcloud')
else: # Linux/Mac
return os.path.expanduser('~/.config/gcloud')
def _set_project_id_in_config(project_id: str) -> None:
"""Set the project ID in the configuration file."""
config_dir = _get_config_path()
os.makedirs(config_dir, exist_ok=True)
config_file = os.path.join(config_dir, 'configurations', 'config_default')
os.makedirs(os.path.dirname(config_file), exist_ok=True)
# Read existing config if it exists
config_data = {}
if os.path.exists(config_file):
try:
with open(config_file, 'r') as f:
for line in f:
                    if '=' in line and not line.lstrip().startswith('#'):
key, value = line.strip().split('=', 1)
config_data[key.strip()] = value.strip()
        except OSError:
            pass
# Update project
config_data['project'] = project_id
# Write back config
with open(config_file, 'w') as f:
for key, value in config_data.items():
f.write(f"{key} = {value}\n")
def _get_project_id_from_config() -> Optional[str]:
"""Get the project ID from the configuration file."""
config_file = os.path.join(_get_config_path(), 'configurations', 'config_default')
if os.path.exists(config_file):
        try:
            with open(config_file, 'r') as f:
                for line in f:
                    key, sep, value = line.partition('=')
                    if sep and key.strip() == 'project':
                        return value.strip()
        except OSError:
            pass
return None
```
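The `_set_project_id_in_config` / `_get_project_id_from_config` pair above persists a flat `key = value` file under the gcloud config directory. A minimal standalone sketch of that round-trip, using a temporary directory instead of the real gcloud config path (`set_project_id` and `get_project_id` here are simplified stand-ins for the module's private helpers):

```python
import os
import tempfile

def set_project_id(config_file: str, project_id: str) -> None:
    # Read existing "key = value" lines, update the project entry, write back
    config = {}
    if os.path.exists(config_file):
        with open(config_file) as f:
            for line in f:
                if '=' in line and not line.lstrip().startswith('#'):
                    key, value = line.split('=', 1)
                    config[key.strip()] = value.strip()
    config['project'] = project_id
    with open(config_file, 'w') as f:
        for key, value in config.items():
            f.write(f"{key} = {value}\n")

def get_project_id(config_file: str):
    # Return the stored project ID, or None if the file or key is missing
    if os.path.exists(config_file):
        with open(config_file) as f:
            for line in f:
                key, sep, value = line.partition('=')
                if sep and key.strip() == 'project':
                    return value.strip()
    return None

with tempfile.TemporaryDirectory() as d:
    path = os.path.join(d, 'config_default')
    set_project_id(path, 'my-demo-project')
    print(get_project_id(path))  # -> my-demo-project
```

Note that the real `gcloud` CLI stores its configuration in INI sections (e.g. `[core]`); the helpers in this module use their own simplified flat format, so they round-trip with each other but are not a drop-in parser for files written by `gcloud` itself.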
--------------------------------------------------------------------------------
/src/gcp_mcp/gcp_modules/compute/tools.py:
--------------------------------------------------------------------------------
```python
"""
Google Cloud Platform Compute Engine tools.
"""
from typing import List, Dict, Any, Optional
def register_tools(mcp):
"""Register all compute tools with the MCP server."""
@mcp.tool()
def list_compute_instances(project_id: str, zone: str = "") -> str:
"""
List Compute Engine instances in a GCP project.
Args:
project_id: The ID of the GCP project to list instances for
zone: Optional zone to filter instances (e.g., "us-central1-a")
Returns:
List of Compute Engine instances in the specified GCP project
"""
try:
from google.cloud import compute_v1
# Initialize the Compute Engine client
client = compute_v1.InstancesClient()
instances_list = []
if zone:
# List instances in the specified zone
request = compute_v1.ListInstancesRequest(
project=project_id,
zone=zone
)
instances = client.list(request=request)
for instance in instances:
machine_type = instance.machine_type.split('/')[-1] if instance.machine_type else "Unknown"
status = instance.status
ext_ip = "None"
int_ip = "None"
# Get IP addresses
if instance.network_interfaces:
int_ip = instance.network_interfaces[0].network_i_p
if instance.network_interfaces[0].access_configs:
ext_ip = instance.network_interfaces[0].access_configs[0].nat_i_p or "None"
instances_list.append(f"- {instance.name} (Zone: {zone}, Type: {machine_type}, Internal IP: {int_ip}, External IP: {ext_ip}, Status: {status})")
else:
# List instances in all zones
zones_client = compute_v1.ZonesClient()
zones_request = compute_v1.ListZonesRequest(project=project_id)
zones = zones_client.list(request=zones_request)
for zone_item in zones:
zone_name = zone_item.name
request = compute_v1.ListInstancesRequest(
project=project_id,
zone=zone_name
)
try:
instances = client.list(request=request)
for instance in instances:
machine_type = instance.machine_type.split('/')[-1] if instance.machine_type else "Unknown"
status = instance.status
ext_ip = "None"
int_ip = "None"
# Get IP addresses
if instance.network_interfaces:
int_ip = instance.network_interfaces[0].network_i_p
if instance.network_interfaces[0].access_configs:
ext_ip = instance.network_interfaces[0].access_configs[0].nat_i_p or "None"
instances_list.append(f"- {instance.name} (Zone: {zone_name}, Type: {machine_type}, Internal IP: {int_ip}, External IP: {ext_ip}, Status: {status})")
except Exception:
# Skip zones where we can't list instances
continue
if not instances_list:
zone_msg = f" in zone {zone}" if zone else ""
return f"No Compute Engine instances found{zone_msg} for project {project_id}."
instances_str = "\n".join(instances_list)
zone_msg = f" in zone {zone}" if zone else ""
return f"""
Compute Engine Instances{zone_msg} in GCP Project {project_id}:
{instances_str}
"""
except Exception as e:
return f"Error listing Compute Engine instances: {str(e)}"
@mcp.tool()
def get_instance_details(project_id: str, zone: str, instance_name: str) -> str:
"""
Get detailed information about a specific Compute Engine instance.
Args:
project_id: The ID of the GCP project
zone: The zone where the instance is located (e.g., "us-central1-a")
instance_name: The name of the instance to get details for
Returns:
Detailed information about the specified Compute Engine instance
"""
try:
from google.cloud import compute_v1
# Initialize the Compute Engine client
client = compute_v1.InstancesClient()
# Get the instance details
instance = client.get(project=project_id, zone=zone, instance=instance_name)
# Format machine type
machine_type = instance.machine_type.split('/')[-1] if instance.machine_type else "Unknown"
# Format creation timestamp
creation_timestamp = instance.creation_timestamp if instance.creation_timestamp else "Unknown"
# Format boot disk
boot_disk = "None"
if instance.disks:
for disk in instance.disks:
if disk.boot:
boot_disk = disk.source.split('/')[-1] if disk.source else "Unknown"
break
# Get IP addresses
network_interfaces = []
if instance.network_interfaces:
for i, iface in enumerate(instance.network_interfaces):
network = iface.network.split('/')[-1] if iface.network else "Unknown"
subnetwork = iface.subnetwork.split('/')[-1] if iface.subnetwork else "Unknown"
internal_ip = iface.network_i_p or "None"
# Check for external IP
external_ip = "None"
if iface.access_configs:
external_ip = iface.access_configs[0].nat_i_p or "None"
network_interfaces.append(f" Interface {i}:\n Network: {network}\n Subnetwork: {subnetwork}\n Internal IP: {internal_ip}\n External IP: {external_ip}")
networks_str = "\n".join(network_interfaces) if network_interfaces else " None"
# Get attached disks
disks = []
if instance.disks:
for i, disk in enumerate(instance.disks):
disk_name = disk.source.split('/')[-1] if disk.source else "Unknown"
disk_type = "Boot" if disk.boot else "Data"
auto_delete = "Yes" if disk.auto_delete else "No"
mode = disk.mode if disk.mode else "Unknown"
disks.append(f" Disk {i}:\n Name: {disk_name}\n Type: {disk_type}\n Mode: {mode}\n Auto-delete: {auto_delete}")
disks_str = "\n".join(disks) if disks else " None"
# Get labels
labels = []
if instance.labels:
for key, value in instance.labels.items():
labels.append(f" {key}: {value}")
labels_str = "\n".join(labels) if labels else " None"
# Get metadata
metadata_items = []
if instance.metadata and instance.metadata.items:
for item in instance.metadata.items:
metadata_items.append(f" {item.key}: {item.value}")
metadata_str = "\n".join(metadata_items) if metadata_items else " None"
return f"""
Compute Engine Instance Details for {instance_name}:
Project: {project_id}
Zone: {zone}
Machine Type: {machine_type}
Status: {instance.status}
Creation Time: {creation_timestamp}
CPU Platform: {instance.cpu_platform}
Boot Disk: {boot_disk}
Network Interfaces:
{networks_str}
Disks:
{disks_str}
Labels:
{labels_str}
Metadata:
{metadata_str}
Service Accounts: {"Yes" if instance.service_accounts else "No"}
"""
except Exception as e:
return f"Error getting instance details: {str(e)}"
@mcp.tool()
def start_instance(project_id: str, zone: str, instance_name: str) -> str:
"""
Start a Compute Engine instance.
Args:
project_id: The ID of the GCP project
zone: The zone where the instance is located (e.g., "us-central1-a")
instance_name: The name of the instance to start
Returns:
Status message indicating whether the instance was started successfully
"""
try:
from google.cloud import compute_v1
# Initialize the Compute Engine client
client = compute_v1.InstancesClient()
# Start the instance
operation = client.start(project=project_id, zone=zone, instance=instance_name)
            # Wait for the operation to complete; result() blocks until the
            # operation finishes and raises on failure
            operation.result(timeout=300)
return f"Instance {instance_name} in zone {zone} started successfully."
except Exception as e:
return f"Error starting instance: {str(e)}"
@mcp.tool()
def stop_instance(project_id: str, zone: str, instance_name: str) -> str:
"""
Stop a Compute Engine instance.
Args:
project_id: The ID of the GCP project
zone: The zone where the instance is located (e.g., "us-central1-a")
instance_name: The name of the instance to stop
Returns:
Status message indicating whether the instance was stopped successfully
"""
try:
from google.cloud import compute_v1
# Initialize the Compute Engine client
client = compute_v1.InstancesClient()
# Stop the instance
operation = client.stop(project=project_id, zone=zone, instance=instance_name)
            # Wait for the operation to complete; result() blocks until the
            # operation finishes and raises on failure
            operation.result(timeout=300)
return f"Instance {instance_name} in zone {zone} stopped successfully."
except Exception as e:
return f"Error stopping instance: {str(e)}"
@mcp.tool()
def list_machine_types(project_id: str, zone: str) -> str:
"""
List available machine types in a specific zone.
Args:
project_id: The ID of the GCP project
zone: The zone to check machine types in (e.g., "us-central1-a")
Returns:
List of available machine types in the specified zone
"""
try:
from google.cloud import compute_v1
# Initialize the Machine Types client
client = compute_v1.MachineTypesClient()
# List machine types
request = compute_v1.ListMachineTypesRequest(
project=project_id,
zone=zone
)
machine_types = client.list(request=request)
# Format the response
types_list = []
# Group by series
series = {}
for mt in machine_types:
# Determine series (e.g., e2, n1, c2)
name = mt.name
series_name = "custom" if name.startswith("custom") else name.split("-")[0]
if series_name not in series:
series[series_name] = []
# Format the machine type details
vcpus = mt.guest_cpus
memory_gb = mt.memory_mb / 1024 # Convert MB to GB
series[series_name].append(f" {name}: {vcpus} vCPUs, {memory_gb:.1f} GB RAM")
# Create formatted output by series
for s_name in sorted(series.keys()):
types_list.append(f" {s_name} series:")
types_list.extend(sorted(series[s_name]))
if not types_list:
return f"No machine types found in zone {zone} for project {project_id}."
types_str = "\n".join(types_list)
return f"""
Available Machine Types in Zone {zone} for Project {project_id}:
{types_str}
"""
except Exception as e:
return f"Error listing machine types: {str(e)}"
@mcp.tool()
def list_disks(project_id: str, zone: str = "") -> str:
"""
List Compute Engine persistent disks in a GCP project.
Args:
project_id: The ID of the GCP project to list disks for
zone: Optional zone to filter disks (e.g., "us-central1-a")
Returns:
List of persistent disks in the specified GCP project
"""
try:
from google.cloud import compute_v1
# Initialize the Disks client
client = compute_v1.DisksClient()
disks_list = []
if zone:
# List disks in the specified zone
request = compute_v1.ListDisksRequest(
project=project_id,
zone=zone
)
disks = client.list(request=request)
for disk in disks:
size_gb = disk.size_gb
                    disk_type = disk.type_.split('/')[-1] if disk.type_ else "Unknown"
status = disk.status
users = len(disk.users) if disk.users else 0
users_str = f"Attached to {users} instance(s)" if users > 0 else "Not attached"
disks_list.append(f"- {disk.name} (Zone: {zone}, Type: {disk_type}, Size: {size_gb} GB, Status: {status}, {users_str})")
else:
# List disks in all zones
zones_client = compute_v1.ZonesClient()
zones_request = compute_v1.ListZonesRequest(project=project_id)
zones = zones_client.list(request=zones_request)
for zone_item in zones:
zone_name = zone_item.name
request = compute_v1.ListDisksRequest(
project=project_id,
zone=zone_name
)
try:
disks = client.list(request=request)
for disk in disks:
size_gb = disk.size_gb
                            disk_type = disk.type_.split('/')[-1] if disk.type_ else "Unknown"
status = disk.status
users = len(disk.users) if disk.users else 0
users_str = f"Attached to {users} instance(s)" if users > 0 else "Not attached"
disks_list.append(f"- {disk.name} (Zone: {zone_name}, Type: {disk_type}, Size: {size_gb} GB, Status: {status}, {users_str})")
except Exception:
# Skip zones where we can't list disks
continue
if not disks_list:
zone_msg = f" in zone {zone}" if zone else ""
return f"No persistent disks found{zone_msg} for project {project_id}."
disks_str = "\n".join(disks_list)
zone_msg = f" in zone {zone}" if zone else ""
return f"""
Persistent Disks{zone_msg} in GCP Project {project_id}:
{disks_str}
"""
except Exception as e:
return f"Error listing persistent disks: {str(e)}"
@mcp.tool()
def create_instance(project_id: str, zone: str, instance_name: str, machine_type: str,
source_image: str, boot_disk_size_gb: int = 10,
network: str = "default", subnet: str = "",
external_ip: bool = True) -> str:
"""
Create a new Compute Engine instance.
Args:
project_id: The ID of the GCP project
zone: The zone to create the instance in (e.g., "us-central1-a")
instance_name: The name for the new instance
machine_type: The machine type (e.g., "e2-medium")
source_image: The source image for the boot disk (e.g., "projects/debian-cloud/global/images/family/debian-11")
boot_disk_size_gb: The size of the boot disk in GB (default: 10)
network: The network to connect to (default: "default")
subnet: The subnetwork to connect to (optional)
external_ip: Whether to allocate an external IP (default: True)
Returns:
Status message indicating whether the instance was created successfully
"""
try:
from google.cloud import compute_v1
# Initialize the clients
instances_client = compute_v1.InstancesClient()
# Format the machine type
machine_type_url = f"projects/{project_id}/zones/{zone}/machineTypes/{machine_type}"
# Create the disk configuration
boot_disk = compute_v1.AttachedDisk()
boot_disk.boot = True
initialize_params = compute_v1.AttachedDiskInitializeParams()
initialize_params.source_image = source_image
initialize_params.disk_size_gb = boot_disk_size_gb
boot_disk.initialize_params = initialize_params
boot_disk.auto_delete = True
# Create the network configuration
network_interface = compute_v1.NetworkInterface()
if network.startswith("projects/"):
network_interface.network = network
else:
network_interface.network = f"projects/{project_id}/global/networks/{network}"
if subnet:
if subnet.startswith("projects/"):
network_interface.subnetwork = subnet
                else:
                    # Derive the region from the zone name (e.g. "us-central1-a" -> "us-central1")
                    network_interface.subnetwork = f"projects/{project_id}/regions/{zone.rsplit('-', 1)[0]}/subnetworks/{subnet}"
if external_ip:
access_config = compute_v1.AccessConfig()
access_config.name = "External NAT"
access_config.type_ = "ONE_TO_ONE_NAT"
access_config.network_tier = "PREMIUM"
network_interface.access_configs = [access_config]
# Create the instance
instance = compute_v1.Instance()
instance.name = instance_name
instance.machine_type = machine_type_url
instance.disks = [boot_disk]
instance.network_interfaces = [network_interface]
# Create a default service account for the instance
service_account = compute_v1.ServiceAccount()
service_account.email = "default"
service_account.scopes = ["https://www.googleapis.com/auth/cloud-platform"]
instance.service_accounts = [service_account]
# Create the instance
operation = instances_client.insert(
project=project_id,
zone=zone,
instance_resource=instance
)
            # Wait for the create operation to complete; result() blocks until
            # the operation finishes and raises on failure
            operation.result(timeout=300)
# Get the created instance to return its details
created_instance = instances_client.get(project=project_id, zone=zone, instance=instance_name)
# Get the instance IP addresses
            # Get the created instance's IP addresses (local name ext_ip avoids
            # shadowing the external_ip parameter)
            internal_ip = "None"
            ext_ip = "None"
            if created_instance.network_interfaces:
                internal_ip = created_instance.network_interfaces[0].network_i_p or "None"
                if created_instance.network_interfaces[0].access_configs:
                    ext_ip = created_instance.network_interfaces[0].access_configs[0].nat_i_p or "None"
            return f"""
Instance {instance_name} created successfully in zone {zone}.
Details:
- Machine Type: {machine_type}
- Internal IP: {internal_ip}
- External IP: {ext_ip}
- Status: {created_instance.status}
"""
except Exception as e:
return f"Error creating instance: {str(e)}"
@mcp.tool()
def delete_instance(project_id: str, zone: str, instance_name: str) -> str:
"""
Delete a Compute Engine instance.
Args:
project_id: The ID of the GCP project
zone: The zone where the instance is located (e.g., "us-central1-a")
instance_name: The name of the instance to delete
Returns:
Status message indicating whether the instance was deleted successfully
"""
try:
from google.cloud import compute_v1
# Initialize the Compute Engine client
client = compute_v1.InstancesClient()
# Delete the instance
operation = client.delete(project=project_id, zone=zone, instance=instance_name)
            # Wait for the operation to complete; result() blocks until the
            # operation finishes and raises on failure
            operation.result(timeout=300)
return f"Instance {instance_name} in zone {zone} deleted successfully."
except Exception as e:
return f"Error deleting instance: {str(e)}"
@mcp.tool()
def create_snapshot(project_id: str, zone: str, disk_name: str, snapshot_name: str, description: str = "") -> str:
"""
Create a snapshot of a Compute Engine disk.
Args:
project_id: The ID of the GCP project
zone: The zone where the disk is located (e.g., "us-central1-a")
disk_name: The name of the disk to snapshot
snapshot_name: The name for the new snapshot
description: Optional description for the snapshot
Returns:
Status message indicating whether the snapshot was created successfully
"""
try:
from google.cloud import compute_v1
# Initialize the Disks client
disks_client = compute_v1.DisksClient()
# Create the snapshot request
snapshot = compute_v1.Snapshot()
snapshot.name = snapshot_name
if description:
snapshot.description = description
# Create the snapshot
operation = disks_client.create_snapshot(
project=project_id,
zone=zone,
disk=disk_name,
snapshot_resource=snapshot
)
            # Wait for the operation to complete; result() blocks until the
            # operation finishes and raises on failure
            operation.result(timeout=300)
return f"Snapshot {snapshot_name} of disk {disk_name} in zone {zone} created successfully."
except Exception as e:
return f"Error creating snapshot: {str(e)}"
@mcp.tool()
def list_snapshots(project_id: str) -> str:
"""
List disk snapshots in a GCP project.
Args:
project_id: The ID of the GCP project to list snapshots for
Returns:
List of disk snapshots in the specified GCP project
"""
try:
from google.cloud import compute_v1
# Initialize the Snapshots client
client = compute_v1.SnapshotsClient()
# List snapshots
request = compute_v1.ListSnapshotsRequest(project=project_id)
snapshots = client.list(request=request)
# Format the response
snapshots_list = []
for snapshot in snapshots:
size_gb = snapshot.disk_size_gb
status = snapshot.status
source_disk = snapshot.source_disk.split('/')[-1] if snapshot.source_disk else "Unknown"
creation_time = snapshot.creation_timestamp if snapshot.creation_timestamp else "Unknown"
snapshots_list.append(f"- {snapshot.name} (Source: {source_disk}, Size: {size_gb} GB, Status: {status}, Created: {creation_time})")
if not snapshots_list:
return f"No snapshots found for project {project_id}."
snapshots_str = "\n".join(snapshots_list)
return f"""
Disk Snapshots in GCP Project {project_id}:
{snapshots_str}
"""
except Exception as e:
return f"Error listing snapshots: {str(e)}"
```
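For reference, the series-grouping step inside `list_machine_types` can be exercised on its own. A minimal sketch of that logic, with illustrative machine type names (no GCP client required):

```python
def group_by_series(names):
    """Group machine type names by their series prefix (e.g. e2, n1, custom)."""
    series = {}
    for name in names:
        # Custom machine types don't follow the "<series>-<shape>" pattern
        series_name = "custom" if name.startswith("custom") else name.split("-")[0]
        series.setdefault(series_name, []).append(name)
    # Sort series alphabetically and names within each series, as the tool does
    return {s: sorted(series[s]) for s in sorted(series)}

grouped = group_by_series(["n1-standard-1", "e2-medium", "e2-small", "custom-2-4096"])
print(grouped)
# -> {'custom': ['custom-2-4096'], 'e2': ['e2-medium', 'e2-small'], 'n1': ['n1-standard-1']}
```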