#
tokens: 38913/50000 36/36 files
lines: on (toggle) GitHub
raw markdown copy reset
# Directory Structure

```
├── .gitignore
├── .python-version
├── pyproject.toml
├── pytest.ini
├── README.md
├── src
│   └── gcp_mcp
│       ├── __init__.py
│       ├── __main__.py
│       ├── gcp_modules
│       │   ├── __init__.py
│       │   ├── auth
│       │   │   ├── __init__.py
│       │   │   └── tools.py
│       │   ├── billing
│       │   │   ├── __init__.py
│       │   │   └── tools.py
│       │   ├── compute
│       │   │   ├── __init__.py
│       │   │   └── tools.py
│       │   ├── databases
│       │   │   ├── __init__.py
│       │   │   └── tools.py
│       │   ├── deployment
│       │   │   ├── __init__.py
│       │   │   └── tools.py
│       │   ├── iam
│       │   │   ├── __init__.py
│       │   │   └── tools.py
│       │   ├── kubernetes
│       │   │   ├── __init__.py
│       │   │   └── tools.py
│       │   ├── monitoring
│       │   │   ├── __init__.py
│       │   │   └── tools.py
│       │   ├── networking
│       │   │   ├── __init__.py
│       │   │   └── tools.py
│       │   ├── resource_management
│       │   │   ├── __init__.py
│       │   │   └── tools.py
│       │   └── storage
│       │       ├── __init__.py
│       │       └── tools.py
│       └── server.py
├── tests
│   ├── __init__.py
│   ├── conftest.py
│   ├── mock_gcp.py
│   └── unit
│       ├── __init__.py
│       └── test_gcp_functions.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------

```
1 | 3.11.8
2 | 
```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
 1 | # Python-generated files
 2 | __pycache__/
 3 | *.py[oc]
 4 | build/
 5 | dist/
 6 | wheels/
 7 | *.egg-info
 8 | 
 9 | # Virtual environments
10 | .venv
11 | 
```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
 1 | # GCP MCP Application
 2 | 
 3 | ## Claude Desktop Integration
 4 | 
 5 | To enable GCP management capabilities in Claude Desktop, simply add the following configuration to your Claude Desktop MCP configuration:
 6 | 
 7 | ```json
 8 | {
 9 |   "gcp-mcp": {
10 |     "command": "uvx",
11 |     "args": [
12 |       "gcp-mcp"
13 |     ]
14 |   }
15 | }
16 | ```
17 | 
18 | That's it! No additional setup or credential manipulation is required. When you first ask Claude to interact with your GCP resources, a browser window will automatically open for you to authenticate and grant access. Once you approve the access, Claude will be able to manage your GCP resources through natural language commands.
19 | 
20 | Here are some example requests you can make:
21 | 
22 | Basic Operations:
23 | - "Could you list my GCP projects?"
24 | - "Show me my compute instances"
25 | - "What storage buckets do I have?"
26 | 
27 | Resource Creation:
28 | - "Please create a compute instance with 2GB RAM and 10GB storage, name it MCP-engine"
29 | - "Create a new storage bucket called my-backup-bucket in us-central1"
30 | - "Set up a new VPC network named prod-network with subnet 10.0.0.0/24"
31 | 
32 | Resource Management:
33 | - "Stop all compute instances in the dev project"
34 | - "Show me all instances that have been running for more than 24 hours"
35 | - "What's the current CPU usage of my instance named backend-server?"
36 | - "Create a snapshot of my database disk"
37 | 
38 | Monitoring and Alerts:
39 | - "Set up an alert for when CPU usage goes above 80%"
40 | - "Show me all critical alerts from the last 24 hours"
41 | - "What's the current status of my GKE clusters?"
42 | 
43 | ## Features
44 | 
45 | The application provides comprehensive coverage of GCP services:
46 | 
47 | ### Resource Management
48 | - Projects and quotas management
49 | - Asset inventory
50 | - IAM permissions
51 | 
52 | ### Compute & Infrastructure
53 | - Compute Engine instances
54 | - Storage buckets and disks
55 | - VPC networks and firewall rules
56 | - Kubernetes Engine (GKE) clusters
57 | 
58 | ### Databases & Storage
59 | - Cloud SQL instances
60 | - Firestore databases
61 | - Cloud Storage
62 | - Database backups
63 | 
64 | ### Monitoring & Billing
65 | - Metrics and alerts
66 | - Billing information
67 | - Uptime monitoring
68 | - Resource usage tracking
69 | 
70 | ### Coming Soon
71 | - Deployment manager and infrastructure as code
72 | 
73 | ## Installation
74 | 
75 | ```bash
76 | pip install gcp-mcp
77 | ```
78 | 
79 | ## License
80 | 
81 | [MIT License](LICENSE) 
82 | 
83 | Your contributions and issues are welcome !
84 | 
```

--------------------------------------------------------------------------------
/src/gcp_mcp/gcp_modules/__init__.py:
--------------------------------------------------------------------------------

```python
1 | 
```

--------------------------------------------------------------------------------
/src/gcp_mcp/gcp_modules/billing/__init__.py:
--------------------------------------------------------------------------------

```python
1 | 
```

--------------------------------------------------------------------------------
/src/gcp_mcp/gcp_modules/compute/__init__.py:
--------------------------------------------------------------------------------

```python
1 | 
```

--------------------------------------------------------------------------------
/src/gcp_mcp/gcp_modules/databases/__init__.py:
--------------------------------------------------------------------------------

```python
1 | 
```

--------------------------------------------------------------------------------
/src/gcp_mcp/gcp_modules/deployment/__init__.py:
--------------------------------------------------------------------------------

```python
1 | 
```

--------------------------------------------------------------------------------
/src/gcp_mcp/gcp_modules/iam/__init__.py:
--------------------------------------------------------------------------------

```python
1 | 
```

--------------------------------------------------------------------------------
/src/gcp_mcp/gcp_modules/kubernetes/__init__.py:
--------------------------------------------------------------------------------

```python
1 | 
```

--------------------------------------------------------------------------------
/src/gcp_mcp/gcp_modules/monitoring/__init__.py:
--------------------------------------------------------------------------------

```python
1 | 
```

--------------------------------------------------------------------------------
/src/gcp_mcp/gcp_modules/networking/__init__.py:
--------------------------------------------------------------------------------

```python
1 | 
```

--------------------------------------------------------------------------------
/src/gcp_mcp/gcp_modules/resource_management/__init__.py:
--------------------------------------------------------------------------------

```python
1 | 
```

--------------------------------------------------------------------------------
/src/gcp_mcp/gcp_modules/storage/__init__.py:
--------------------------------------------------------------------------------

```python
1 | 
```

--------------------------------------------------------------------------------
/src/gcp_mcp/gcp_modules/auth/__init__.py:
--------------------------------------------------------------------------------

```python
1 | # Authentication module for Google Cloud Platform
```

--------------------------------------------------------------------------------
/tests/__init__.py:
--------------------------------------------------------------------------------

```python
1 | # This file is intentionally left empty to make the directory a proper Python package. 
```

--------------------------------------------------------------------------------
/tests/unit/__init__.py:
--------------------------------------------------------------------------------

```python
1 | # This file is intentionally left empty to make the directory a proper Python package. 
```

--------------------------------------------------------------------------------
/src/gcp_mcp/__init__.py:
--------------------------------------------------------------------------------

```python
1 | from . import server
2 | import asyncio
3 | import os
4 | import sys
5 | 
6 | def main():
7 |     server.mcp.run(transport='stdio')
8 | 
9 | __all__ = ['main', 'server']
```

--------------------------------------------------------------------------------
/src/gcp_mcp/__main__.py:
--------------------------------------------------------------------------------

```python
1 | from . import main
2 | 
3 | if __name__ == "__main__":
4 |     print("Starting gcp_mcp")
5 |     main()
6 | else:
7 |     print("Starting gcp_mcp from import")
8 |     main()
```

--------------------------------------------------------------------------------
/pytest.ini:
--------------------------------------------------------------------------------

```
1 | [pytest]
2 | testpaths = tests
3 | python_files = test_*.py
4 | python_classes = Test*
5 | python_functions = test_*
6 | markers =
7 |     asyncio: mark a test as an asyncio test
8 | addopts = -v 
```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
 1 | [build-system]
 2 | requires = ["hatchling"]
 3 | build-backend = "hatchling.build"
 4 | 
 5 | [project]
 6 | name = "gcp-mcp"
 7 | version = "0.1.0"
 8 | description = "MCP interface for Google Cloud Platform"
 9 | readme = "README.md"
10 | requires-python = ">=3.11.8"
11 | dependencies = [
12 |     "httpx>=0.28.0",
13 |     "mcp>=1.3.0",
14 |     "google-cloud-resource-manager>=1.14.0",
15 |     "google-cloud-compute>=1.26.0",
16 |     "google-cloud-storage>=3.1.0",
17 |     "google-cloud-service-usage>=1.13.0",
18 |     "google-cloud-billing>=1.16.0",
19 |     "google-api-python-client>=2.163.0",
20 |     "google-cloud-monitoring>=2.22.0",
21 |     "google-cloud-logging>=3.9.0",
22 |     "google-cloud-container>=2.35.0",
23 |     "google-cloud-firestore>=2.16.0",
24 |     "google-cloud-bigtable>=2.19.0",
25 |     "google-cloud-spanner>=3.39.0",
26 |     "google-cloud-iam>=2.17.0",
27 |     "google-cloud-vpc-access>=1.4.0",
28 |     "google-cloud-asset>=3.25.0",
29 |     "google-auth>=2.16.0",
30 |     "google-auth-oauthlib>=1.2.0",
31 |     "google-auth-httplib2>=0.1.0"
32 | ]
33 | 
34 | [project.entry-points.console]
35 | gcp = "gcp_mcp:main"
36 | gcp-mcp = "gcp_mcp:main"
37 | 
38 | [project.scripts]
39 | gcp-mcp = "gcp_mcp:main"
```

--------------------------------------------------------------------------------
/tests/conftest.py:
--------------------------------------------------------------------------------

```python
 1 | import pytest
 2 | import sys
 3 | from unittest.mock import MagicMock
 4 | 
 5 | # Mock the Google Cloud libraries
 6 | MOCK_MODULES = [
 7 |     'google.cloud.resourcemanager_v3',
 8 |     'google.cloud.service_usage',
 9 |     'google.cloud.compute_v1',
10 |     'google.cloud.storage',
11 |     'google.cloud.billing.v1',
12 |     'google.auth',
13 |     'google.iam.v1',
14 |     'google.auth.exceptions',
15 |     'googleapiclient',
16 |     'googleapiclient.discovery'
17 | ]
18 | 
19 | for mod_name in MOCK_MODULES:
20 |     sys.modules[mod_name] = MagicMock()
21 | 
22 | # Mock specific classes
23 | sys.modules['google.cloud.resourcemanager_v3'].ProjectsClient = MagicMock
24 | sys.modules['google.cloud.service_usage'].ServiceUsageClient = MagicMock
25 | sys.modules['google.cloud.compute_v1'].InstancesClient = MagicMock
26 | sys.modules['google.cloud.compute_v1'].ZonesClient = MagicMock
27 | sys.modules['google.cloud.storage'].Client = MagicMock
28 | sys.modules['google.cloud.billing.v1'].CloudBillingClient = MagicMock
29 | sys.modules['google.cloud.billing.v1'].CloudCatalogClient = MagicMock
30 | sys.modules['google.auth'].default = MagicMock
31 | sys.modules['google.iam.v1'].iam_policy_pb2 = MagicMock
32 | sys.modules['googleapiclient.discovery'].build = MagicMock 
```

--------------------------------------------------------------------------------
/src/gcp_mcp/gcp_modules/deployment/tools.py:
--------------------------------------------------------------------------------

```python
 1 | """
 2 | Google Cloud Platform Deployment tools.
 3 | """
 4 | from typing import List, Dict, Any, Optional
 5 | 
 6 | def register_tools(mcp):
 7 |     """Register all deployment tools with the MCP server."""
 8 |     
 9 |     @mcp.tool()
10 |     def list_deployment_manager_deployments(project_id: str) -> str:
11 |         """
12 |         List Deployment Manager deployments in a GCP project.
13 |         
14 |         Args:
15 |             project_id: The ID of the GCP project to list deployments for
16 |         
17 |         Returns:
18 |             List of Deployment Manager deployments in the specified GCP project
19 |         """
20 |         # TODO: Implement this function
21 |         return f"Not yet implemented: listing deployments for project {project_id}"
22 |     
23 |     @mcp.tool()
24 |     def get_deployment_details(project_id: str, deployment_name: str) -> str:
25 |         """
26 |         Get details of a specific Deployment Manager deployment.
27 |         
28 |         Args:
29 |             project_id: The ID of the GCP project
30 |             deployment_name: The name of the deployment to get details for
31 |         
32 |         Returns:
33 |             Details of the specified deployment
34 |         """
35 |         # TODO: Implement this function
36 |         return f"Not yet implemented: getting details for deployment {deployment_name} in project {project_id}"
37 |     
38 |     @mcp.tool()
39 |     def list_cloud_build_triggers(project_id: str) -> str:
40 |         """
41 |         List Cloud Build triggers in a GCP project.
42 |         
43 |         Args:
44 |             project_id: The ID of the GCP project to list build triggers for
45 |         
46 |         Returns:
47 |             List of Cloud Build triggers in the specified GCP project
48 |         """
49 |         # TODO: Implement this function
50 |         return f"Not yet implemented: listing Cloud Build triggers for project {project_id}"
```

--------------------------------------------------------------------------------
/src/gcp_mcp/server.py:
--------------------------------------------------------------------------------

```python
 1 | from typing import Any
 2 | import httpx
 3 | from mcp.server.fastmcp import FastMCP
 4 | 
 5 | # Import all modules
 6 | from .gcp_modules.resource_management import tools as resource_tools
 7 | from .gcp_modules.iam import tools as iam_tools
 8 | from .gcp_modules.compute import tools as compute_tools
 9 | from .gcp_modules.storage import tools as storage_tools
10 | from .gcp_modules.billing import tools as billing_tools
11 | from .gcp_modules.networking import tools as networking_tools
12 | from .gcp_modules.kubernetes import tools as kubernetes_tools
13 | from .gcp_modules.monitoring import tools as monitoring_tools
14 | from .gcp_modules.databases import tools as databases_tools
15 | from .gcp_modules.deployment import tools as deployment_tools
16 | from .gcp_modules.auth import tools as auth_tools
17 | 
18 | # Initialize FastMCP server
19 | mcp = FastMCP("gcp")
20 | 
21 | # A simple test function
22 | @mcp.tool()
23 | async def say_hello(name: str) -> str:
24 |     """Say hello to a person."""
25 |     return f"Hello, {name}!"
26 | 
27 | # Register all module tools
28 | def register_tools():
29 |     # Register authentication tools (placed first for visibility)
30 |     auth_tools.register_tools(mcp)
31 |     
32 |     # Register resource management tools
33 |     resource_tools.register_tools(mcp)
34 |     
35 |     # Register IAM tools
36 |     iam_tools.register_tools(mcp)
37 |     
38 |     # Register compute tools
39 |     compute_tools.register_tools(mcp)
40 |     
41 |     # Register storage tools
42 |     storage_tools.register_tools(mcp)
43 |     
44 |     # Register billing tools
45 |     billing_tools.register_tools(mcp)
46 |     
47 |     # Register networking tools
48 |     networking_tools.register_tools(mcp)
49 |     
50 |     # Register kubernetes tools
51 |     kubernetes_tools.register_tools(mcp)
52 |     
53 |     # Register monitoring tools
54 |     monitoring_tools.register_tools(mcp)
55 |     
56 |     # Register databases tools
57 |     databases_tools.register_tools(mcp)
58 |     
59 |     # Register deployment tools
60 |     deployment_tools.register_tools(mcp)
61 | 
62 | # Register all tools
63 | register_tools()
64 | 
65 | # if __name__ == "__main__":
66 | #     mcp.run(transport='stdio')
```

--------------------------------------------------------------------------------
/src/gcp_mcp/gcp_modules/billing/tools.py:
--------------------------------------------------------------------------------

```python
 1 | """
 2 | Google Cloud Platform Billing tools.
 3 | """
 4 | from typing import List, Dict, Any, Optional
 5 | 
 6 | def register_tools(mcp):
 7 |     """Register all billing tools with the MCP server."""
 8 |     
 9 |     @mcp.tool()
10 |     def get_billing_info(project_id: str) -> str:
11 |         """
12 |         Get billing information for a GCP project.
13 |         
14 |         Args:
15 |             project_id: The ID of the GCP project to get billing information for
16 |         
17 |         Returns:
18 |             Billing information for the specified GCP project
19 |         """
20 |         try:
21 |             try:
22 |                 from google.cloud import billing_v1
23 |             except ImportError:
24 |                 return "Error: The Google Cloud billing library is not installed. Please install it with 'pip install google-cloud-billing'."
25 |             
26 |             # Initialize the Cloud Billing client
27 |             billing_client = billing_v1.CloudBillingClient()
28 |             
29 |             # Get the billing account for the project
30 |             project_name = f"projects/{project_id}"
31 |             billing_info = billing_client.get_project_billing_info(name=project_name)
32 |             
33 |             # If billing is enabled, get more details about the billing account
34 |             if billing_info.billing_account_name:
35 |                 billing_account = billing_client.get_billing_account(
36 |                     name=billing_info.billing_account_name
37 |                 )
38 |                 
39 |                 # Initialize the Cloud Catalog client to get pricing information
40 |                 catalog_client = billing_v1.CloudCatalogClient()
41 |                 
42 |                 # Format the response
43 |                 return f"""
44 | Billing Information for GCP Project {project_id}:
45 | 
46 | Billing Enabled: {billing_info.billing_enabled}
47 | Billing Account: {billing_info.billing_account_name}
48 | Display Name: {billing_account.display_name}
49 | Open: {billing_account.open}
50 | """
51 |             else:
52 |                 return f"Billing is not enabled for project {project_id}."
53 |         except Exception as e:
54 |             return f"Error getting billing information: {str(e)}"
```

--------------------------------------------------------------------------------
/tests/mock_gcp.py:
--------------------------------------------------------------------------------

```python
 1 | """
 2 | Mock functions for testing GCP functionality.
 3 | """
 4 | 
 5 | def list_gcp_projects():
 6 |     """Mock function to list GCP projects."""
 7 |     return ["test-project-1", "test-project-2", "test-project-3"]
 8 | 
 9 | def get_gcp_project_details(project_id):
10 |     """Mock function to get details of a GCP project."""
11 |     return f"""
12 | Project ID: {project_id}
13 | Name: Test Project
14 | Created: 2023-01-01T00:00:00Z
15 | Status: ACTIVE
16 | Labels:
17 |   - env: test
18 |   - department: engineering
19 | """
20 | 
21 | def list_gcp_services(project_id):
22 |     """Mock function to list enabled services in a GCP project."""
23 |     return f"""
24 | Enabled services in project {project_id}:
25 |   - compute.googleapis.com: Compute Engine API
26 |   - storage.googleapis.com: Cloud Storage API
27 |   - iam.googleapis.com: Identity and Access Management (IAM) API
28 | """
29 | 
30 | def list_compute_instances(project_id, zone=None):
31 |     """Mock function to list Compute Engine instances."""
32 |     zone_str = f" in zone {zone}" if zone else ""
33 |     return f"""
34 | Compute Engine instances in project {project_id}{zone_str}:
35 |   - instance-1 (n1-standard-1): RUNNING
36 |     Zone: us-central1-a
37 |     Created: 2023-01-01 00:00:00 UTC
38 |     
39 |   - instance-2 (n1-standard-2): STOPPED
40 |     Zone: us-central1-a
41 |     Created: 2023-02-01 00:00:00 UTC
42 | """
43 | 
44 | def check_iam_permissions(project_id):
45 |     """Mock function to check IAM permissions in a GCP project."""
46 |     return f"""
47 | IAM permissions in project {project_id}:
48 |   - roles/viewer: [email protected]
49 |   - roles/editor: [email protected]
50 | """
51 | 
52 | def list_storage_buckets(project_id):
53 |     """Mock function to list Cloud Storage buckets in a GCP project."""
54 |     return f"""
55 | Cloud Storage buckets in project {project_id}:
56 |   - test-bucket-1
57 |     Location: us-central1
58 |     Storage class: STANDARD
59 |     Created: 2023-01-01 00:00:00 UTC
60 |     
61 |   - test-bucket-2
62 |     Location: us-east1
63 |     Storage class: NEARLINE
64 |     Created: 2023-02-01 00:00:00 UTC
65 | """
66 | 
67 | def get_billing_info(project_id):
68 |     """Mock function to get billing information for a GCP project."""
69 |     return f"""
70 | Billing information for project {project_id}:
71 |   - Billing account: 123456-ABCDEF-123456
72 |   - Billing account name: My Billing Account
73 |   - Billing account status: Open
74 |   - Billing enabled: Yes
75 |   - Currency: USD
76 | """ 
```

--------------------------------------------------------------------------------
/tests/unit/test_gcp_functions.py:
--------------------------------------------------------------------------------

```python
  1 | import pytest
  2 | from unittest.mock import patch, MagicMock
  3 | import sys
  4 | import os
  5 | 
  6 | # Add the parent directory to the Python path so we can import the mock modules
  7 | sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../..')))
  8 | 
  9 | # Import the functions to test from the mock modules
 10 | from tests.mock_gcp import (
 11 |     list_gcp_projects,
 12 |     get_gcp_project_details,
 13 |     list_gcp_services,
 14 |     list_compute_instances,
 15 |     check_iam_permissions,
 16 |     list_storage_buckets,
 17 |     get_billing_info
 18 | )
 19 | 
 20 | 
 21 | class TestGCPFunctions:
 22 |     """Test class for GCP-related functions."""
 23 | 
 24 |     def test_list_gcp_projects(self):
 25 |         """Test the list_gcp_projects function."""
 26 |         # Call the function
 27 |         result = list_gcp_projects()
 28 |         
 29 |         # Assertions
 30 |         assert isinstance(result, list)
 31 |         assert "test-project-1" in result
 32 |         assert "test-project-2" in result
 33 |         assert "test-project-3" in result
 34 |         assert len(result) == 3
 35 | 
 36 |     def test_get_gcp_project_details(self):
 37 |         """Test the get_gcp_project_details function."""
 38 |         # Call the function
 39 |         result = get_gcp_project_details("test-project-id")
 40 |         
 41 |         # Assertions
 42 |         assert isinstance(result, str)
 43 |         assert "Test Project" in result
 44 |         assert "2023-01-01T00:00:00Z" in result
 45 |         assert "ACTIVE" in result
 46 |         assert "env: test" in result
 47 |         assert "department: engineering" in result
 48 | 
 49 |     def test_list_gcp_services(self):
 50 |         """Test the list_gcp_services function."""
 51 |         # Call the function
 52 |         result = list_gcp_services("test-project")
 53 |         
 54 |         # Assertions
 55 |         assert isinstance(result, str)
 56 |         assert "compute.googleapis.com: Compute Engine API" in result
 57 |         assert "storage.googleapis.com: Cloud Storage API" in result
 58 |         assert "iam.googleapis.com: Identity and Access Management (IAM) API" in result
 59 | 
 60 |     def test_list_compute_instances_with_zone(self):
 61 |         """Test the list_compute_instances function with a specified zone."""
 62 |         # Call the function with a specified zone
 63 |         result = list_compute_instances("test-project", "us-central1-a")
 64 |         
 65 |         # Assertions
 66 |         assert isinstance(result, str)
 67 |         assert "instance-1" in result
 68 |         assert "instance-2" in result
 69 |         assert "n1-standard-1" in result
 70 |         assert "n1-standard-2" in result
 71 |         assert "RUNNING" in result
 72 |         assert "STOPPED" in result
 73 |         assert "us-central1-a" in result
 74 | 
 75 |     def test_check_iam_permissions(self):
 76 |         """Test the check_iam_permissions function."""
 77 |         # Call the function
 78 |         result = check_iam_permissions("test-project")
 79 |         
 80 |         # Assertions
 81 |         assert isinstance(result, str)
 82 |         assert "roles/viewer" in result
 83 |         assert "roles/editor" in result
 84 |         assert "[email protected]" in result
 85 | 
 86 |     def test_list_storage_buckets(self):
 87 |         """Test the list_storage_buckets function."""
 88 |         # Call the function
 89 |         result = list_storage_buckets("test-project")
 90 |         
 91 |         # Assertions
 92 |         assert isinstance(result, str)
 93 |         assert "test-bucket-1" in result
 94 |         assert "test-bucket-2" in result
 95 |         assert "us-central1" in result
 96 |         assert "us-east1" in result
 97 |         assert "STANDARD" in result
 98 |         assert "NEARLINE" in result
 99 |         assert "2023-01-01 00:00:00 UTC" in result
100 |         assert "2023-02-01 00:00:00 UTC" in result
101 | 
102 |     def test_get_billing_info(self):
103 |         """Test the get_billing_info function."""
104 |         # Call the function
105 |         result = get_billing_info("test-project")
106 |         
107 |         # Assertions
108 |         assert isinstance(result, str)
109 |         assert "123456-ABCDEF-123456" in result
110 |         assert "My Billing Account" in result
111 |         assert "Open" in result
112 |         assert "Yes" in result  # billing_enabled
113 |         assert "USD" in result 
```

--------------------------------------------------------------------------------
/src/gcp_mcp/gcp_modules/resource_management/tools.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Google Cloud Platform Resource Management tools.
  3 | """
  4 | from typing import List, Dict, Any, Optional
  5 | 
  6 | def register_tools(mcp):
  7 |     """Register all resource management tools with the MCP server."""
  8 |     
  9 |     @mcp.tool()
 10 |     def list_gcp_projects():
 11 |         """
 12 |         List all available GCP projects for the authenticated user.
 13 |         
 14 |         Returns:
 15 |             List of project IDs
 16 |         """
 17 |         try:
 18 |             from google.cloud import resourcemanager_v3
 19 |             client = resourcemanager_v3.ProjectsClient()
 20 |             request = resourcemanager_v3.SearchProjectsRequest()
 21 |             response = client.search_projects(request=request)
 22 |             return [project.project_id for project in response]
 23 |         except Exception as e:
 24 |             return [f"Error listing GCP projects: {str(e)}"]
 25 | 
 26 |     @mcp.tool()
 27 |     def get_gcp_project_details(project_id: str) -> str:
 28 |         """
 29 |         Get detailed information about a specific GCP project.
 30 |         
 31 |         Args:
 32 |             project_id: The ID of the GCP project to get details for
 33 |         
 34 |         Returns:
 35 |             Detailed information about the specified GCP project
 36 |         """
 37 |         try:
 38 |             from google.cloud import resourcemanager_v3
 39 |             
 40 |             # Initialize the Resource Manager client
 41 |             client = resourcemanager_v3.ProjectsClient()
 42 |             
 43 |             # Get the project details
 44 |             name = f"projects/{project_id}"
 45 |             project = client.get_project(name=name)
 46 |             
 47 |             # Format the response
 48 |             project_number = project.name.split('/')[-1] if project.name else "N/A"
 49 |             display_name = project.display_name or "N/A"
 50 |             create_time = project.create_time.isoformat() if project.create_time else "N/A"
 51 |             state = project.state.name if project.state else "N/A"
 52 |             
 53 |             labels = dict(project.labels) if project.labels else {}
 54 |             labels_str = "\n".join([f"  {k}: {v}" for k, v in labels.items()]) if labels else "  None"
 55 |             
 56 |             return f"""
 57 | GCP Project Details for {project_id}:
 58 | Project Number: {project_number}
 59 | Name: {display_name}
 60 | Creation Time: {create_time}
 61 | State: {state}
 62 | Labels:
 63 | {labels_str}
 64 | """
 65 |         except Exception as e:
 66 |             return f"Error getting GCP project details: {str(e)}"
 67 | 
 68 |     @mcp.tool()
 69 |     def list_assets(project_id: str, asset_types: Optional[List[str]] = None, page_size: int = 50) -> str:
 70 |         """
 71 |         List assets in a GCP project using Cloud Asset Inventory API.
 72 |         
 73 |         Args:
 74 |             project_id: The ID of the GCP project to list assets for
 75 |             asset_types: Optional list of asset types to filter by (e.g., ["compute.googleapis.com/Instance"])
 76 |             page_size: Number of assets to return per page (default: 50, max: 1000)
 77 |         
 78 |         Returns:
 79 |             List of assets in the specified GCP project
 80 |         """
 81 |         try:
 82 |             try:
 83 |                 from google.cloud import asset_v1
 84 |             except ImportError:
 85 |                 return "Error: The Google Cloud Asset Inventory library is not installed. Please install it with 'pip install google-cloud-asset'."
 86 |             
 87 |             # Initialize the Asset client
 88 |             client = asset_v1.AssetServiceClient()
 89 |             
 90 |             # Format the parent resource
 91 |             parent = f"projects/{project_id}"
 92 |             
 93 |             # Create the request
 94 |             request = asset_v1.ListAssetsRequest(
 95 |                 parent=parent,
 96 |                 content_type=asset_v1.ContentType.RESOURCE,
 97 |                 page_size=min(page_size, 1000)  # API limit is 1000
 98 |             )
 99 |             
100 |             # Add asset types filter if provided
101 |             if asset_types:
102 |                 request.asset_types = asset_types
103 |             
104 |             # List assets
105 |             response = client.list_assets(request=request)
106 |             
107 |             # Format the response
108 |             assets_list = []
109 |             for asset in response:
110 |                 asset_type = asset.asset_type
111 |                 name = asset.name
112 |                 display_name = asset.display_name if hasattr(asset, 'display_name') and asset.display_name else name.split('/')[-1]
113 |                 
114 |                 # Extract location if available
115 |                 location = "global"
116 |                 if hasattr(asset.resource, 'location') and asset.resource.location:
117 |                     location = asset.resource.location
118 |                 
119 |                 assets_list.append(f"- {display_name} ({asset_type})\n  Location: {location}\n  Name: {name}")
120 |             
121 |             if not assets_list:
122 |                 filter_msg = f" with types {asset_types}" if asset_types else ""
123 |                 return f"No assets found{filter_msg} in project {project_id}."
124 |             
125 |             # Add pagination info if there's a next page token
126 |             pagination_info = ""
127 |             if hasattr(response, 'next_page_token') and response.next_page_token:
128 |                 pagination_info = "\n\nMore assets are available. Refine your search or increase page_size to see more."
129 |             
130 |             return f"Assets in GCP Project {project_id}:\n\n" + "\n\n".join(assets_list) + pagination_info
131 |         except Exception as e:
132 |             return f"Error listing assets: {str(e)}"
133 | 
134 |     @mcp.tool()
135 |     def set_quota_project(project_id: str) -> str:
136 |         """
137 |         Set a quota project for Google Cloud API requests.
138 |         
139 |         This helps resolve the warning: "Your application has authenticated using end user credentials 
140 |         from Google Cloud SDK without a quota project."
141 |         
142 |         Args:
143 |             project_id: The ID of the GCP project to use for quota attribution
144 |         
145 |         Returns:
146 |             Confirmation message if successful, error message otherwise
147 |         """
148 |         try:
149 |             try:
150 |                 import google.auth
151 |                 from google.auth import exceptions
152 |                 import os
153 |             except ImportError:
154 |                 return "Error: Required libraries not installed. Please install with 'pip install google-auth'."
155 |             
156 |             # Set the quota project in the environment variable
157 |             os.environ["GOOGLE_CLOUD_QUOTA_PROJECT"] = project_id
158 |             
159 |             # Try to get credentials with the quota project
160 |             try:
161 |                 # Get the current credentials
162 |                 credentials, project = google.auth.default()
163 |                 
164 |                 # Set the quota project on the credentials if supported
165 |                 if hasattr(credentials, "with_quota_project"):
166 |                     credentials = credentials.with_quota_project(project_id)
167 |                     
168 |                     # Save the credentials back (this depends on the credential type)
169 |                     # This is a best-effort approach
170 |                     try:
171 |                         if hasattr(google.auth, "_default_credentials"):
172 |                             google.auth._default_credentials = credentials
173 |                     except:
174 |                         pass
175 |                     
176 |                     return f"Successfully set quota project to '{project_id}'. New API requests will use this project for quota attribution."
177 |                 else:
178 |                     return f"Set environment variable GOOGLE_CLOUD_QUOTA_PROJECT={project_id}, but your credential type doesn't support quota projects directly."
179 |             except exceptions.GoogleAuthError as e:
180 |                 return f"Error setting quota project: {str(e)}"
181 |         except Exception as e:
182 |             return f"Error setting quota project: {str(e)}"
```

--------------------------------------------------------------------------------
/src/gcp_mcp/gcp_modules/storage/tools.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Google Cloud Platform Storage tools.
  3 | """
  4 | from typing import List, Dict, Any, Optional
  5 | 
  6 | def register_tools(mcp):
  7 |     """Register all storage tools with the MCP server."""
  8 |     
  9 |     @mcp.tool()
 10 |     def list_storage_buckets(project_id: str) -> str:
 11 |         """
 12 |         List Cloud Storage buckets in a GCP project.
 13 |         
 14 |         Args:
 15 |             project_id: The ID of the GCP project to list buckets for
 16 |         
 17 |         Returns:
 18 |             List of Cloud Storage buckets in the specified GCP project
 19 |         """
 20 |         try:
 21 |             from google.cloud import storage
 22 |             
 23 |             # Initialize the Storage client
 24 |             client = storage.Client(project=project_id)
 25 |             
 26 |             # List buckets
 27 |             buckets = client.list_buckets()
 28 |             
 29 |             # Format the response
 30 |             buckets_list = []
 31 |             for bucket in buckets:
 32 |                 location = bucket.location or "Unknown"
 33 |                 storage_class = bucket.storage_class or "Unknown"
 34 |                 created = bucket.time_created.strftime("%Y-%m-%d %H:%M:%S UTC") if bucket.time_created else "Unknown"
 35 |                 buckets_list.append(f"- {bucket.name} (Location: {location}, Class: {storage_class}, Created: {created})")
 36 |             
 37 |             if not buckets_list:
 38 |                 return f"No Cloud Storage buckets found in project {project_id}."
 39 |             
 40 |             buckets_str = "\n".join(buckets_list)
 41 |             
 42 |             return f"""
 43 | Cloud Storage Buckets in GCP Project {project_id}:
 44 | {buckets_str}
 45 | """
 46 |         except Exception as e:
 47 |             return f"Error listing Cloud Storage buckets: {str(e)}"
 48 |     
 49 |     @mcp.tool()
 50 |     def get_bucket_details(project_id: str, bucket_name: str) -> str:
 51 |         """
 52 |         Get detailed information about a specific Cloud Storage bucket.
 53 |         
 54 |         Args:
 55 |             project_id: The ID of the GCP project
 56 |             bucket_name: The name of the bucket to get details for
 57 |         
 58 |         Returns:
 59 |             Detailed information about the specified Cloud Storage bucket
 60 |         """
 61 |         try:
 62 |             from google.cloud import storage
 63 |             
 64 |             # Initialize the Storage client
 65 |             client = storage.Client(project=project_id)
 66 |             
 67 |             # Get the bucket
 68 |             bucket = client.get_bucket(bucket_name)
 69 |             
 70 |             # Format the response
 71 |             details = []
 72 |             details.append(f"Name: {bucket.name}")
 73 |             details.append(f"Project: {project_id}")
 74 |             details.append(f"Location: {bucket.location or 'Unknown'}")
 75 |             details.append(f"Storage Class: {bucket.storage_class or 'Unknown'}")
 76 |             details.append(f"Created: {bucket.time_created.strftime('%Y-%m-%d %H:%M:%S UTC') if bucket.time_created else 'Unknown'}")
 77 |             details.append(f"Versioning Enabled: {bucket.versioning_enabled}")
 78 |             details.append(f"Requester Pays: {bucket.requester_pays}")
 79 |             details.append(f"Lifecycle Rules: {len(bucket.lifecycle_rules) if bucket.lifecycle_rules else 0} rules defined")
 80 |             details.append(f"Labels: {bucket.labels or 'None'}")
 81 |             details.append(f"CORS: {bucket.cors or 'None'}")
 82 |             
 83 |             details_str = "\n".join(details)
 84 |             
 85 |             return f"""
 86 | Cloud Storage Bucket Details:
 87 | {details_str}
 88 | """
 89 |         except Exception as e:
 90 |             return f"Error getting bucket details: {str(e)}"
 91 |     
 92 |     @mcp.tool()
 93 |     def list_objects(project_id: str, bucket_name: str, prefix: Optional[str] = None, limit: int = 100) -> str:
 94 |         """
 95 |         List objects in a Cloud Storage bucket.
 96 |         
 97 |         Args:
 98 |             project_id: The ID of the GCP project
 99 |             bucket_name: The name of the bucket to list objects from
100 |             prefix: Optional prefix to filter objects by
101 |             limit: Maximum number of objects to list (default: 100)
102 |         
103 |         Returns:
104 |             List of objects in the specified Cloud Storage bucket
105 |         """
106 |         try:
107 |             from google.cloud import storage
108 |             
109 |             # Initialize the Storage client
110 |             client = storage.Client(project=project_id)
111 |             
112 |             # Get the bucket
113 |             bucket = client.get_bucket(bucket_name)
114 |             
115 |             # List blobs
116 |             blobs = bucket.list_blobs(prefix=prefix, max_results=limit)
117 |             
118 |             # Format the response
119 |             objects_list = []
120 |             for blob in blobs:
121 |                 size_mb = blob.size / (1024 * 1024)
122 |                 updated = blob.updated.strftime("%Y-%m-%d %H:%M:%S UTC") if blob.updated else "Unknown"
123 |                 objects_list.append(f"- {blob.name} (Size: {size_mb:.2f} MB, Updated: {updated}, Content-Type: {blob.content_type})")
124 |             
125 |             if not objects_list:
126 |                 return f"No objects found in bucket {bucket_name}{' with prefix ' + prefix if prefix else ''}."
127 |             
128 |             objects_str = "\n".join(objects_list)
129 |             
130 |             return f"""
131 | Objects in Cloud Storage Bucket {bucket_name}{' with prefix ' + prefix if prefix else ''}:
132 | {objects_str}
133 | """
134 |         except Exception as e:
135 |             return f"Error listing objects: {str(e)}"
136 |     
137 |     @mcp.tool()
138 |     def upload_object(project_id: str, bucket_name: str, source_file_path: str, destination_blob_name: Optional[str] = None, content_type: Optional[str] = None) -> str:
139 |         """
140 |         Upload a file to a Cloud Storage bucket.
141 |         
142 |         Args:
143 |             project_id: The ID of the GCP project
144 |             bucket_name: The name of the bucket to upload to
145 |             source_file_path: The local file path to upload
146 |             destination_blob_name: The name to give the file in GCS (default: filename from source)
147 |             content_type: The content type of the file (default: auto-detect)
148 |         
149 |         Returns:
150 |             Result of the upload operation
151 |         """
152 |         try:
153 |             import os
154 |             from google.cloud import storage
155 |             
156 |             # Initialize the Storage client
157 |             client = storage.Client(project=project_id)
158 |             
159 |             # Get the bucket
160 |             bucket = client.get_bucket(bucket_name)
161 |             
162 |             # If no destination name is provided, use the source filename
163 |             if not destination_blob_name:
164 |                 destination_blob_name = os.path.basename(source_file_path)
165 |             
166 |             # Create a blob object
167 |             blob = bucket.blob(destination_blob_name)
168 |             
169 |             # Upload the file
170 |             blob.upload_from_filename(source_file_path, content_type=content_type)
171 |             
172 |             return f"""
173 | File successfully uploaded:
174 | - Source: {source_file_path}
175 | - Destination: gs://{bucket_name}/{destination_blob_name}
176 | - Size: {blob.size / (1024 * 1024):.2f} MB
177 | - Content-Type: {blob.content_type}
178 | """
179 |         except Exception as e:
180 |             return f"Error uploading file: {str(e)}"
181 |     
182 |     @mcp.tool()
183 |     def download_object(project_id: str, bucket_name: str, source_blob_name: str, destination_file_path: str) -> str:
184 |         """
185 |         Download a file from a Cloud Storage bucket.
186 |         
187 |         Args:
188 |             project_id: The ID of the GCP project
189 |             bucket_name: The name of the bucket to download from
190 |             source_blob_name: The name of the file in the bucket
191 |             destination_file_path: The local path to save the file to
192 |         
193 |         Returns:
194 |             Result of the download operation
195 |         """
196 |         try:
197 |             from google.cloud import storage
198 |             
199 |             # Initialize the Storage client
200 |             client = storage.Client(project=project_id)
201 |             
202 |             # Get the bucket
203 |             bucket = client.get_bucket(bucket_name)
204 |             
205 |             # Get the blob
206 |             blob = bucket.blob(source_blob_name)
207 |             
208 |             # Download the file
209 |             blob.download_to_filename(destination_file_path)
210 |             
211 |             return f"""
212 | File successfully downloaded:
213 | - Source: gs://{bucket_name}/{source_blob_name}
214 | - Destination: {destination_file_path}
215 | - Size: {blob.size / (1024 * 1024):.2f} MB
216 | - Content-Type: {blob.content_type}
217 | """
218 |         except Exception as e:
219 |             return f"Error downloading file: {str(e)}"
220 |     
221 |     @mcp.tool()
222 |     def delete_object(project_id: str, bucket_name: str, blob_name: str) -> str:
223 |         """
224 |         Delete an object from a Cloud Storage bucket.
225 |         
226 |         Args:
227 |             project_id: The ID of the GCP project
228 |             bucket_name: The name of the bucket to delete from
229 |             blob_name: The name of the file to delete
230 |         
231 |         Returns:
232 |             Result of the delete operation
233 |         """
234 |         try:
235 |             from google.cloud import storage
236 |             
237 |             # Initialize the Storage client
238 |             client = storage.Client(project=project_id)
239 |             
240 |             # Get the bucket
241 |             bucket = client.get_bucket(bucket_name)
242 |             
243 |             # Delete the blob
244 |             blob = bucket.blob(blob_name)
245 |             blob.delete()
246 |             
247 |             return f"Object gs://{bucket_name}/{blob_name} has been successfully deleted."
248 |         except Exception as e:
249 |             return f"Error deleting object: {str(e)}"
```

--------------------------------------------------------------------------------
/src/gcp_mcp/gcp_modules/iam/tools.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Google Cloud Platform IAM tools.
  3 | """
  4 | from typing import List, Dict, Any, Optional
  5 | 
  6 | def register_tools(mcp):
  7 |     """Register all IAM tools with the MCP server."""
  8 |     
  9 |     @mcp.tool()
 10 |     def check_iam_permissions(project_id: str) -> str:
 11 |         """
 12 |         Check IAM permissions for the current user in a GCP project.
 13 |         
 14 |         Args:
 15 |             project_id: The ID of the GCP project to check permissions for
 16 |         
 17 |         Returns:
 18 |             List of IAM permissions for the current user in the specified GCP project
 19 |         """
 20 |         try:
 21 |             from google.cloud import resourcemanager_v3
 22 |             from google.iam.v1 import iam_policy_pb2
 23 |             
 24 |             # Initialize the Resource Manager client
 25 |             client = resourcemanager_v3.ProjectsClient()
 26 |             
 27 |             # Get the IAM policy for the project
 28 |             request = iam_policy_pb2.GetIamPolicyRequest(
 29 |                 resource=f"projects/{project_id}"
 30 |             )
 31 |             policy = client.get_iam_policy(request=request)
 32 |             
 33 |             # Get the current user
 34 |             import google.auth
 35 |             credentials, _ = google.auth.default()
 36 |             user = credentials.service_account_email if hasattr(credentials, 'service_account_email') else "current user"
 37 |             
 38 |             # Check which roles the user has
 39 |             user_bindings = []
 40 |             for binding in policy.bindings:
 41 |                 role = binding.role
 42 |                 members = binding.members
 43 |                 
 44 |                 # Check if the current user is in the members list
 45 |                 for member in members:
 46 |                     if member == f"user:{user}" or member == "serviceAccount:{user}" or member == "allUsers" or member == "allAuthenticatedUsers":
 47 |                         user_bindings.append(f"- {role}")
 48 |                         break
 49 |             
 50 |             if not user_bindings:
 51 |                 return f"No explicit IAM permissions found for {user} in project {project_id}."
 52 |             
 53 |             user_bindings_str = "\n".join(user_bindings)
 54 |             
 55 |             return f"""
 56 | IAM Permissions for {user} in GCP Project {project_id}:
 57 | {user_bindings_str}
 58 | """
 59 |         except Exception as e:
 60 |             return f"Error checking IAM permissions: {str(e)}"
 61 |     
 62 |     @mcp.tool()
 63 |     def list_roles(project_id: Optional[str] = None) -> str:
 64 |         """
 65 |         List IAM roles (predefined or custom).
 66 |         
 67 |         Args:
 68 |             project_id: Optional project ID for listing custom roles. If not provided, lists predefined roles.
 69 |         
 70 |         Returns:
 71 |             List of IAM roles
 72 |         """
 73 |         try:
 74 |             from google.cloud import iam_v1
 75 |             
 76 |             # Initialize the IAM client
 77 |             client = iam_v1.IAMClient()
 78 |             
 79 |             roles_list = []
 80 |             
 81 |             if project_id:
 82 |                 # List custom roles for the project
 83 |                 request = iam_v1.ListRolesRequest(
 84 |                     parent=f"projects/{project_id}",
 85 |                     view=iam_v1.ListRolesRequest.RoleView.FULL
 86 |                 )
 87 |                 roles = client.list_roles(request=request)
 88 |                 
 89 |                 for role in roles:
 90 |                     description = role.description or "No description"
 91 |                     roles_list.append(f"- {role.name} - {description}")
 92 |                 
 93 |                 if not roles_list:
 94 |                     return f"No custom IAM roles found in project {project_id}."
 95 |                 
 96 |                 return f"""
 97 | Custom IAM Roles in GCP Project {project_id}:
 98 | {chr(10).join(roles_list)}
 99 | """
100 |             else:
101 |                 # List predefined roles
102 |                 request = iam_v1.ListRolesRequest(
103 |                     view=iam_v1.ListRolesRequest.RoleView.BASIC
104 |                 )
105 |                 roles = client.list_roles(request=request)
106 |                 
107 |                 for role in roles:
108 |                     if role.name.startswith("roles/"):
109 |                         description = role.description or "No description"
110 |                         roles_list.append(f"- {role.name} - {description}")
111 |                 
112 |                 if not roles_list:
113 |                     return "No predefined IAM roles found."
114 |                 
115 |                 return f"""
116 | Predefined IAM Roles in GCP:
117 | {chr(10).join(roles_list[:100])}
118 | (Limited to 100 roles. To see more specific roles, narrow your search criteria.)
119 | """
120 |         except Exception as e:
121 |             return f"Error listing IAM roles: {str(e)}"
122 |     
123 |     @mcp.tool()
124 |     def get_role_permissions(role_name: str, project_id: Optional[str] = None) -> str:
125 |         """
126 |         Get detailed information about an IAM role, including its permissions.
127 |         
128 |         Args:
129 |             role_name: The name of the role (e.g., "roles/compute.admin" or "projects/my-project/roles/myCustomRole")
130 |             project_id: Optional project ID for custom roles. Not needed if role_name is fully qualified.
131 |         
132 |         Returns:
133 |             Detailed information about the IAM role
134 |         """
135 |         try:
136 |             from google.cloud import iam_v1
137 |             
138 |             # Initialize the IAM client
139 |             client = iam_v1.IAMClient()
140 |             
141 |             # If project_id is provided and role_name doesn't include it, create fully qualified role name
142 |             if project_id and not role_name.startswith("projects/") and not role_name.startswith("roles/"):
143 |                 role_name = f"projects/{project_id}/roles/{role_name}"
144 |             elif not role_name.startswith("projects/") and not role_name.startswith("roles/"):
145 |                 role_name = f"roles/{role_name}"
146 |             
147 |             # Get role details
148 |             request = iam_v1.GetRoleRequest(name=role_name)
149 |             role = client.get_role(request=request)
150 |             
151 |             details = []
152 |             details.append(f"Name: {role.name}")
153 |             details.append(f"Title: {role.title}")
154 |             details.append(f"Description: {role.description or 'No description'}")
155 |             
156 |             if role.included_permissions:
157 |                 permissions_str = "\n".join([f"- {permission}" for permission in role.included_permissions])
158 |                 details.append(f"Permissions ({len(role.included_permissions)}):\n{permissions_str}")
159 |             else:
160 |                 details.append("Permissions: None")
161 |             
162 |             if hasattr(role, 'stage'):
163 |                 details.append(f"Stage: {role.stage}")
164 |             
165 |             if hasattr(role, 'etag'):
166 |                 details.append(f"ETag: {role.etag}")
167 |             
168 |             return f"""
169 | IAM Role Details for {role.name}:
170 | {chr(10).join(details)}
171 | """
172 |         except Exception as e:
173 |             return f"Error getting role permissions: {str(e)}"
174 |     
175 |     @mcp.tool()
176 |     def list_service_accounts(project_id: str) -> str:
177 |         """
178 |         List service accounts in a GCP project.
179 |         
180 |         Args:
181 |             project_id: The ID of the GCP project
182 |         
183 |         Returns:
184 |             List of service accounts in the project
185 |         """
186 |         try:
187 |             from google.cloud import iam_v1
188 |             
189 |             # Initialize the IAM client
190 |             client = iam_v1.IAMClient()
191 |             
192 |             # List service accounts
193 |             request = iam_v1.ListServiceAccountsRequest(
194 |                 name=f"projects/{project_id}"
195 |             )
196 |             service_accounts = client.list_service_accounts(request=request)
197 |             
198 |             accounts_list = []
199 |             for account in service_accounts:
200 |                 display_name = account.display_name or "No display name"
201 |                 accounts_list.append(f"- {account.email} ({display_name})")
202 |             
203 |             if not accounts_list:
204 |                 return f"No service accounts found in project {project_id}."
205 |             
206 |             accounts_str = "\n".join(accounts_list)
207 |             
208 |             return f"""
209 | Service Accounts in GCP Project {project_id}:
210 | {accounts_str}
211 | """
212 |         except Exception as e:
213 |             return f"Error listing service accounts: {str(e)}"
214 |     
215 |     @mcp.tool()
216 |     def create_service_account(project_id: str, account_id: str, display_name: str, description: Optional[str] = None) -> str:
217 |         """
218 |         Create a new service account in a GCP project.
219 |         
220 |         Args:
221 |             project_id: The ID of the GCP project
222 |             account_id: The ID for the service account (must be between 6 and 30 characters)
223 |             display_name: A user-friendly name for the service account
224 |             description: Optional description for the service account
225 |         
226 |         Returns:
227 |             Result of the service account creation
228 |         """
229 |         try:
230 |             from google.cloud import iam_v1
231 |             
232 |             # Initialize the IAM client
233 |             client = iam_v1.IAMClient()
234 |             
235 |             # Create service account
236 |             request = iam_v1.CreateServiceAccountRequest(
237 |                 name=f"projects/{project_id}",
238 |                 account_id=account_id,
239 |                 service_account=iam_v1.ServiceAccount(
240 |                     display_name=display_name,
241 |                     description=description
242 |                 )
243 |             )
244 |             service_account = client.create_service_account(request=request)
245 |             
246 |             return f"""
247 | Service Account created successfully:
248 | - Email: {service_account.email}
249 | - Name: {service_account.name}
250 | - Display Name: {service_account.display_name}
251 | - Description: {service_account.description or 'None'}
252 | """
253 |         except Exception as e:
254 |             return f"Error creating service account: {str(e)}"
255 |     
256 |     @mcp.tool()
257 |     def add_iam_policy_binding(project_id: str, role: str, member: str) -> str:
258 |         """
259 |         Add an IAM policy binding to a GCP project.
260 |         
261 |         Args:
262 |             project_id: The ID of the GCP project
263 |             role: The role to grant (e.g., "roles/compute.admin")
264 |             member: The member to grant the role to (e.g., "user:[email protected]", "serviceAccount:[email protected]")
265 |         
266 |         Returns:
267 |             Result of the policy binding operation
268 |         """
269 |         try:
270 |             from google.cloud import resourcemanager_v3
271 |             from google.iam.v1 import iam_policy_pb2, policy_pb2
272 |             
273 |             # Initialize the Resource Manager client
274 |             client = resourcemanager_v3.ProjectsClient()
275 |             
276 |             # Get the current IAM policy
277 |             get_request = iam_policy_pb2.GetIamPolicyRequest(
278 |                 resource=f"projects/{project_id}"
279 |             )
280 |             policy = client.get_iam_policy(request=get_request)
281 |             
282 |             # Check if the binding already exists
283 |             binding_exists = False
284 |             for binding in policy.bindings:
285 |                 if binding.role == role and member in binding.members:
286 |                     binding_exists = True
287 |                     break
288 |             
289 |             if binding_exists:
290 |                 return f"IAM policy binding already exists: {member} already has role {role} in project {project_id}."
291 |             
292 |             # Add the new binding
293 |             binding = policy_pb2.Binding()
294 |             binding.role = role
295 |             binding.members.append(member)
296 |             policy.bindings.append(binding)
297 |             
298 |             # Set the updated IAM policy
299 |             set_request = iam_policy_pb2.SetIamPolicyRequest(
300 |                 resource=f"projects/{project_id}",
301 |                 policy=policy
302 |             )
303 |             updated_policy = client.set_iam_policy(request=set_request)
304 |             
305 |             return f"""
306 | IAM policy binding added successfully:
307 | - Project: {project_id}
308 | - Role: {role}
309 | - Member: {member}
310 | """
311 |         except Exception as e:
312 |             return f"Error adding IAM policy binding: {str(e)}"
```

--------------------------------------------------------------------------------
/src/gcp_mcp/gcp_modules/databases/tools.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Google Cloud Platform Database tools.
  3 | """
  4 | from typing import List, Dict, Any, Optional
  5 | 
  6 | def register_tools(mcp):
  7 |     """Register all database tools with the MCP server."""
  8 |     
  9 |     @mcp.tool()
 10 |     def list_cloud_sql_instances(project_id: str) -> str:
 11 |         """
 12 |         List Cloud SQL instances in a GCP project.
 13 |         
 14 |         Args:
 15 |             project_id: The ID of the GCP project to list Cloud SQL instances for
 16 |         
 17 |         Returns:
 18 |             List of Cloud SQL instances in the specified GCP project
 19 |         """
 20 |         try:
 21 |             from googleapiclient import discovery
 22 |             
 23 |             # Initialize the Cloud SQL Admin API client
 24 |             service = discovery.build('sqladmin', 'v1')
 25 |             
 26 |             # List SQL instances
 27 |             request = service.instances().list(project=project_id)
 28 |             response = request.execute()
 29 |             
 30 |             # Format the response
 31 |             instances_list = []
 32 |             
 33 |             if 'items' in response:
 34 |                 for instance in response['items']:
 35 |                     name = instance.get('name', 'Unknown')
 36 |                     db_version = instance.get('databaseVersion', 'Unknown')
 37 |                     state = instance.get('state', 'Unknown')
 38 |                     region = instance.get('region', 'Unknown')
 39 |                     tier = instance.get('settings', {}).get('tier', 'Unknown')
 40 |                     storage_size = instance.get('settings', {}).get('dataDiskSizeGb', 'Unknown')
 41 |                     
 42 |                     instances_list.append(f"- {name} (Type: {db_version}, Region: {region}, Tier: {tier}, Storage: {storage_size}GB, State: {state})")
 43 |             
 44 |             if not instances_list:
 45 |                 return f"No Cloud SQL instances found in project {project_id}."
 46 |             
 47 |             instances_str = "\n".join(instances_list)
 48 |             
 49 |             return f"""
 50 | Cloud SQL Instances in GCP Project {project_id}:
 51 | {instances_str}
 52 | """
 53 |         except Exception as e:
 54 |             return f"Error listing Cloud SQL instances: {str(e)}"
 55 |     
 56 |     @mcp.tool()
 57 |     def get_sql_instance_details(project_id: str, instance_id: str) -> str:
 58 |         """
 59 |         Get detailed information about a specific Cloud SQL instance.
 60 |         
 61 |         Args:
 62 |             project_id: The ID of the GCP project
 63 |             instance_id: The ID of the Cloud SQL instance
 64 |         
 65 |         Returns:
 66 |             Detailed information about the specified Cloud SQL instance
 67 |         """
 68 |         try:
 69 |             from googleapiclient import discovery
 70 |             
 71 |             # Initialize the Cloud SQL Admin API client
 72 |             service = discovery.build('sqladmin', 'v1')
 73 |             
 74 |             # Get instance details
 75 |             request = service.instances().get(project=project_id, instance=instance_id)
 76 |             instance = request.execute()
 77 |             
 78 |             # Format the response
 79 |             details = []
 80 |             details.append(f"Name: {instance.get('name', 'Unknown')}")
 81 |             details.append(f"Self Link: {instance.get('selfLink', 'Unknown')}")
 82 |             details.append(f"Database Version: {instance.get('databaseVersion', 'Unknown')}")
 83 |             details.append(f"State: {instance.get('state', 'Unknown')}")
 84 |             details.append(f"Region: {instance.get('region', 'Unknown')}")
 85 |             
 86 |             # Settings information
 87 |             if 'settings' in instance:
 88 |                 settings = instance['settings']
 89 |                 details.append(f"Tier: {settings.get('tier', 'Unknown')}")
 90 |                 details.append(f"Storage Size: {settings.get('dataDiskSizeGb', 'Unknown')}GB")
 91 |                 details.append(f"Availability Type: {settings.get('availabilityType', 'Unknown')}")
 92 |                 
 93 |                 # Backup configuration
 94 |                 if 'backupConfiguration' in settings:
 95 |                     backup = settings['backupConfiguration']
 96 |                     enabled = backup.get('enabled', False)
 97 |                     details.append(f"Automated Backups: {'Enabled' if enabled else 'Disabled'}")
 98 |                     if enabled:
 99 |                         details.append(f"Backup Start Time: {backup.get('startTime', 'Unknown')}")
100 |                         details.append(f"Binary Log Enabled: {backup.get('binaryLogEnabled', False)}")
101 |                 
102 |                 # IP configuration
103 |                 if 'ipConfiguration' in settings:
104 |                     ip_config = settings['ipConfiguration']
105 |                     public_ip = "Enabled" if not ip_config.get('privateNetwork') else "Disabled"
106 |                     details.append(f"Public IP: {public_ip}")
107 |                     
108 |                     if 'authorizedNetworks' in ip_config:
109 |                         networks = []
110 |                         for network in ip_config['authorizedNetworks']:
111 |                             name = network.get('name', 'Unnamed')
112 |                             value = network.get('value', 'Unknown')
113 |                             networks.append(f"    - {name}: {value}")
114 |                         
115 |                         if networks:
116 |                             details.append("Authorized Networks:")
117 |                             details.extend(networks)
118 |             
119 |             # IP Addresses
120 |             if 'ipAddresses' in instance:
121 |                 ip_addresses = []
122 |                 for ip in instance['ipAddresses']:
123 |                     ip_type = ip.get('type', 'Unknown')
124 |                     ip_address = ip.get('ipAddress', 'Unknown')
125 |                     ip_addresses.append(f"    - {ip_type}: {ip_address}")
126 |                 
127 |                 if ip_addresses:
128 |                     details.append("IP Addresses:")
129 |                     details.extend(ip_addresses)
130 |             
131 |             details_str = "\n".join(details)
132 |             
133 |             return f"""
134 | Cloud SQL Instance Details:
135 | {details_str}
136 | """
137 |         except Exception as e:
138 |             return f"Error getting SQL instance details: {str(e)}"
139 |     
140 |     @mcp.tool()
141 |     def list_databases(project_id: str, instance_id: str) -> str:
142 |         """
143 |         List databases in a Cloud SQL instance.
144 |         
145 |         Args:
146 |             project_id: The ID of the GCP project
147 |             instance_id: The ID of the Cloud SQL instance
148 |         
149 |         Returns:
150 |             List of databases in the specified Cloud SQL instance
151 |         """
152 |         try:
153 |             from googleapiclient import discovery
154 |             
155 |             # Initialize the Cloud SQL Admin API client
156 |             service = discovery.build('sqladmin', 'v1')
157 |             
158 |             # List databases
159 |             request = service.databases().list(project=project_id, instance=instance_id)
160 |             response = request.execute()
161 |             
162 |             # Format the response
163 |             databases_list = []
164 |             
165 |             if 'items' in response:
166 |                 for database in response['items']:
167 |                     name = database.get('name', 'Unknown')
168 |                     charset = database.get('charset', 'Unknown')
169 |                     collation = database.get('collation', 'Unknown')
170 |                     
171 |                     databases_list.append(f"- {name} (Charset: {charset}, Collation: {collation})")
172 |             
173 |             if not databases_list:
174 |                 return f"No databases found in Cloud SQL instance {instance_id}."
175 |             
176 |             databases_str = "\n".join(databases_list)
177 |             
178 |             return f"""
179 | Databases in Cloud SQL Instance {instance_id}:
180 | {databases_str}
181 | """
182 |         except Exception as e:
183 |             return f"Error listing databases: {str(e)}"
184 |     
185 |     @mcp.tool()
186 |     def create_backup(project_id: str, instance_id: str, description: Optional[str] = None) -> str:
187 |         """
188 |         Create a backup for a Cloud SQL instance.
189 |         
190 |         Args:
191 |             project_id: The ID of the GCP project
192 |             instance_id: The ID of the Cloud SQL instance
193 |             description: Optional description for the backup
194 |         
195 |         Returns:
196 |             Result of the backup operation
197 |         """
198 |         try:
199 |             from googleapiclient import discovery
200 |             
201 |             # Initialize the Cloud SQL Admin API client
202 |             service = discovery.build('sqladmin', 'v1')
203 |             
204 |             # Create backup
205 |             backup_run_body = {}
206 |             if description:
207 |                 backup_run_body['description'] = description
208 |             
209 |             request = service.backupRuns().insert(project=project_id, instance=instance_id, body=backup_run_body)
210 |             operation = request.execute()
211 |             
212 |             # Get operation ID and status
213 |             operation_id = operation.get('name', 'Unknown')
214 |             status = operation.get('status', 'Unknown')
215 |             
216 |             return f"""
217 | Backup operation initiated:
218 | - Instance: {instance_id}
219 | - Project: {project_id}
220 | - Description: {description or 'None provided'}
221 | 
222 | Operation ID: {operation_id}
223 | Status: {status}
224 | 
225 | The backup process may take some time to complete. You can check the status of the backup using the Cloud SQL Admin API or Google Cloud Console.
226 | """
227 |         except Exception as e:
228 |             return f"Error creating backup: {str(e)}"
229 |     
230 |     @mcp.tool()
231 |     def list_firestore_databases(project_id: str) -> str:
232 |         """
233 |         List Firestore databases in a GCP project.
234 |         
235 |         Args:
236 |             project_id: The ID of the GCP project to list Firestore databases for
237 |         
238 |         Returns:
239 |             List of Firestore databases in the specified GCP project
240 |         """
241 |         try:
242 |             from google.cloud import firestore_admin_v1
243 |             
244 |             # Initialize the Firestore Admin client
245 |             client = firestore_admin_v1.FirestoreAdminClient()
246 |             
247 |             # List databases
248 |             parent = f"projects/{project_id}"
249 |             databases = client.list_databases(parent=parent)
250 |             
251 |             # Format the response
252 |             databases_list = []
253 |             
254 |             for database in databases:
255 |                 name = database.name.split('/')[-1]
256 |                 db_type = "Firestore Native" if database.type_ == firestore_admin_v1.Database.DatabaseType.FIRESTORE_NATIVE else "Datastore Mode"
257 |                 location = database.location_id
258 |                 
259 |                 databases_list.append(f"- {name} (Type: {db_type}, Location: {location})")
260 |             
261 |             if not databases_list:
262 |                 return f"No Firestore databases found in project {project_id}."
263 |             
264 |             databases_str = "\n".join(databases_list)
265 |             
266 |             return f"""
267 | Firestore Databases in GCP Project {project_id}:
268 | {databases_str}
269 | """
270 |         except Exception as e:
271 |             return f"Error listing Firestore databases: {str(e)}"
272 |     
273 |     @mcp.tool()
274 |     def list_firestore_collections(project_id: str, database_id: str = "(default)") -> str:
275 |         """
276 |         List collections in a Firestore database.
277 |         
278 |         Args:
279 |             project_id: The ID of the GCP project
280 |             database_id: The ID of the Firestore database (default is "(default)")
281 |         
282 |         Returns:
283 |             List of collections in the specified Firestore database
284 |         """
285 |         try:
286 |             from google.cloud import firestore
287 |             
288 |             # Initialize the Firestore client
289 |             client = firestore.Client(project=project_id, database=database_id)
290 |             
291 |             # List collections
292 |             collections = client.collections()
293 |             
294 |             # Format the response
295 |             collections_list = []
296 |             
297 |             for collection in collections:
298 |                 collections_list.append(f"- {collection.id}")
299 |             
300 |             if not collections_list:
301 |                 return f"No collections found in Firestore database {database_id}."
302 |             
303 |             collections_str = "\n".join(collections_list)
304 |             
305 |             return f"""
306 | Collections in Firestore Database {database_id} (Project: {project_id}):
307 | {collections_str}
308 | """
309 |         except Exception as e:
310 |             return f"Error listing Firestore collections: {str(e)}"
```

--------------------------------------------------------------------------------
/src/gcp_mcp/gcp_modules/kubernetes/tools.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Google Cloud Platform Kubernetes Engine (GKE) tools.
  3 | """
  4 | from typing import List, Dict, Any, Optional
  5 | 
  6 | def register_tools(mcp):
  7 |     """Register all kubernetes tools with the MCP server."""
  8 |     
  9 |     @mcp.tool()
 10 |     def list_gke_clusters(project_id: str, region: str = "") -> str:
 11 |         """
 12 |         List Google Kubernetes Engine (GKE) clusters in a GCP project.
 13 |         
 14 |         Args:
 15 |             project_id: The ID of the GCP project to list GKE clusters for
 16 |             region: Optional region to filter clusters (e.g., "us-central1")
 17 |         
 18 |         Returns:
 19 |             List of GKE clusters in the specified GCP project
 20 |         """
 21 |         try:
 22 |             from google.cloud import container_v1
 23 |             
 24 |             # Initialize the GKE client
 25 |             client = container_v1.ClusterManagerClient()
 26 |             
 27 |             clusters_list = []
 28 |             
 29 |             if region:
 30 |                 # List clusters in the specified region
 31 |                 parent = f"projects/{project_id}/locations/{region}"
 32 |                 response = client.list_clusters(parent=parent)
 33 |                 
 34 |                 for cluster in response.clusters:
 35 |                     version = cluster.current_master_version
 36 |                     node_count = sum(pool.initial_node_count for pool in cluster.node_pools)
 37 |                     status = "Running" if cluster.status == container_v1.Cluster.Status.RUNNING else cluster.status.name
 38 |                     clusters_list.append(f"- {cluster.name} (Region: {region}, Version: {version}, Nodes: {node_count}, Status: {status})")
 39 |             else:
 40 |                 # List clusters in all regions
 41 |                 from google.cloud import compute_v1
 42 |                 
 43 |                 # Get all regions
 44 |                 regions_client = compute_v1.RegionsClient()
 45 |                 regions_request = compute_v1.ListRegionsRequest(project=project_id)
 46 |                 regions = regions_client.list(request=regions_request)
 47 |                 
 48 |                 for region_item in regions:
 49 |                     region_name = region_item.name
 50 |                     parent = f"projects/{project_id}/locations/{region_name}"
 51 |                     try:
 52 |                         response = client.list_clusters(parent=parent)
 53 |                         
 54 |                         for cluster in response.clusters:
 55 |                             version = cluster.current_master_version
 56 |                             node_count = sum(pool.initial_node_count for pool in cluster.node_pools)
 57 |                             status = "Running" if cluster.status == container_v1.Cluster.Status.RUNNING else cluster.status.name
 58 |                             clusters_list.append(f"- {cluster.name} (Region: {region_name}, Version: {version}, Nodes: {node_count}, Status: {status})")
 59 |                     except Exception:
 60 |                         # Skip regions where we can't list clusters
 61 |                         continue
 62 |                     
 63 |                 # Also check zonal clusters
 64 |                 zones_client = compute_v1.ZonesClient()
 65 |                 zones_request = compute_v1.ListZonesRequest(project=project_id)
 66 |                 zones = zones_client.list(request=zones_request)
 67 |                 
 68 |                 for zone_item in zones:
 69 |                     zone_name = zone_item.name
 70 |                     parent = f"projects/{project_id}/locations/{zone_name}"
 71 |                     try:
 72 |                         response = client.list_clusters(parent=parent)
 73 |                         
 74 |                         for cluster in response.clusters:
 75 |                             version = cluster.current_master_version
 76 |                             node_count = sum(pool.initial_node_count for pool in cluster.node_pools)
 77 |                             status = "Running" if cluster.status == container_v1.Cluster.Status.RUNNING else cluster.status.name
 78 |                             clusters_list.append(f"- {cluster.name} (Zone: {zone_name}, Version: {version}, Nodes: {node_count}, Status: {status})")
 79 |                     except Exception:
 80 |                         # Skip zones where we can't list clusters
 81 |                         continue
 82 |             
 83 |             if not clusters_list:
 84 |                 region_msg = f" in region {region}" if region else ""
 85 |                 return f"No GKE clusters found{region_msg} for project {project_id}."
 86 |             
 87 |             clusters_str = "\n".join(clusters_list)
 88 |             region_msg = f" in region {region}" if region else ""
 89 |             
 90 |             return f"""
 91 | Google Kubernetes Engine (GKE) Clusters{region_msg} in GCP Project {project_id}:
 92 | {clusters_str}
 93 | """
 94 |         except Exception as e:
 95 |             return f"Error listing GKE clusters: {str(e)}"
 96 |     
 97 |     @mcp.tool()
 98 |     def get_cluster_details(project_id: str, cluster_name: str, location: str) -> str:
 99 |         """
100 |         Get detailed information about a specific GKE cluster.
101 |         
102 |         Args:
103 |             project_id: The ID of the GCP project
104 |             cluster_name: The name of the GKE cluster
105 |             location: The location (region or zone) of the cluster
106 |         
107 |         Returns:
108 |             Detailed information about the specified GKE cluster
109 |         """
110 |         try:
111 |             from google.cloud import container_v1
112 |             
113 |             # Initialize the GKE client
114 |             client = container_v1.ClusterManagerClient()
115 |             
116 |             # Get cluster details
117 |             cluster_path = f"projects/{project_id}/locations/{location}/clusters/{cluster_name}"
118 |             cluster = client.get_cluster(name=cluster_path)
119 |             
120 |             # Format the response
121 |             details = []
122 |             details.append(f"Name: {cluster.name}")
123 |             details.append(f"Description: {cluster.description or 'None'}")
124 |             details.append(f"Location: {location}")
125 |             details.append(f"Location Type: {'Regional' if '-' not in location else 'Zonal'}")
126 |             details.append(f"Status: {'Running' if cluster.status == container_v1.Cluster.Status.RUNNING else cluster.status.name}")
127 |             details.append(f"Kubernetes Version: {cluster.current_master_version}")
128 |             details.append(f"Network: {cluster.network}")
129 |             details.append(f"Subnetwork: {cluster.subnetwork}")
130 |             details.append(f"Cluster CIDR: {cluster.cluster_ipv4_cidr}")
131 |             details.append(f"Services CIDR: {cluster.services_ipv4_cidr}")
132 |             details.append(f"Endpoint: {cluster.endpoint}")
133 |             
134 |             # Add Node Pools information
135 |             node_pools = []
136 |             for pool in cluster.node_pools:
137 |                 machine_type = pool.config.machine_type
138 |                 disk_size_gb = pool.config.disk_size_gb
139 |                 autoscaling = "Enabled" if pool.autoscaling and pool.autoscaling.enabled else "Disabled"
140 |                 min_nodes = pool.autoscaling.min_node_count if pool.autoscaling and pool.autoscaling.enabled else "N/A"
141 |                 max_nodes = pool.autoscaling.max_node_count if pool.autoscaling and pool.autoscaling.enabled else "N/A"
142 |                 initial_nodes = pool.initial_node_count
143 |                 
144 |                 node_pools.append(f"  - {pool.name} (Machine: {machine_type}, Disk: {disk_size_gb}GB, Initial Nodes: {initial_nodes})")
145 |                 if autoscaling == "Enabled":
146 |                     node_pools.append(f"    Autoscaling: {autoscaling} (Min: {min_nodes}, Max: {max_nodes})")
147 |             
148 |             if node_pools:
149 |                 details.append(f"Node Pools ({len(cluster.node_pools)}):\n" + "\n".join(node_pools))
150 |             
151 |             # Add Addons information
152 |             addons = []
153 |             if cluster.addons_config:
154 |                 config = cluster.addons_config
155 |                 addons.append(f"  - HTTP Load Balancing: {'Enabled' if not config.http_load_balancing or not config.http_load_balancing.disabled else 'Disabled'}")
156 |                 addons.append(f"  - Horizontal Pod Autoscaling: {'Enabled' if not config.horizontal_pod_autoscaling or not config.horizontal_pod_autoscaling.disabled else 'Disabled'}")
157 |                 addons.append(f"  - Kubernetes Dashboard: {'Enabled' if not config.kubernetes_dashboard or not config.kubernetes_dashboard.disabled else 'Disabled'}")
158 |                 addons.append(f"  - Network Policy: {'Enabled' if config.network_policy_config and not config.network_policy_config.disabled else 'Disabled'}")
159 |             
160 |             if addons:
161 |                 details.append(f"Addons:\n" + "\n".join(addons))
162 |             
163 |             details_str = "\n".join(details)
164 |             
165 |             return f"""
166 | GKE Cluster Details:
167 | {details_str}
168 | """
169 |         except Exception as e:
170 |             return f"Error getting cluster details: {str(e)}"
171 |     
172 |     @mcp.tool()
173 |     def list_node_pools(project_id: str, cluster_name: str, location: str) -> str:
174 |         """
175 |         List node pools in a GKE cluster.
176 |         
177 |         Args:
178 |             project_id: The ID of the GCP project
179 |             cluster_name: The name of the GKE cluster
180 |             location: The location (region or zone) of the cluster
181 |         
182 |         Returns:
183 |             List of node pools in the specified GKE cluster
184 |         """
185 |         try:
186 |             from google.cloud import container_v1
187 |             
188 |             # Initialize the GKE client
189 |             client = container_v1.ClusterManagerClient()
190 |             
191 |             # List node pools
192 |             cluster_path = f"projects/{project_id}/locations/{location}/clusters/{cluster_name}"
193 |             node_pools = client.list_node_pools(parent=cluster_path)
194 |             
195 |             # Format the response
196 |             pools_list = []
197 |             for pool in node_pools.node_pools:
198 |                 machine_type = pool.config.machine_type
199 |                 disk_size_gb = pool.config.disk_size_gb
200 |                 autoscaling = "Enabled" if pool.autoscaling and pool.autoscaling.enabled else "Disabled"
201 |                 min_nodes = pool.autoscaling.min_node_count if pool.autoscaling and pool.autoscaling.enabled else "N/A"
202 |                 max_nodes = pool.autoscaling.max_node_count if pool.autoscaling and pool.autoscaling.enabled else "N/A"
203 |                 initial_nodes = pool.initial_node_count
204 |                 
205 |                 pool_info = [
206 |                     f"- {pool.name}:",
207 |                     f"  Machine Type: {machine_type}",
208 |                     f"  Disk Size: {disk_size_gb}GB",
209 |                     f"  Initial Node Count: {initial_nodes}",
210 |                     f"  Autoscaling: {autoscaling}"
211 |                 ]
212 |                 
213 |                 if autoscaling == "Enabled":
214 |                     pool_info.append(f"  Min Nodes: {min_nodes}")
215 |                     pool_info.append(f"  Max Nodes: {max_nodes}")
216 |                 
217 |                 if pool.config.labels:
218 |                     labels = [f"{k}: {v}" for k, v in pool.config.labels.items()]
219 |                     pool_info.append(f"  Labels: {', '.join(labels)}")
220 |                 
221 |                 pools_list.append("\n".join(pool_info))
222 |             
223 |             if not pools_list:
224 |                 return f"No node pools found in GKE cluster {cluster_name} in location {location}."
225 |             
226 |             pools_str = "\n".join(pools_list)
227 |             
228 |             return f"""
229 | Node Pools in GKE Cluster {cluster_name} (Location: {location}):
230 | {pools_str}
231 | """
232 |         except Exception as e:
233 |             return f"Error listing node pools: {str(e)}"
234 |     
235 |     @mcp.tool()
236 |     def resize_node_pool(project_id: str, cluster_name: str, location: str, node_pool_name: str, node_count: int) -> str:
237 |         """
238 |         Resize a node pool in a GKE cluster.
239 |         
240 |         Args:
241 |             project_id: The ID of the GCP project
242 |             cluster_name: The name of the GKE cluster
243 |             location: The location (region or zone) of the cluster
244 |             node_pool_name: The name of the node pool to resize
245 |             node_count: The new node count for the pool
246 |         
247 |         Returns:
248 |             Result of the node pool resize operation
249 |         """
250 |         try:
251 |             from google.cloud import container_v1
252 |             
253 |             # Initialize the GKE client
254 |             client = container_v1.ClusterManagerClient()
255 |             
256 |             # Create the node pool path
257 |             node_pool_path = f"projects/{project_id}/locations/{location}/clusters/{cluster_name}/nodePools/{node_pool_name}"
258 |             
259 |             # Get the current node pool
260 |             node_pool = client.get_node_pool(name=node_pool_path)
261 |             current_node_count = node_pool.initial_node_count
262 |             
263 |             # Check if autoscaling is enabled
264 |             if node_pool.autoscaling and node_pool.autoscaling.enabled:
265 |                 return f"""
266 | Cannot resize node pool {node_pool_name} because autoscaling is enabled.
267 | To manually set the node count, you must first disable autoscaling for this node pool.
268 | Current autoscaling settings:
269 | - Min nodes: {node_pool.autoscaling.min_node_count}
270 | - Max nodes: {node_pool.autoscaling.max_node_count}
271 | """
272 |             
273 |             # Resize the node pool
274 |             request = container_v1.SetNodePoolSizeRequest(
275 |                 name=node_pool_path,
276 |                 node_count=node_count
277 |             )
278 |             operation = client.set_node_pool_size(request=request)
279 |             
280 |             return f"""
281 | Node pool resize operation initiated:
282 | - Cluster: {cluster_name}
283 | - Location: {location}
284 | - Node Pool: {node_pool_name}
285 | - Current Node Count: {current_node_count}
286 | - New Node Count: {node_count}
287 | 
288 | Operation ID: {operation.name}
289 | Status: {operation.status.name if hasattr(operation.status, 'name') else operation.status}
290 | """
291 |         except Exception as e:
292 |             return f"Error resizing node pool: {str(e)}"
```

--------------------------------------------------------------------------------
/src/gcp_mcp/gcp_modules/monitoring/tools.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Google Cloud Platform Monitoring tools.
  3 | """
  4 | from typing import List, Dict, Any, Optional
  5 | 
  6 | def register_tools(mcp):
  7 |     """Register all monitoring tools with the MCP server."""
  8 |     
  9 |     @mcp.tool()
 10 |     def list_monitoring_metrics(project_id: str, filter_str: str = "") -> str:
 11 |         """
 12 |         List available monitoring metrics for a GCP project.
 13 |         
 14 |         Args:
 15 |             project_id: The ID of the GCP project to list metrics for
 16 |             filter_str: Optional filter string to narrow down the metrics
 17 |         
 18 |         Returns:
 19 |             List of available monitoring metrics in the specified GCP project
 20 |         """
 21 |         try:
 22 |             try:
 23 |                 from google.cloud import monitoring_v3
 24 |             except ImportError:
 25 |                 return "Error: The Google Cloud monitoring library is not installed. Please install it with 'pip install google-cloud-monitoring'."
 26 |             
 27 |             # Initialize the Monitoring client
 28 |             client = monitoring_v3.MetricServiceClient()
 29 |             
 30 |             # Format the project name
 31 |             project_name = f"projects/{project_id}"
 32 |             
 33 |             # Create the request object with the filter
 34 |             request = monitoring_v3.ListMetricDescriptorsRequest(
 35 |                 name=project_name
 36 |             )
 37 |             
 38 |             # Add filter if provided
 39 |             if filter_str:
 40 |                 request.filter = filter_str
 41 |             
 42 |             # List metric descriptors with optional filter
 43 |             descriptors = client.list_metric_descriptors(request=request)
 44 |             
 45 |             # Format the response
 46 |             metrics_list = []
 47 |             for descriptor in descriptors:
 48 |                 metric_type = descriptor.type
 49 |                 display_name = descriptor.display_name or metric_type.split('/')[-1]
 50 |                 description = descriptor.description or "No description"
 51 |                 metrics_list.append(f"- {display_name}: {metric_type}\n  {description}")
 52 |             
 53 |             if not metrics_list:
 54 |                 filter_msg = f" with filter '{filter_str}'" if filter_str else ""
 55 |                 return f"No metrics found{filter_msg} for project {project_id}."
 56 |             
 57 |             # Limit to 50 metrics to avoid overwhelming response
 58 |             if len(metrics_list) > 50:
 59 |                 metrics_str = "\n".join(metrics_list[:50])
 60 |                 return f"Found {len(metrics_list)} metrics for project {project_id}. Showing first 50:\n\n{metrics_str}\n\nUse a filter to narrow down results."
 61 |             else:
 62 |                 metrics_str = "\n".join(metrics_list)
 63 |                 return f"Found {len(metrics_list)} metrics for project {project_id}:\n\n{metrics_str}"
 64 |         except Exception as e:
 65 |             return f"Error listing monitoring metrics: {str(e)}"
 66 |     
 67 |     @mcp.tool()
 68 |     def get_monitoring_alerts(project_id: str) -> str:
 69 |         """
 70 |         Get active monitoring alerts for a GCP project.
 71 |         
 72 |         Args:
 73 |             project_id: The ID of the GCP project to get alerts for
 74 |         
 75 |         Returns:
 76 |             Active alerts for the specified GCP project
 77 |         """
 78 |         try:
 79 |             from google.cloud import monitoring_v3
 80 |             from google.protobuf.json_format import MessageToDict
 81 |             
 82 |             # Initialize the Alert Policy Service client
 83 |             alert_client = monitoring_v3.AlertPolicyServiceClient()
 84 |             
 85 |             # Format the project name
 86 |             project_name = f"projects/{project_id}"
 87 |             
 88 |             # Create the request object
 89 |             request = monitoring_v3.ListAlertPoliciesRequest(
 90 |                 name=project_name
 91 |             )
 92 |             
 93 |             # List all alert policies
 94 |             alert_policies = alert_client.list_alert_policies(request=request)
 95 |             
 96 |             # Initialize the Metric Service client for metric data
 97 |             metric_client = monitoring_v3.MetricServiceClient()
 98 |             
 99 |             # Format the response
100 |             active_alerts = []
101 |             
102 |             for policy in alert_policies:
103 |                 # Check if the policy is enabled
104 |                 if not policy.enabled:
105 |                     continue
106 |                 
107 |                 # Check for active incidents
108 |                 filter_str = f'resource.type="alerting_policy" AND metric.type="monitoring.googleapis.com/alert_policy/incident_count" AND metric.label.policy_name="{policy.name.split("/")[-1]}"'
109 |                 
110 |                 # Create a time interval for the last hour
111 |                 import datetime
112 |                 from google.protobuf import timestamp_pb2
113 |                 
114 |                 now = datetime.datetime.utcnow()
115 |                 seconds = int(now.timestamp())
116 |                 end_time = timestamp_pb2.Timestamp(seconds=seconds)
117 |                 
118 |                 start_time = datetime.datetime.utcnow() - datetime.timedelta(hours=1)
119 |                 seconds = int(start_time.timestamp())
120 |                 start_time_proto = timestamp_pb2.Timestamp(seconds=seconds)
121 |                 
122 |                 # Create the time interval
123 |                 interval = monitoring_v3.TimeInterval(
124 |                     start_time=start_time_proto,
125 |                     end_time=end_time
126 |                 )
127 |                 
128 |                 # List the time series
129 |                 try:
130 |                     # Create the request object
131 |                     request = monitoring_v3.ListTimeSeriesRequest(
132 |                         name=project_name,
133 |                         filter=filter_str,
134 |                         interval=interval,
135 |                         view=monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL
136 |                     )
137 |                     
138 |                     # List the time series
139 |                     time_series = metric_client.list_time_series(request=request)
140 |                     
141 |                     is_active = False
142 |                     for series in time_series:
143 |                         # Check if there's a non-zero value in the time series
144 |                         for point in series.points:
145 |                             if point.value.int64_value > 0:
146 |                                 is_active = True
147 |                                 break
148 |                         if is_active:
149 |                             break
150 |                     
151 |                     if is_active:
152 |                         # Format conditions
153 |                         conditions = []
154 |                         for condition in policy.conditions:
155 |                             if condition.display_name:
156 |                                 conditions.append(f"    - {condition.display_name}: {condition.condition_threshold.filter}")
157 |                         
158 |                         # Add to active alerts
159 |                         alert_info = [
160 |                             f"- {policy.display_name} (ID: {policy.name.split('/')[-1]})",
161 |                             f"  Description: {policy.documentation.content if policy.documentation else 'No description'}",
162 |                             f"  Severity: {policy.alert_strategy.notification_rate_limit.period.seconds}s between notifications" if policy.alert_strategy.notification_rate_limit else "  No rate limiting"
163 |                         ]
164 |                         
165 |                         if conditions:
166 |                             alert_info.append("  Conditions:")
167 |                             alert_info.extend(conditions)
168 |                         
169 |                         active_alerts.append("\n".join(alert_info))
170 |                 except Exception:
171 |                     # Skip if we can't check for active incidents
172 |                     continue
173 |             
174 |             if not active_alerts:
175 |                 return f"No active alerts found for project {project_id}."
176 |             
177 |             alerts_str = "\n".join(active_alerts)
178 |             
179 |             return f"""
180 | Active Monitoring Alerts in GCP Project {project_id}:
181 | {alerts_str}
182 | """
183 |         except Exception as e:
184 |             return f"Error getting monitoring alerts: {str(e)}"
185 |     
186 |     @mcp.tool()
187 |     def create_alert_policy(project_id: str, display_name: str, metric_type: str, 
188 |                           filter_str: str, duration_seconds: int = 60, 
189 |                           threshold_value: float = 0.0, comparison: str = "COMPARISON_GT",
190 |                           notification_channels: Optional[List[str]] = None) -> str:
191 |         """
192 |         Create a new alert policy in a GCP project.
193 |         
194 |         Args:
195 |             project_id: The ID of the GCP project
196 |             display_name: The display name for the alert policy
197 |             metric_type: The metric type to monitor (e.g., "compute.googleapis.com/instance/cpu/utilization")
198 |             filter_str: The filter for the metric data
199 |             duration_seconds: The duration in seconds over which to evaluate the condition (default: 60)
200 |             threshold_value: The threshold value for the condition (default: 0.0)
201 |             comparison: The comparison type (COMPARISON_GT, COMPARISON_LT, etc.) (default: COMPARISON_GT)
202 |             notification_channels: Optional list of notification channel IDs
203 |         
204 |         Returns:
205 |             Result of the alert policy creation
206 |         """
207 |         try:
208 |             from google.cloud import monitoring_v3
209 |             from google.protobuf import duration_pb2
210 |             
211 |             # Initialize the Alert Policy Service client
212 |             client = monitoring_v3.AlertPolicyServiceClient()
213 |             
214 |             # Format the project name
215 |             project_name = f"projects/{project_id}"
216 |             
217 |             # Create a duration object
218 |             duration = duration_pb2.Duration(seconds=duration_seconds)
219 |             
220 |             # Create the alert condition
221 |             condition = monitoring_v3.AlertPolicy.Condition(
222 |                 display_name=f"Condition for {display_name}",
223 |                 condition_threshold=monitoring_v3.AlertPolicy.Condition.MetricThreshold(
224 |                     filter=filter_str,
225 |                     comparison=getattr(monitoring_v3.ComparisonType, comparison),
226 |                     threshold_value=threshold_value,
227 |                     duration=duration,
228 |                     trigger=monitoring_v3.AlertPolicy.Condition.Trigger(
229 |                         count=1
230 |                     ),
231 |                     aggregations=[
232 |                         monitoring_v3.Aggregation(
233 |                             alignment_period=duration_pb2.Duration(seconds=60),
234 |                             per_series_aligner=monitoring_v3.Aggregation.Aligner.ALIGN_MEAN,
235 |                             cross_series_reducer=monitoring_v3.Aggregation.Reducer.REDUCE_MEAN
236 |                         )
237 |                     ]
238 |                 )
239 |             )
240 |             
241 |             # Create the alert policy
242 |             alert_policy = monitoring_v3.AlertPolicy(
243 |                 display_name=display_name,
244 |                 conditions=[condition],
245 |                 combiner=monitoring_v3.AlertPolicy.ConditionCombinerType.OR
246 |             )
247 |             
248 |             # Add notification channels if provided
249 |             if notification_channels:
250 |                 alert_policy.notification_channels = [
251 |                     f"projects/{project_id}/notificationChannels/{channel_id}" 
252 |                     for channel_id in notification_channels
253 |                 ]
254 |             
255 |             # Create the policy
256 |             policy = client.create_alert_policy(name=project_name, alert_policy=alert_policy)
257 |             
258 |             # Format response
259 |             conditions_str = "\n".join([
260 |                 f"- {c.display_name}: {c.condition_threshold.filter}" 
261 |                 for c in policy.conditions
262 |             ])
263 |             
264 |             notifications_str = "None"
265 |             if policy.notification_channels:
266 |                 notifications_str = "\n".join([
267 |                     f"- {channel.split('/')[-1]}" 
268 |                     for channel in policy.notification_channels
269 |                 ])
270 |             
271 |             return f"""
272 | Alert Policy created successfully:
273 | - Name: {policy.display_name}
274 | - Policy ID: {policy.name.split('/')[-1]}
275 | - Combiner: {policy.combiner.name}
276 | 
277 | Conditions:
278 | {conditions_str}
279 | 
280 | Notification Channels:
281 | {notifications_str}
282 | """
283 |         except Exception as e:
284 |             return f"Error creating alert policy: {str(e)}"
285 |     
286 |     @mcp.tool()
287 |     def list_uptime_checks(project_id: str) -> str:
288 |         """
289 |         List Uptime checks in a GCP project.
290 |         
291 |         Args:
292 |             project_id: The ID of the GCP project to list Uptime checks for
293 |         
294 |         Returns:
295 |             List of Uptime checks in the specified GCP project
296 |         """
297 |         try:
298 |             from google.cloud import monitoring_v3
299 |             
300 |             # Initialize the Uptime Check Service client
301 |             client = monitoring_v3.UptimeCheckServiceClient()
302 |             
303 |             # Format the project name
304 |             project_name = f"projects/{project_id}"
305 |             
306 |             # Create the request object
307 |             request = monitoring_v3.ListUptimeCheckConfigsRequest(
308 |                 parent=project_name
309 |             )
310 |             
311 |             # List uptime checks
312 |             uptime_checks = client.list_uptime_check_configs(request=request)
313 |             
314 |             # Format the response
315 |             checks_list = []
316 |             
317 |             for check in uptime_checks:
318 |                 check_id = check.name.split('/')[-1]
319 |                 display_name = check.display_name
320 |                 period_seconds = check.period.seconds
321 |                 timeout_seconds = check.timeout.seconds
322 |                 
323 |                 # Get check type and details
324 |                 check_details = []
325 |                 if check.HasField('http_check'):
326 |                     check_type = "HTTP"
327 |                     url = check.http_check.path
328 |                     if check.resource.HasField('monitored_resource'):
329 |                         host = check.monitored_resource.labels.get('host', 'Unknown')
330 |                         url = f"{host}{url}"
331 |                     elif check.http_check.HasField('host'):
332 |                         url = f"{check.http_check.host}{url}"
333 |                     check_details.append(f"URL: {url}")
334 |                     check_details.append(f"Port: {check.http_check.port}")
335 |                     
336 |                 elif check.HasField('tcp_check'):
337 |                     check_type = "TCP"
338 |                     if check.resource.HasField('monitored_resource'):
339 |                         host = check.monitored_resource.labels.get('host', 'Unknown')
340 |                     else:
341 |                         host = check.tcp_check.host
342 |                     check_details.append(f"Host: {host}")
343 |                     check_details.append(f"Port: {check.tcp_check.port}")
344 |                 else:
345 |                     check_type = "Unknown"
346 |                 
347 |                 checks_list.append(f"- {display_name} (ID: {check_id}, Type: {check_type})")
348 |                 checks_list.append(f"  Frequency: {period_seconds}s, Timeout: {timeout_seconds}s")
349 |                 checks_list.extend([f"  {detail}" for detail in check_details])
350 |             
351 |             if not checks_list:
352 |                 return f"No Uptime checks found for project {project_id}."
353 |             
354 |             checks_str = "\n".join(checks_list)
355 |             
356 |             return f"""
357 | Uptime Checks in GCP Project {project_id}:
358 | {checks_str}
359 | """
360 |         except Exception as e:
361 |             return f"Error listing Uptime checks: {str(e)}"
```

--------------------------------------------------------------------------------
/src/gcp_mcp/gcp_modules/networking/tools.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Google Cloud Platform Networking tools.
  3 | """
  4 | from typing import List, Dict, Any, Optional
  5 | 
  6 | def register_tools(mcp):
  7 |     """Register all networking tools with the MCP server."""
  8 |     
  9 |     @mcp.tool()
 10 |     def list_vpc_networks(project_id: str) -> str:
 11 |         """
 12 |         List Virtual Private Cloud (VPC) networks in a GCP project.
 13 |         
 14 |         Args:
 15 |             project_id: The ID of the GCP project to list VPC networks for
 16 |         
 17 |         Returns:
 18 |             List of VPC networks in the specified GCP project
 19 |         """
 20 |         try:
 21 |             from google.cloud import compute_v1
 22 |             
 23 |             # Initialize the Compute Engine client for networks
 24 |             client = compute_v1.NetworksClient()
 25 |             
 26 |             # List networks
 27 |             request = compute_v1.ListNetworksRequest(project=project_id)
 28 |             networks = client.list(request=request)
 29 |             
 30 |             # Format the response
 31 |             networks_list = []
 32 |             for network in networks:
 33 |                 subnet_mode = "Auto" if network.auto_create_subnetworks else "Custom"
 34 |                 creation_time = network.creation_timestamp if network.creation_timestamp else "Unknown"
 35 |                 
 36 |                 # Get subnet information if available
 37 |                 subnets = []
 38 |                 if not network.auto_create_subnetworks and network.subnetworks:
 39 |                     for subnet_url in network.subnetworks:
 40 |                         subnet_name = subnet_url.split('/')[-1]
 41 |                         subnet_region = subnet_url.split('/')[-3]
 42 |                         subnets.append(f"    - {subnet_name} (Region: {subnet_region})")
 43 |                 
 44 |                 network_info = f"- {network.name} (Mode: {subnet_mode}, Created: {creation_time})"
 45 |                 if subnets:
 46 |                     network_info += "\n  Subnets:\n" + "\n".join(subnets)
 47 |                     
 48 |                 networks_list.append(network_info)
 49 |             
 50 |             if not networks_list:
 51 |                 return f"No VPC networks found in project {project_id}."
 52 |             
 53 |             networks_str = "\n".join(networks_list)
 54 |             
 55 |             return f"""
 56 | VPC Networks in GCP Project {project_id}:
 57 | {networks_str}
 58 | """
 59 |         except Exception as e:
 60 |             return f"Error listing VPC networks: {str(e)}"
 61 |     
 62 |     @mcp.tool()
 63 |     def get_vpc_details(project_id: str, network_name: str) -> str:
 64 |         """
 65 |         Get detailed information about a specific VPC network.
 66 |         
 67 |         Args:
 68 |             project_id: The ID of the GCP project
 69 |             network_name: The name of the VPC network
 70 |         
 71 |         Returns:
 72 |             Detailed information about the specified VPC network
 73 |         """
 74 |         try:
 75 |             from google.cloud import compute_v1
 76 |             
 77 |             # Initialize the Compute Engine client for networks
 78 |             network_client = compute_v1.NetworksClient()
 79 |             subnet_client = compute_v1.SubnetworksClient()
 80 |             
 81 |             # Get network details
 82 |             network = network_client.get(project=project_id, network=network_name)
 83 |             
 84 |             # Format the response
 85 |             details = []
 86 |             details.append(f"Name: {network.name}")
 87 |             details.append(f"ID: {network.id}")
 88 |             details.append(f"Description: {network.description or 'None'}")
 89 |             details.append(f"Self Link: {network.self_link}")
 90 |             details.append(f"Creation Time: {network.creation_timestamp}")
 91 |             details.append(f"Subnet Mode: {'Auto' if network.auto_create_subnetworks else 'Custom'}")
 92 |             details.append(f"Routing Mode: {network.routing_config.routing_mode if network.routing_config else 'Unknown'}")
 93 |             details.append(f"MTU: {network.mtu}")
 94 |             
 95 |             # If it's a custom subnet mode network, get all subnets
 96 |             if not network.auto_create_subnetworks:
 97 |                 # List all subnets in this network
 98 |                 request = compute_v1.ListSubnetworksRequest(project=project_id)
 99 |                 subnets = []
100 |                 
101 |                 for item in subnet_client.list(request=request):
102 |                     # Check if the subnet belongs to this network
103 |                     if network.name in item.network:
104 |                         cidr = item.ip_cidr_range
105 |                         region = item.region.split('/')[-1]
106 |                         purpose = f", Purpose: {item.purpose}" if item.purpose else ""
107 |                         private_ip = ", Private Google Access: Enabled" if item.private_ip_google_access else ""
108 |                         subnets.append(f"  - {item.name} (Region: {region}, CIDR: {cidr}{purpose}{private_ip})")
109 |                 
110 |                 if subnets:
111 |                     details.append(f"Subnets ({len(subnets)}):\n" + "\n".join(subnets))
112 |             
113 |             # List peering connections if any
114 |             if network.peerings:
115 |                 peerings = []
116 |                 for peering in network.peerings:
117 |                     state = peering.state
118 |                     network_name = peering.network.split('/')[-1]
119 |                     peerings.append(f"  - {network_name} (State: {state})")
120 |                 
121 |                 if peerings:
122 |                     details.append(f"Peerings ({len(peerings)}):\n" + "\n".join(peerings))
123 |             
124 |             details_str = "\n".join(details)
125 |             
126 |             return f"""
127 | VPC Network Details:
128 | {details_str}
129 | """
130 |         except Exception as e:
131 |             return f"Error getting VPC network details: {str(e)}"
132 |     
133 |     @mcp.tool()
134 |     def list_subnets(project_id: str, region: Optional[str] = None) -> str:
135 |         """
136 |         List subnets in a GCP project, optionally filtered by region.
137 |         
138 |         Args:
139 |             project_id: The ID of the GCP project
140 |             region: Optional region to filter subnets by
141 |         
142 |         Returns:
143 |             List of subnets in the specified GCP project
144 |         """
145 |         try:
146 |             from google.cloud import compute_v1
147 |             
148 |             # Initialize the Compute Engine client for subnets
149 |             client = compute_v1.SubnetworksClient()
150 |             
151 |             # List subnets
152 |             request = compute_v1.ListSubnetworksRequest(project=project_id, region=region) if region else compute_v1.ListSubnetworksRequest(project=project_id)
153 |             subnets = client.list(request=request)
154 |             
155 |             # Format the response
156 |             subnets_list = []
157 |             for subnet in subnets:
158 |                 network_name = subnet.network.split('/')[-1]
159 |                 region_name = subnet.region.split('/')[-1]
160 |                 cidr = subnet.ip_cidr_range
161 |                 purpose = f", Purpose: {subnet.purpose}" if subnet.purpose else ""
162 |                 private_ip = ", Private Google Access: Enabled" if subnet.private_ip_google_access else ""
163 |                 
164 |                 subnet_info = f"- {subnet.name} (Network: {network_name}, Region: {region_name}, CIDR: {cidr}{purpose}{private_ip})"
165 |                 subnets_list.append(subnet_info)
166 |             
167 |             if not subnets_list:
168 |                 return f"No subnets found in project {project_id}{' for region ' + region if region else ''}."
169 |             
170 |             subnets_str = "\n".join(subnets_list)
171 |             
172 |             return f"""
173 | Subnets in GCP Project {project_id}{' for region ' + region if region else ''}:
174 | {subnets_str}
175 | """
176 |         except Exception as e:
177 |             return f"Error listing subnets: {str(e)}"
178 |     
179 |     @mcp.tool()
180 |     def create_firewall_rule(project_id: str, name: str, network: str, direction: str, priority: int, 
181 |                            source_ranges: Optional[List[str]] = None, destination_ranges: Optional[List[str]] = None,
182 |                            allowed_protocols: Optional[List[Dict[str, Any]]] = None, denied_protocols: Optional[List[Dict[str, Any]]] = None,
183 |                            target_tags: Optional[List[str]] = None, source_tags: Optional[List[str]] = None, 
184 |                            description: Optional[str] = None) -> str:
185 |         """
186 |         Create a firewall rule in a GCP project.
187 |         
188 |         Args:
189 |             project_id: The ID of the GCP project
190 |             name: The name of the firewall rule
191 |             network: The name of the network to create the firewall rule for
192 |             direction: The direction of traffic to match ('INGRESS' or 'EGRESS')
193 |             priority: The priority of the rule (lower number = higher priority, 0-65535)
194 |             source_ranges: Optional list of source IP ranges (for INGRESS)
195 |             destination_ranges: Optional list of destination IP ranges (for EGRESS)
196 |             allowed_protocols: Optional list of allowed protocols, e.g. [{"IPProtocol": "tcp", "ports": ["80", "443"]}]
197 |             denied_protocols: Optional list of denied protocols, e.g. [{"IPProtocol": "tcp", "ports": ["22"]}]
198 |             target_tags: Optional list of target instance tags
199 |             source_tags: Optional list of source instance tags (for INGRESS)
200 |             description: Optional description for the firewall rule
201 |         
202 |         Returns:
203 |             Result of the firewall rule creation
204 |         """
205 |         try:
206 |             from google.cloud import compute_v1
207 |             
208 |             # Initialize the Compute Engine client for firewall
209 |             client = compute_v1.FirewallsClient()
210 |             
211 |             # Create the firewall resource
212 |             firewall = compute_v1.Firewall()
213 |             firewall.name = name
214 |             firewall.network = f"projects/{project_id}/global/networks/{network}"
215 |             firewall.direction = direction
216 |             firewall.priority = priority
217 |             
218 |             if description:
219 |                 firewall.description = description
220 |             
221 |             # Set source/destination ranges based on direction
222 |             if direction == "INGRESS" and source_ranges:
223 |                 firewall.source_ranges = source_ranges
224 |             elif direction == "EGRESS" and destination_ranges:
225 |                 firewall.destination_ranges = destination_ranges
226 |             
227 |             # Set allowed protocols
228 |             if allowed_protocols:
229 |                 firewall.allowed = []
230 |                 for protocol in allowed_protocols:
231 |                     allowed = compute_v1.Allowed()
232 |                     allowed.I_p_protocol = protocol["IPProtocol"]
233 |                     if "ports" in protocol:
234 |                         allowed.ports = protocol["ports"]
235 |                     firewall.allowed.append(allowed)
236 |             
237 |             # Set denied protocols
238 |             if denied_protocols:
239 |                 firewall.denied = []
240 |                 for protocol in denied_protocols:
241 |                     denied = compute_v1.Denied()
242 |                     denied.I_p_protocol = protocol["IPProtocol"]
243 |                     if "ports" in protocol:
244 |                         denied.ports = protocol["ports"]
245 |                     firewall.denied.append(denied)
246 |             
247 |             # Set target tags
248 |             if target_tags:
249 |                 firewall.target_tags = target_tags
250 |             
251 |             # Set source tags
252 |             if source_tags and direction == "INGRESS":
253 |                 firewall.source_tags = source_tags
254 |             
255 |             # Create the firewall rule
256 |             operation = client.insert(project=project_id, firewall_resource=firewall)
257 |             
258 |             return f"""
259 | Firewall rule creation initiated:
260 | - Name: {name}
261 | - Network: {network}
262 | - Direction: {direction}
263 | - Priority: {priority}
264 | - Description: {description or 'None'}
265 | - Source Ranges: {source_ranges or 'None'}
266 | - Destination Ranges: {destination_ranges or 'None'}
267 | - Allowed Protocols: {allowed_protocols or 'None'}
268 | - Denied Protocols: {denied_protocols or 'None'}
269 | - Target Tags: {target_tags or 'None'}
270 | - Source Tags: {source_tags or 'None'}
271 | 
272 | Operation ID: {operation.id}
273 | Status: {operation.status}
274 | """
275 |         except Exception as e:
276 |             return f"Error creating firewall rule: {str(e)}"
277 |     
278 |     @mcp.tool()
279 |     def list_firewall_rules(project_id: str, network: Optional[str] = None) -> str:
280 |         """
281 |         List firewall rules in a GCP project, optionally filtered by network.
282 |         
283 |         Args:
284 |             project_id: The ID of the GCP project
285 |             network: Optional network name to filter firewall rules by
286 |         
287 |         Returns:
288 |             List of firewall rules in the specified GCP project
289 |         """
290 |         try:
291 |             from google.cloud import compute_v1
292 |             
293 |             # Initialize the Compute Engine client for firewall
294 |             client = compute_v1.FirewallsClient()
295 |             
296 |             # List firewall rules
297 |             request = compute_v1.ListFirewallsRequest(project=project_id)
298 |             firewalls = client.list(request=request)
299 |             
300 |             # Format the response
301 |             firewalls_list = []
302 |             for firewall in firewalls:
303 |                 # If network filter is applied, skip firewalls not in that network
304 |                 if network and network not in firewall.network:
305 |                     continue
306 |                 
307 |                 # Get network name from the full URL
308 |                 network_name = firewall.network.split('/')[-1]
309 |                 
310 |                 # Get allowed/denied protocols
311 |                 allowed = []
312 |                 for allow in firewall.allowed:
313 |                     ports = f":{','.join(allow.ports)}" if allow.ports else ""
314 |                     allowed.append(f"{allow.I_p_protocol}{ports}")
315 |                 
316 |                 denied = []
317 |                 for deny in firewall.denied:
318 |                     ports = f":{','.join(deny.ports)}" if deny.ports else ""
319 |                     denied.append(f"{deny.I_p_protocol}{ports}")
320 |                 
321 |                 # Format sources/destinations based on direction
322 |                 if firewall.direction == "INGRESS":
323 |                     sources = firewall.source_ranges or firewall.source_tags or ["Any"]
324 |                     destinations = ["Any"]
325 |                 else:  # EGRESS
326 |                     sources = ["Any"]
327 |                     destinations = firewall.destination_ranges or ["Any"]
328 |                 
329 |                 # Create the firewall rule info string
330 |                 rule_info = [
331 |                     f"- {firewall.name}",
332 |                     f"  Network: {network_name}",
333 |                     f"  Direction: {firewall.direction}",
334 |                     f"  Priority: {firewall.priority}",
335 |                     f"  Action: {'Allow' if firewall.allowed else 'Deny'}"
336 |                 ]
337 |                 
338 |                 if allowed:
339 |                     rule_info.append(f"  Allowed: {', '.join(allowed)}")
340 |                 if denied:
341 |                     rule_info.append(f"  Denied: {', '.join(denied)}")
342 |                 
343 |                 rule_info.append(f"  Sources: {', '.join(sources)}")
344 |                 rule_info.append(f"  Destinations: {', '.join(destinations)}")
345 |                 
346 |                 if firewall.target_tags:
347 |                     rule_info.append(f"  Target Tags: {', '.join(firewall.target_tags)}")
348 |                 
349 |                 firewalls_list.append("\n".join(rule_info))
350 |             
351 |             if not firewalls_list:
352 |                 return f"No firewall rules found in project {project_id}{' for network ' + network if network else ''}."
353 |             
354 |             firewalls_str = "\n".join(firewalls_list)
355 |             
356 |             return f"""
357 | Firewall Rules in GCP Project {project_id}{' for network ' + network if network else ''}:
358 | {firewalls_str}
359 | """
360 |         except Exception as e:
361 |             return f"Error listing firewall rules: {str(e)}"
362 |     
363 |     @mcp.tool()
364 |     def list_gcp_services(project_id: str) -> str:
365 |         """
366 |         List enabled services/APIs in a GCP project.
367 |         
368 |         Args:
369 |             project_id: The ID of the GCP project to list services for
370 |         
371 |         Returns:
372 |             List of enabled services in the specified GCP project
373 |         """
374 |         try:
375 |             try:
376 |                 from google.cloud import service_usage
377 |             except ImportError:
378 |                 return "Error: The Google Cloud service usage library is not installed. Please install it with 'pip install google-cloud-service-usage'."
379 |             
380 |             # Initialize the Service Usage client
381 |             client = service_usage.ServiceUsageClient()
382 |             
383 |             # Create the request
384 |             request = service_usage.ListServicesRequest(
385 |                 parent=f"projects/{project_id}",
386 |                 filter="state:ENABLED"
387 |             )
388 |             
389 |             # List enabled services
390 |             services = client.list_services(request=request)
391 |             
392 |             # Format the response
393 |             services_list = []
394 |             for service in services:
395 |                 name = service.name.split('/')[-1] if service.name else "Unknown"
396 |                 title = service.config.title if service.config else "Unknown"
397 |                 services_list.append(f"- {name}: {title}")
398 |             
399 |             if not services_list:
400 |                 return f"No services are enabled in project {project_id}."
401 |             
402 |             services_str = "\n".join(services_list)
403 |             
404 |             return f"""
405 | Enabled Services in GCP Project {project_id}:
406 | {services_str}
407 | """
408 |         except Exception as e:
409 |             return f"Error listing GCP services: {str(e)}"
```

--------------------------------------------------------------------------------
/src/gcp_mcp/gcp_modules/auth/tools.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Google Cloud Platform Authentication tools.
  3 | """
  4 | import os
  5 | import json
  6 | from typing import Dict, Any, Optional, List
  7 | import webbrowser
  8 | import tempfile
  9 | from pathlib import Path
 10 | 
 11 | def register_tools(mcp):
 12 |     """Register all authentication tools with the MCP server."""
 13 |     
 14 |     # Global variable to store the current project ID
 15 |     _current_project_id = None
 16 |     
 17 |     @mcp.tool()
 18 |     def auth_login(project_id: str = "") -> str:
 19 |         """
 20 |         Authenticate with Google Cloud Platform using browser-based OAuth flow.
 21 |         
 22 |         Args:
 23 |             project_id: Optional project ID to set as default after login
 24 |         
 25 |         Returns:
 26 |             Status message indicating whether authentication was successful
 27 |         """
 28 |         nonlocal _current_project_id
 29 |         
 30 |         try:
 31 |             from google.auth.transport.requests import Request
 32 |             from google.auth.exceptions import DefaultCredentialsError
 33 |             from google_auth_oauthlib.flow import InstalledAppFlow
 34 |             import google.auth
 35 |             
 36 |             # First, attempt to use existing credentials to see if we're already authenticated
 37 |             try:
 38 |                 credentials, project = google.auth.default()
 39 |                 
 40 |                 # Test if credentials are valid
 41 |                 if hasattr(credentials, 'refresh'):
 42 |                     credentials.refresh(Request())
 43 |                 
 44 |                 # If we get here, credentials are valid
 45 |                 if project_id:
 46 |                     # Update global project ID
 47 |                     _current_project_id = project_id
 48 |                     
 49 |                     # Create a credential configuration file for the project
 50 |                     _set_project_id_in_config(project_id)
 51 |                     return f"Using existing credentials. Project set to {project_id}."
 52 |                 else:
 53 |                     return f"Using existing credentials. Current project: {project or 'Not set'}"
 54 |                     
 55 |             except (DefaultCredentialsError, Exception) as e:
 56 |                 # If we can't use existing credentials, proceed with login
 57 |                 pass
 58 |             
 59 |             # Set up the OAuth flow
 60 |             print("Opening browser for authentication...")
 61 |             
 62 |             # Create a temporary client_secrets.json file for OAuth flow
 63 |             client_secrets = {
 64 |                 "installed": {
 65 |                     "client_id": "764086051850-6qr4p6gpi6hn506pt8ejuq83di341hur.apps.googleusercontent.com",
 66 |                     "project_id": "gcp-mcp",
 67 |                     "auth_uri": "https://accounts.google.com/o/oauth2/auth",
 68 |                     "token_uri": "https://oauth2.googleapis.com/token",
 69 |                     "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
 70 |                     "client_secret": "d-FL95Q19q7MQmFpd7hHD0Ty",
 71 |                     "redirect_uris": ["http://localhost", "urn:ietf:wg:oauth:2.0:oob"]
 72 |                 }
 73 |             }
 74 |             
 75 |             with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as temp:
 76 |                 temp_client_secrets_path = temp.name
 77 |                 json.dump(client_secrets, temp)
 78 |                 
 79 |             try:
 80 |                 # Create the OAuth flow
 81 |                 flow = InstalledAppFlow.from_client_secrets_file(
 82 |                     temp_client_secrets_path,
 83 |                     scopes=['https://www.googleapis.com/auth/cloud-platform']
 84 |                 )
 85 |                 
 86 |                 # Run the flow
 87 |                 creds = flow.run_local_server(port=0)
 88 |                 
 89 |                 # Save the credentials as application default credentials
 90 |                 adc_path = _get_adc_path()
 91 |                 os.makedirs(os.path.dirname(adc_path), exist_ok=True)
 92 |                 
 93 |                 # Write credentials to ADC file
 94 |                 with open(adc_path, 'w') as f:
 95 |                     creds_data = {
 96 |                         "client_id": creds.client_id,
 97 |                         "client_secret": creds.client_secret,
 98 |                         "refresh_token": creds.refresh_token,
 99 |                         "type": "authorized_user"
100 |                     }
101 |                     json.dump(creds_data, f)
102 |                 
103 |                 # Set project if specified
104 |                 if project_id:
105 |                     _current_project_id = project_id
106 |                     _set_project_id_in_config(project_id)
107 |                 
108 |                 success_msg = "Authentication successful!"
109 |                 
110 |                 if project_id:
111 |                     success_msg += f" Default project set to {project_id}."
112 |                 
113 |                 # Test by listing accessible projects
114 |                 try:
115 |                     from google.cloud import resourcemanager_v3
116 |                     
117 |                     # Get fresh credentials after login
118 |                     credentials, _ = google.auth.default()
119 |                     client = resourcemanager_v3.ProjectsClient(credentials=credentials)
120 |                     request = resourcemanager_v3.ListProjectsRequest()
121 |                     projects = list(client.list_projects(request=request))
122 |                     
123 |                     project_count = len(projects)
124 |                     if project_count > 0:
125 |                         project_list = "\n".join([f"- {project.display_name} (ID: {project.project_id})" for project in projects[:5]])
126 |                         if project_count > 5:
127 |                             project_list += f"\n... and {project_count - 5} more"
128 |                         
129 |                         success_msg += f"\n\nFound {project_count} accessible projects:\n{project_list}"
130 |                 except Exception as e:
131 |                     # Don't fail if we can't list projects
132 |                     pass
133 |                 
134 |                 return success_msg
135 |             
136 |             finally:
137 |                 # Clean up the temporary file
138 |                 try:
139 |                     os.unlink(temp_client_secrets_path)
140 |                 except:
141 |                     pass
142 |                 
143 |         except Exception as e:
144 |             return f"Authentication error: {str(e)}"
145 |     
146 |     @mcp.tool()
147 |     def auth_list() -> str:
148 |         """
149 |         List active Google Cloud credentials.
150 |         
151 |         Returns:
152 |             List of active credentials and the current default account
153 |         """
154 |         try:
155 |             import google.auth
156 |             
157 |             # Check application default credentials
158 |             try:
159 |                 credentials, project = google.auth.default()
160 |                 
161 |                 # Try to get email from credentials
162 |                 email = None
163 |                 if hasattr(credentials, 'service_account_email'):
164 |                     email = credentials.service_account_email
165 |                 elif hasattr(credentials, 'refresh_token') and credentials.refresh_token:
166 |                     # This is a user credential
167 |                     adc_path = _get_adc_path()
168 |                     if os.path.exists(adc_path):
169 |                         try:
170 |                             with open(adc_path, 'r') as f:
171 |                                 data = json.load(f)
172 |                                 if 'refresh_token' in data:
173 |                                     # This is a user auth, but we can't get the email directly
174 |                                     email = "User account (ADC)"
175 |                         except:
176 |                             pass
177 |                 
178 |                 credential_type = type(credentials).__name__
179 |                 
180 |                 output = "Active Credentials:\n"
181 |                 if email:
182 |                     output += f"- {email} (Application Default Credentials, type: {credential_type})\n"
183 |                 else:
184 |                     output += f"- Application Default Credentials (type: {credential_type})\n"
185 |                 
186 |                 if project:
187 |                     output += f"\nCurrent Project: {project}\n"
188 |                 else:
189 |                     output += "\nNo project set in default credentials.\n"
190 |                 
191 |                 # Check for other credentials in well-known locations
192 |                 credentials_dir = os.path.expanduser("~/.config/gcloud/credentials")
193 |                 if os.path.isdir(credentials_dir):
194 |                     cred_files = [f for f in os.listdir(credentials_dir) if f.endswith('.json')]
195 |                     if cred_files:
196 |                         output += "\nOther available credentials:\n"
197 |                         for cred_file in cred_files:
198 |                             try:
199 |                                 with open(os.path.join(credentials_dir, cred_file), 'r') as f:
200 |                                     data = json.load(f)
201 |                                     if 'client_id' in data:
202 |                                         output += f"- User account ({cred_file})\n"
203 |                                     elif 'private_key_id' in data:
204 |                                         output += f"- Service account: {data.get('client_email', 'Unknown')} ({cred_file})\n"
205 |                             except:
206 |                                 output += f"- Unknown credential type ({cred_file})\n"
207 |                 
208 |                 return output
209 |             except Exception as e:
210 |                 return f"No active credentials found. Please run auth_login() to authenticate.\nError: {str(e)}"
211 |                 
212 |         except Exception as e:
213 |             return f"Error listing credentials: {str(e)}"
214 |     
215 |     @mcp.tool()
216 |     def auth_revoke() -> str:
217 |         """
218 |         Revoke Google Cloud credentials.
219 |         
220 |         Returns:
221 |             Status message indicating whether the credentials were revoked
222 |         """
223 |         try:
224 |             import google.auth
225 |             from google.auth.transport.requests import Request
226 |             
227 |             # Check if we have application default credentials
228 |             try:
229 |                 credentials, _ = google.auth.default()
230 |                 
231 |                 # If credentials have a revoke method, use it
232 |                 if hasattr(credentials, 'revoke'):
233 |                     credentials.revoke(Request())
234 |                 
235 |                 # Remove the application default credentials file
236 |                 adc_path = _get_adc_path()
237 |                 if os.path.exists(adc_path):
238 |                     os.remove(adc_path)
239 |                     return "Application default credentials have been revoked and removed."
240 |                 else:
241 |                     return "No application default credentials file found to remove."
242 |             
243 |             except Exception as e:
244 |                 return f"No active credentials found or failed to revoke: {str(e)}"
245 |                 
246 |         except Exception as e:
247 |             return f"Error revoking credentials: {str(e)}"
248 |     
249 |     @mcp.tool()
250 |     def config_set_project(project_id: str) -> str:
251 |         """
252 |         Set the default Google Cloud project.
253 |         
254 |         Args:
255 |             project_id: The ID of the project to set as default
256 |         
257 |         Returns:
258 |             Status message indicating whether the project was set
259 |         """
260 |         nonlocal _current_project_id
261 |         
262 |         try:
263 |             # Update global project ID
264 |             _current_project_id = project_id
265 |             
266 |             # Create or update the config file
267 |             _set_project_id_in_config(project_id)
268 |             
269 |             # Verify the project exists
270 |             try:
271 |                 from google.cloud import resourcemanager_v3
272 |                 import google.auth
273 |                 
274 |                 credentials, _ = google.auth.default()
275 |                 client = resourcemanager_v3.ProjectsClient(credentials=credentials)
276 |                 name = f"projects/{project_id}"
277 |                 
278 |                 try:
279 |                     project = client.get_project(name=name)
280 |                     return f"Default project set to {project_id} ({project.display_name})."
281 |                 except Exception:
282 |                     # Project might not exist or user might not have access
283 |                     return f"Default project set to {project_id}. Note: Could not verify if this project exists or if you have access to it."
284 |             
285 |             except Exception as e:
286 |                 # Don't fail if we can't verify the project
287 |                 return f"Default project set to {project_id}."
288 |                 
289 |         except Exception as e:
290 |             return f"Error setting project: {str(e)}"
291 |             
292 |     @mcp.tool()
293 |     def config_list() -> str:
294 |         """
295 |         List the current Google Cloud configuration.
296 |         
297 |         Returns:
298 |             Current configuration settings
299 |         """
300 |         try:
301 |             # Get project ID from config
302 |             project_id = _get_project_id_from_config()
303 |             
304 |             # Get project ID from global variable if set
305 |             if _current_project_id:
306 |                 project_id = _current_project_id
307 |             
308 |             output = "Current Configuration:\n"
309 |             
310 |             if project_id:
311 |                 output += f"- Project ID: {project_id}\n"
312 |             else:
313 |                 output += "- Project ID: Not set\n"
314 |             
315 |             # Check if we have active credentials
316 |             try:
317 |                 import google.auth
318 |                 
319 |                 credentials, default_project = google.auth.default()
320 |                 
321 |                 if hasattr(credentials, 'service_account_email'):
322 |                     output += f"- Authenticated as: {credentials.service_account_email} (Service Account)\n"
323 |                 else:
324 |                     output += "- Authenticated as: User Account\n"
325 |                 
326 |                 if default_project and default_project != project_id:
327 |                     output += f"- Default Project in Credentials: {default_project}\n"
328 |             except Exception:
329 |                 output += "- Authentication: Not authenticated or credentials not found\n"
330 |             
331 |             # Get additional configuration
332 |             config_file = os.path.join(_get_config_path(), 'configurations', 'config_default')
333 |             if os.path.exists(config_file):
334 |                 try:
335 |                     with open(config_file, 'r') as f:
336 |                         config_lines = f.readlines()
337 |                     
338 |                     if config_lines:
339 |                         output += "\nAdditional Configuration Settings:\n"
340 |                         for line in config_lines:
341 |                             line = line.strip()
342 |                             if line and not line.startswith('#') and '=' in line:
343 |                                 key, value = line.split('=', 1)
344 |                                 key = key.strip()
345 |                                 value = value.strip()
346 |                                 
347 |                                 # Skip project since we already displayed it
348 |                                 if key != 'project':
349 |                                     output += f"- {key}: {value}\n"
350 |                 except:
351 |                     pass
352 |             
353 |             return output
354 |             
355 |         except Exception as e:
356 |             return f"Error listing configuration: {str(e)}"
357 |     
358 |     # Helper functions
359 |     def _get_adc_path() -> str:
360 |         """Get the path to the application default credentials file."""
361 |         # Standard ADC paths by platform
362 |         if os.name == 'nt':  # Windows
363 |             return os.path.join(os.environ.get('APPDATA', ''), 'gcloud', 'application_default_credentials.json')
364 |         else:  # Linux/Mac
365 |             return os.path.expanduser('~/.config/gcloud/application_default_credentials.json')
366 |     
367 |     def _get_config_path() -> str:
368 |         """Get the path to the configuration directory."""
369 |         if os.name == 'nt':  # Windows
370 |             return os.path.join(os.environ.get('APPDATA', ''), 'gcloud')
371 |         else:  # Linux/Mac
372 |             return os.path.expanduser('~/.config/gcloud')
373 |             
374 |     def _set_project_id_in_config(project_id: str) -> None:
375 |         """Set the project ID in the configuration file."""
376 |         config_dir = _get_config_path()
377 |         os.makedirs(config_dir, exist_ok=True)
378 |         
379 |         config_file = os.path.join(config_dir, 'configurations', 'config_default')
380 |         os.makedirs(os.path.dirname(config_file), exist_ok=True)
381 |         
382 |         # Read existing config if it exists
383 |         config_data = {}
384 |         if os.path.exists(config_file):
385 |             try:
386 |                 with open(config_file, 'r') as f:
387 |                     for line in f:
388 |                         if '=' in line:
389 |                             key, value = line.strip().split('=', 1)
390 |                             config_data[key.strip()] = value.strip()
391 |             except:
392 |                 pass
393 |         
394 |         # Update project
395 |         config_data['project'] = project_id
396 |         
397 |         # Write back config
398 |         with open(config_file, 'w') as f:
399 |             for key, value in config_data.items():
400 |                 f.write(f"{key} = {value}\n")
401 |     
402 |     def _get_project_id_from_config() -> Optional[str]:
403 |         """Get the project ID from the configuration file."""
404 |         config_file = os.path.join(_get_config_path(), 'configurations', 'config_default')
405 |         
406 |         if os.path.exists(config_file):
407 |             try:
408 |                 with open(config_file, 'r') as f:
409 |                     for line in f:
410 |                         if line.strip().startswith('project ='):
411 |                             return line.split('=', 1)[1].strip()
412 |             except:
413 |                 pass
414 |         
415 |         return None
```

--------------------------------------------------------------------------------
/src/gcp_mcp/gcp_modules/compute/tools.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Google Cloud Platform Compute Engine tools.
  3 | """
  4 | from typing import List, Dict, Any, Optional
  5 | 
  6 | def register_tools(mcp):
  7 |     """Register all compute tools with the MCP server."""
  8 |     
  9 |     @mcp.tool()
 10 |     def list_compute_instances(project_id: str, zone: str = "") -> str:
 11 |         """
 12 |         List Compute Engine instances in a GCP project.
 13 |         
 14 |         Args:
 15 |             project_id: The ID of the GCP project to list instances for
 16 |             zone: Optional zone to filter instances (e.g., "us-central1-a")
 17 |         
 18 |         Returns:
 19 |             List of Compute Engine instances in the specified GCP project
 20 |         """
 21 |         try:
 22 |             from google.cloud import compute_v1
 23 |             
 24 |             # Initialize the Compute Engine client
 25 |             client = compute_v1.InstancesClient()
 26 |             
 27 |             instances_list = []
 28 |             
 29 |             if zone:
 30 |                 # List instances in the specified zone
 31 |                 request = compute_v1.ListInstancesRequest(
 32 |                     project=project_id,
 33 |                     zone=zone
 34 |                 )
 35 |                 instances = client.list(request=request)
 36 |                 
 37 |                 for instance in instances:
 38 |                     machine_type = instance.machine_type.split('/')[-1] if instance.machine_type else "Unknown"
 39 |                     status = instance.status
 40 |                     ext_ip = "None"
 41 |                     int_ip = "None"
 42 |                     
 43 |                     # Get IP addresses
 44 |                     if instance.network_interfaces:
 45 |                         int_ip = instance.network_interfaces[0].network_i_p
 46 |                         if instance.network_interfaces[0].access_configs:
 47 |                             ext_ip = instance.network_interfaces[0].access_configs[0].nat_i_p or "None"
 48 |                     
 49 |                     instances_list.append(f"- {instance.name} (Zone: {zone}, Type: {machine_type}, Internal IP: {int_ip}, External IP: {ext_ip}, Status: {status})")
 50 |             else:
 51 |                 # List instances in all zones
 52 |                 zones_client = compute_v1.ZonesClient()
 53 |                 zones_request = compute_v1.ListZonesRequest(project=project_id)
 54 |                 zones = zones_client.list(request=zones_request)
 55 |                 
 56 |                 for zone_item in zones:
 57 |                     zone_name = zone_item.name
 58 |                     request = compute_v1.ListInstancesRequest(
 59 |                         project=project_id,
 60 |                         zone=zone_name
 61 |                     )
 62 |                     try:
 63 |                         instances = client.list(request=request)
 64 |                         
 65 |                         for instance in instances:
 66 |                             machine_type = instance.machine_type.split('/')[-1] if instance.machine_type else "Unknown"
 67 |                             status = instance.status
 68 |                             ext_ip = "None"
 69 |                             int_ip = "None"
 70 |                             
 71 |                             # Get IP addresses
 72 |                             if instance.network_interfaces:
 73 |                                 int_ip = instance.network_interfaces[0].network_i_p
 74 |                                 if instance.network_interfaces[0].access_configs:
 75 |                                     ext_ip = instance.network_interfaces[0].access_configs[0].nat_i_p or "None"
 76 |                             
 77 |                             instances_list.append(f"- {instance.name} (Zone: {zone_name}, Type: {machine_type}, Internal IP: {int_ip}, External IP: {ext_ip}, Status: {status})")
 78 |                     except Exception:
 79 |                         # Skip zones where we can't list instances
 80 |                         continue
 81 |             
 82 |             if not instances_list:
 83 |                 zone_msg = f" in zone {zone}" if zone else ""
 84 |                 return f"No Compute Engine instances found{zone_msg} for project {project_id}."
 85 |             
 86 |             instances_str = "\n".join(instances_list)
 87 |             zone_msg = f" in zone {zone}" if zone else ""
 88 |             
 89 |             return f"""
 90 | Compute Engine Instances{zone_msg} in GCP Project {project_id}:
 91 | {instances_str}
 92 | """
 93 |         except Exception as e:
 94 |             return f"Error listing Compute Engine instances: {str(e)}"
 95 |     
 96 |     @mcp.tool()
 97 |     def get_instance_details(project_id: str, zone: str, instance_name: str) -> str:
 98 |         """
 99 |         Get detailed information about a specific Compute Engine instance.
100 |         
101 |         Args:
102 |             project_id: The ID of the GCP project
103 |             zone: The zone where the instance is located (e.g., "us-central1-a")
104 |             instance_name: The name of the instance to get details for
105 |         
106 |         Returns:
107 |             Detailed information about the specified Compute Engine instance
108 |         """
109 |         try:
110 |             from google.cloud import compute_v1
111 |             
112 |             # Initialize the Compute Engine client
113 |             client = compute_v1.InstancesClient()
114 |             
115 |             # Get the instance details
116 |             instance = client.get(project=project_id, zone=zone, instance=instance_name)
117 |             
118 |             # Format machine type
119 |             machine_type = instance.machine_type.split('/')[-1] if instance.machine_type else "Unknown"
120 |             
121 |             # Format creation timestamp
122 |             creation_timestamp = instance.creation_timestamp if instance.creation_timestamp else "Unknown"
123 |             
124 |             # Format boot disk
125 |             boot_disk = "None"
126 |             if instance.disks:
127 |                 for disk in instance.disks:
128 |                     if disk.boot:
129 |                         boot_disk = disk.source.split('/')[-1] if disk.source else "Unknown"
130 |                         break
131 |             
132 |             # Get IP addresses
133 |             network_interfaces = []
134 |             if instance.network_interfaces:
135 |                 for i, iface in enumerate(instance.network_interfaces):
136 |                     network = iface.network.split('/')[-1] if iface.network else "Unknown"
137 |                     subnetwork = iface.subnetwork.split('/')[-1] if iface.subnetwork else "Unknown"
138 |                     internal_ip = iface.network_i_p or "None"
139 |                     
140 |                     # Check for external IP
141 |                     external_ip = "None"
142 |                     if iface.access_configs:
143 |                         external_ip = iface.access_configs[0].nat_i_p or "None"
144 |                     
145 |                     network_interfaces.append(f"  Interface {i}:\n    Network: {network}\n    Subnetwork: {subnetwork}\n    Internal IP: {internal_ip}\n    External IP: {external_ip}")
146 |             
147 |             networks_str = "\n".join(network_interfaces) if network_interfaces else "  None"
148 |             
149 |             # Get attached disks
150 |             disks = []
151 |             if instance.disks:
152 |                 for i, disk in enumerate(instance.disks):
153 |                     disk_name = disk.source.split('/')[-1] if disk.source else "Unknown"
154 |                     disk_type = "Boot" if disk.boot else "Data"
155 |                     auto_delete = "Yes" if disk.auto_delete else "No"
156 |                     mode = disk.mode if disk.mode else "Unknown"
157 |                     
158 |                     disks.append(f"  Disk {i}:\n    Name: {disk_name}\n    Type: {disk_type}\n    Mode: {mode}\n    Auto-delete: {auto_delete}")
159 |             
160 |             disks_str = "\n".join(disks) if disks else "  None"
161 |             
162 |             # Get labels
163 |             labels = []
164 |             if instance.labels:
165 |                 for key, value in instance.labels.items():
166 |                     labels.append(f"  {key}: {value}")
167 |             
168 |             labels_str = "\n".join(labels) if labels else "  None"
169 |             
170 |             # Get metadata
171 |             metadata_items = []
172 |             if instance.metadata and instance.metadata.items:
173 |                 for item in instance.metadata.items:
174 |                     metadata_items.append(f"  {item.key}: {item.value}")
175 |             
176 |             metadata_str = "\n".join(metadata_items) if metadata_items else "  None"
177 |             
178 |             return f"""
179 | Compute Engine Instance Details for {instance_name}:
180 | 
181 | Project: {project_id}
182 | Zone: {zone}
183 | Machine Type: {machine_type}
184 | Status: {instance.status}
185 | Creation Time: {creation_timestamp}
186 | CPU Platform: {instance.cpu_platform}
187 | Boot Disk: {boot_disk}
188 | 
189 | Network Interfaces:
190 | {networks_str}
191 | 
192 | Disks:
193 | {disks_str}
194 | 
195 | Labels:
196 | {labels_str}
197 | 
198 | Metadata:
199 | {metadata_str}
200 | 
201 | Service Accounts: {"Yes" if instance.service_accounts else "None"}
202 | """
203 |         except Exception as e:
204 |             return f"Error getting instance details: {str(e)}"
205 |     
206 |     @mcp.tool()
207 |     def start_instance(project_id: str, zone: str, instance_name: str) -> str:
208 |         """
209 |         Start a Compute Engine instance.
210 |         
211 |         Args:
212 |             project_id: The ID of the GCP project
213 |             zone: The zone where the instance is located (e.g., "us-central1-a")
214 |             instance_name: The name of the instance to start
215 |         
216 |         Returns:
217 |             Status message indicating whether the instance was started successfully
218 |         """
219 |         try:
220 |             from google.cloud import compute_v1
221 |             
222 |             # Initialize the Compute Engine client
223 |             client = compute_v1.InstancesClient()
224 |             
225 |             # Start the instance
226 |             operation = client.start(project=project_id, zone=zone, instance=instance_name)
227 |             
228 |             # Wait for the operation to complete
229 |             operation_client = compute_v1.ZoneOperationsClient()
230 |             
231 |             # This is a synchronous call that will wait until the operation is complete
232 |             while operation.status != compute_v1.Operation.Status.DONE:
233 |                 operation = operation_client.get(project=project_id, zone=zone, operation=operation.name.split('/')[-1])
234 |                 import time
235 |                 time.sleep(1)
236 |             
237 |             if operation.error:
238 |                 return f"Error starting instance {instance_name}: {operation.error.errors[0].message}"
239 |             
240 |             return f"Instance {instance_name} in zone {zone} started successfully."
241 |         except Exception as e:
242 |             return f"Error starting instance: {str(e)}"
243 |     
244 |     @mcp.tool()
245 |     def stop_instance(project_id: str, zone: str, instance_name: str) -> str:
246 |         """
247 |         Stop a Compute Engine instance.
248 |         
249 |         Args:
250 |             project_id: The ID of the GCP project
251 |             zone: The zone where the instance is located (e.g., "us-central1-a")
252 |             instance_name: The name of the instance to stop
253 |         
254 |         Returns:
255 |             Status message indicating whether the instance was stopped successfully
256 |         """
257 |         try:
258 |             from google.cloud import compute_v1
259 |             
260 |             # Initialize the Compute Engine client
261 |             client = compute_v1.InstancesClient()
262 |             
263 |             # Stop the instance
264 |             operation = client.stop(project=project_id, zone=zone, instance=instance_name)
265 |             
266 |             # Wait for the operation to complete
267 |             operation_client = compute_v1.ZoneOperationsClient()
268 |             
269 |             # This is a synchronous call that will wait until the operation is complete
270 |             while operation.status != compute_v1.Operation.Status.DONE:
271 |                 operation = operation_client.get(project=project_id, zone=zone, operation=operation.name.split('/')[-1])
272 |                 import time
273 |                 time.sleep(1)
274 |             
275 |             if operation.error:
276 |                 return f"Error stopping instance {instance_name}: {operation.error.errors[0].message}"
277 |             
278 |             return f"Instance {instance_name} in zone {zone} stopped successfully."
279 |         except Exception as e:
280 |             return f"Error stopping instance: {str(e)}"
281 |     
282 |     @mcp.tool()
283 |     def list_machine_types(project_id: str, zone: str) -> str:
284 |         """
285 |         List available machine types in a specific zone.
286 |         
287 |         Args:
288 |             project_id: The ID of the GCP project
289 |             zone: The zone to check machine types in (e.g., "us-central1-a")
290 |         
291 |         Returns:
292 |             List of available machine types in the specified zone
293 |         """
294 |         try:
295 |             from google.cloud import compute_v1
296 |             
297 |             # Initialize the Machine Types client
298 |             client = compute_v1.MachineTypesClient()
299 |             
300 |             # List machine types
301 |             request = compute_v1.ListMachineTypesRequest(
302 |                 project=project_id,
303 |                 zone=zone
304 |             )
305 |             machine_types = client.list(request=request)
306 |             
307 |             # Format the response
308 |             types_list = []
309 |             
310 |             # Group by series
311 |             series = {}
312 |             for mt in machine_types:
313 |                 # Determine series (e.g., e2, n1, c2)
314 |                 name = mt.name
315 |                 series_name = "custom" if name.startswith("custom") else name.split("-")[0]
316 |                 
317 |                 if series_name not in series:
318 |                     series[series_name] = []
319 |                 
320 |                 # Format the machine type details
321 |                 vcpus = mt.guest_cpus
322 |                 memory_gb = mt.memory_mb / 1024  # Convert MB to GB
323 |                 
324 |                 series[series_name].append(f"    {name}: {vcpus} vCPUs, {memory_gb:.1f} GB RAM")
325 |             
326 |             # Create formatted output by series
327 |             for s_name in sorted(series.keys()):
328 |                 types_list.append(f"  {s_name} series:")
329 |                 types_list.extend(sorted(series[s_name]))
330 |             
331 |             if not types_list:
332 |                 return f"No machine types found in zone {zone} for project {project_id}."
333 |             
334 |             types_str = "\n".join(types_list)
335 |             
336 |             return f"""
337 | Available Machine Types in Zone {zone} for Project {project_id}:
338 | {types_str}
339 | """
340 |         except Exception as e:
341 |             return f"Error listing machine types: {str(e)}"
342 |     
343 |     @mcp.tool()
344 |     def list_disks(project_id: str, zone: str = "") -> str:
345 |         """
346 |         List Compute Engine persistent disks in a GCP project.
347 |         
348 |         Args:
349 |             project_id: The ID of the GCP project to list disks for
350 |             zone: Optional zone to filter disks (e.g., "us-central1-a")
351 |         
352 |         Returns:
353 |             List of persistent disks in the specified GCP project
354 |         """
355 |         try:
356 |             from google.cloud import compute_v1
357 |             
358 |             # Initialize the Disks client
359 |             client = compute_v1.DisksClient()
360 |             
361 |             disks_list = []
362 |             
363 |             if zone:
364 |                 # List disks in the specified zone
365 |                 request = compute_v1.ListDisksRequest(
366 |                     project=project_id,
367 |                     zone=zone
368 |                 )
369 |                 disks = client.list(request=request)
370 |                 
371 |                 for disk in disks:
372 |                     size_gb = disk.size_gb
373 |                     disk_type = disk.type.split('/')[-1] if disk.type else "Unknown"
374 |                     status = disk.status
375 |                     users = len(disk.users) if disk.users else 0
376 |                     users_str = f"Attached to {users} instance(s)" if users > 0 else "Not attached"
377 |                     
378 |                     disks_list.append(f"- {disk.name} (Zone: {zone}, Type: {disk_type}, Size: {size_gb} GB, Status: {status}, {users_str})")
379 |             else:
380 |                 # List disks in all zones
381 |                 zones_client = compute_v1.ZonesClient()
382 |                 zones_request = compute_v1.ListZonesRequest(project=project_id)
383 |                 zones = zones_client.list(request=zones_request)
384 |                 
385 |                 for zone_item in zones:
386 |                     zone_name = zone_item.name
387 |                     request = compute_v1.ListDisksRequest(
388 |                         project=project_id,
389 |                         zone=zone_name
390 |                     )
391 |                     try:
392 |                         disks = client.list(request=request)
393 |                         
394 |                         for disk in disks:
395 |                             size_gb = disk.size_gb
396 |                             disk_type = disk.type.split('/')[-1] if disk.type else "Unknown"
397 |                             status = disk.status
398 |                             users = len(disk.users) if disk.users else 0
399 |                             users_str = f"Attached to {users} instance(s)" if users > 0 else "Not attached"
400 |                             
401 |                             disks_list.append(f"- {disk.name} (Zone: {zone_name}, Type: {disk_type}, Size: {size_gb} GB, Status: {status}, {users_str})")
402 |                     except Exception:
403 |                         # Skip zones where we can't list disks
404 |                         continue
405 |             
406 |             if not disks_list:
407 |                 zone_msg = f" in zone {zone}" if zone else ""
408 |                 return f"No persistent disks found{zone_msg} for project {project_id}."
409 |             
410 |             disks_str = "\n".join(disks_list)
411 |             zone_msg = f" in zone {zone}" if zone else ""
412 |             
413 |             return f"""
414 | Persistent Disks{zone_msg} in GCP Project {project_id}:
415 | {disks_str}
416 | """
417 |         except Exception as e:
418 |             return f"Error listing persistent disks: {str(e)}"
419 |     
420 |     @mcp.tool()
421 |     def create_instance(project_id: str, zone: str, instance_name: str, machine_type: str, 
422 |                       source_image: str, boot_disk_size_gb: int = 10, 
423 |                       network: str = "default", subnet: str = "", 
424 |                       external_ip: bool = True) -> str:
425 |         """
426 |         Create a new Compute Engine instance.
427 |         
428 |         Args:
429 |             project_id: The ID of the GCP project
430 |             zone: The zone to create the instance in (e.g., "us-central1-a")
431 |             instance_name: The name for the new instance
432 |             machine_type: The machine type (e.g., "e2-medium")
433 |             source_image: The source image for the boot disk (e.g., "projects/debian-cloud/global/images/family/debian-11")
434 |             boot_disk_size_gb: The size of the boot disk in GB (default: 10)
435 |             network: The network to connect to (default: "default")
436 |             subnet: The subnetwork to connect to (optional)
437 |             external_ip: Whether to allocate an external IP (default: True)
438 |         
439 |         Returns:
440 |             Status message indicating whether the instance was created successfully
441 |         """
442 |         try:
443 |             from google.cloud import compute_v1
444 |             
445 |             # Initialize the clients
446 |             instances_client = compute_v1.InstancesClient()
447 |             
448 |             # Format the machine type
449 |             machine_type_url = f"projects/{project_id}/zones/{zone}/machineTypes/{machine_type}"
450 |             
451 |             # Create the disk configuration
452 |             boot_disk = compute_v1.AttachedDisk()
453 |             boot_disk.boot = True
454 |             initialize_params = compute_v1.AttachedDiskInitializeParams()
455 |             initialize_params.source_image = source_image
456 |             initialize_params.disk_size_gb = boot_disk_size_gb
457 |             boot_disk.initialize_params = initialize_params
458 |             boot_disk.auto_delete = True
459 |             
460 |             # Create the network configuration
461 |             network_interface = compute_v1.NetworkInterface()
462 |             if network.startswith("projects/"):
463 |                 network_interface.network = network
464 |             else:
465 |                 network_interface.network = f"projects/{project_id}/global/networks/{network}"
466 |             
467 |             if subnet:
468 |                 if subnet.startswith("projects/"):
469 |                     network_interface.subnetwork = subnet
470 |                 else:
471 |                     network_interface.subnetwork = f"projects/{project_id}/regions/{zone.rsplit('-', 1)[0]}/subnetworks/{subnet}"
472 |             
473 |             if external_ip:
474 |                 access_config = compute_v1.AccessConfig()
475 |                 access_config.name = "External NAT"
476 |                 access_config.type_ = "ONE_TO_ONE_NAT"
477 |                 access_config.network_tier = "PREMIUM"
478 |                 network_interface.access_configs = [access_config]
479 |             
480 |             # Create the instance
481 |             instance = compute_v1.Instance()
482 |             instance.name = instance_name
483 |             instance.machine_type = machine_type_url
484 |             instance.disks = [boot_disk]
485 |             instance.network_interfaces = [network_interface]
486 |             
487 |             # Create a default service account for the instance
488 |             service_account = compute_v1.ServiceAccount()
489 |             service_account.email = "default"
490 |             service_account.scopes = ["https://www.googleapis.com/auth/cloud-platform"]
491 |             instance.service_accounts = [service_account]
492 |             
493 |             # Create the instance
494 |             operation = instances_client.insert(
495 |                 project=project_id,
496 |                 zone=zone,
497 |                 instance_resource=instance
498 |             )
499 |             
500 |             # Wait for the create operation to complete
501 |             operation_client = compute_v1.ZoneOperationsClient()
502 |             
503 |             # This is a synchronous call that will wait until the operation is complete
504 |             while operation.status != compute_v1.Operation.Status.DONE:
505 |                 operation = operation_client.get(project=project_id, zone=zone, operation=operation.name.split('/')[-1])
506 |                 import time
507 |                 time.sleep(1)
508 |             
509 |             if operation.error:
510 |                 return f"Error creating instance {instance_name}: {operation.error.errors[0].message}"
511 |             
512 |             # Get the created instance to return its details
513 |             created_instance = instances_client.get(project=project_id, zone=zone, instance=instance_name)
514 |             
515 |             # Get the instance IP addresses
516 |             internal_ip = "None"
517 |             external_ip = "None"
518 |             
519 |             if created_instance.network_interfaces:
520 |                 internal_ip = created_instance.network_interfaces[0].network_i_p or "None"
521 |                 if created_instance.network_interfaces[0].access_configs:
522 |                     external_ip = created_instance.network_interfaces[0].access_configs[0].nat_i_p or "None"
523 |             
524 |             return f"""
525 | Instance {instance_name} created successfully in zone {zone}.
526 | 
527 | Details:
528 | - Machine Type: {machine_type}
529 | - Internal IP: {internal_ip}
530 | - External IP: {external_ip}
531 | - Status: {created_instance.status}
532 | """
533 |         except Exception as e:
534 |             return f"Error creating instance: {str(e)}"
535 |     
536 |     @mcp.tool()
537 |     def delete_instance(project_id: str, zone: str, instance_name: str) -> str:
538 |         """
539 |         Delete a Compute Engine instance.
540 |         
541 |         Args:
542 |             project_id: The ID of the GCP project
543 |             zone: The zone where the instance is located (e.g., "us-central1-a")
544 |             instance_name: The name of the instance to delete
545 |         
546 |         Returns:
547 |             Status message indicating whether the instance was deleted successfully
548 |         """
549 |         try:
550 |             from google.cloud import compute_v1
551 |             
552 |             # Initialize the Compute Engine client
553 |             client = compute_v1.InstancesClient()
554 |             
555 |             # Delete the instance
556 |             operation = client.delete(project=project_id, zone=zone, instance=instance_name)
557 |             
558 |             # Wait for the operation to complete
559 |             operation_client = compute_v1.ZoneOperationsClient()
560 |             
561 |             # This is a synchronous call that will wait until the operation is complete
562 |             while operation.status != compute_v1.Operation.Status.DONE:
563 |                 operation = operation_client.get(project=project_id, zone=zone, operation=operation.name.split('/')[-1])
564 |                 import time
565 |                 time.sleep(1)
566 |             
567 |             if operation.error:
568 |                 return f"Error deleting instance {instance_name}: {operation.error.errors[0].message}"
569 |             
570 |             return f"Instance {instance_name} in zone {zone} deleted successfully."
571 |         except Exception as e:
572 |             return f"Error deleting instance: {str(e)}"
573 |     
574 |     @mcp.tool()
575 |     def create_snapshot(project_id: str, zone: str, disk_name: str, snapshot_name: str, description: str = "") -> str:
576 |         """
577 |         Create a snapshot of a Compute Engine disk.
578 |         
579 |         Args:
580 |             project_id: The ID of the GCP project
581 |             zone: The zone where the disk is located (e.g., "us-central1-a")
582 |             disk_name: The name of the disk to snapshot
583 |             snapshot_name: The name for the new snapshot
584 |             description: Optional description for the snapshot
585 |         
586 |         Returns:
587 |             Status message indicating whether the snapshot was created successfully
588 |         """
589 |         try:
590 |             from google.cloud import compute_v1
591 |             
592 |             # Initialize the Disks client
593 |             disks_client = compute_v1.DisksClient()
594 |             
595 |             # Create the snapshot request
596 |             snapshot = compute_v1.Snapshot()
597 |             snapshot.name = snapshot_name
598 |             if description:
599 |                 snapshot.description = description
600 |             
601 |             # Create the snapshot
602 |             operation = disks_client.create_snapshot(
603 |                 project=project_id,
604 |                 zone=zone,
605 |                 disk=disk_name,
606 |                 snapshot_resource=snapshot
607 |             )
608 |             
609 |             # Wait for the operation to complete
610 |             operation_client = compute_v1.ZoneOperationsClient()
611 |             
612 |             # This is a synchronous call that will wait until the operation is complete
613 |             while operation.status != compute_v1.Operation.Status.DONE:
614 |                 operation = operation_client.get(project=project_id, zone=zone, operation=operation.name.split('/')[-1])
615 |                 import time
616 |                 time.sleep(1)
617 |             
618 |             if operation.error:
619 |                 return f"Error creating snapshot {snapshot_name}: {operation.error.errors[0].message}"
620 |             
621 |             return f"Snapshot {snapshot_name} of disk {disk_name} in zone {zone} created successfully."
622 |         except Exception as e:
623 |             return f"Error creating snapshot: {str(e)}"
624 |     
625 |     @mcp.tool()
626 |     def list_snapshots(project_id: str) -> str:
627 |         """
628 |         List disk snapshots in a GCP project.
629 |         
630 |         Args:
631 |             project_id: The ID of the GCP project to list snapshots for
632 |         
633 |         Returns:
634 |             List of disk snapshots in the specified GCP project
635 |         """
636 |         try:
637 |             from google.cloud import compute_v1
638 |             
639 |             # Initialize the Snapshots client
640 |             client = compute_v1.SnapshotsClient()
641 |             
642 |             # List snapshots
643 |             request = compute_v1.ListSnapshotsRequest(project=project_id)
644 |             snapshots = client.list(request=request)
645 |             
646 |             # Format the response
647 |             snapshots_list = []
648 |             for snapshot in snapshots:
649 |                 size_gb = snapshot.disk_size_gb
650 |                 status = snapshot.status
651 |                 source_disk = snapshot.source_disk.split('/')[-1] if snapshot.source_disk else "Unknown"
652 |                 creation_time = snapshot.creation_timestamp if snapshot.creation_timestamp else "Unknown"
653 |                 
654 |                 snapshots_list.append(f"- {snapshot.name} (Source: {source_disk}, Size: {size_gb} GB, Status: {status}, Created: {creation_time})")
655 |             
656 |             if not snapshots_list:
657 |                 return f"No snapshots found for project {project_id}."
658 |             
659 |             snapshots_str = "\n".join(snapshots_list)
660 |             
661 |             return f"""
662 | Disk Snapshots in GCP Project {project_id}:
663 | {snapshots_str}
664 | """
665 |         except Exception as e:
666 |             return f"Error listing snapshots: {str(e)}"
```