#
tokens: 43589/50000 29/30 files (page 1/2)
lines: on (toggle) GitHub
raw markdown copy reset
This is page 1 of 2. Use http://codebase.md/enesbol/gcp-mcp?lines=true&page={x} to view the full context.

# Directory Structure

```
├── .gitignore
├── .python-version
├── Dockerfile
├── LICENSE
├── pyproject.toml
├── README.md
├── requirements.txt
├── smithery.yaml
├── src
│   └── gcp-mcp-server
│       ├── __init__.py
│       ├── core
│       │   ├── __init__.py
│       │   ├── auth.py
│       │   ├── context.py
│       │   ├── logging_handler.py
│       │   ├── security.py
│       │   └── server.py
│       ├── main.py
│       ├── prompts
│       │   ├── __init__.py
│       │   └── common.py
│       └── services
│           ├── __init__.py
│           ├── artifact_registry.py
│           ├── client_instances.py
│           ├── cloud_audit_logs.py
│           ├── cloud_bigquery.py
│           ├── cloud_build.py
│           ├── cloud_compute_engine.py
│           ├── cloud_monitoring.py
│           ├── cloud_run.py
│           ├── cloud_storage.py
│           └── README.md
├── utils
│   ├── __init__.py
│   └── helpers.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------

```
1 | 3.13
```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
 1 | # Python-generated files
 2 | __pycache__/
 3 | *.py[oc]
 4 | build/
 5 | dist/
 6 | wheels/
 7 | *.egg-info
 8 | 
 9 | # Virtual environments
10 | .venv
```

--------------------------------------------------------------------------------
/src/gcp-mcp-server/services/README.md:
--------------------------------------------------------------------------------

```markdown
  1 | # GCP Services MCP Implementation
  2 | 
  3 | This repository contains Model Context Protocol (MCP) implementations for various Google Cloud Platform (GCP) services. These modules expose GCP resources, tools, and prompts in a standardized way for AI assistants to interact with GCP infrastructure.
  4 | 
  5 | ## Overview
  6 | 
  7 | The MCP framework provides a consistent way to expose cloud resources to AI assistants. Each service module follows a common pattern:
  8 | 
  9 | 1. A `register` function that takes an MCP instance as a parameter
 10 | 2. Resources for retrieving information about GCP resources
 11 | 3. Tools for performing operations on GCP resources
 12 | 4. Prompts for guiding users through common tasks
 13 | 
 14 | ## Implementation Pattern
 15 | 
 16 | All service modules follow a consistent implementation pattern:
 17 | 
 18 | ```python
 19 | import json
 20 | from services import client_instances
 21 | from google.cloud import service_client
 22 | 
 23 | def register(mcp_instance):
 24 |     """Register all resources and tools with the MCP instance."""
 25 |     
 26 |     # Resources
 27 |     @mcp_instance.resource("gcp://service/{project_id}/resources")
 28 |     def list_resources(project_id: str = None) -> str:
 29 |         # Implement resource listing
 30 |         pass
 31 |         
 32 |     # Tools
 33 |     @mcp_instance.tool()
 34 |     def create_resource(resource_name: str, project_id: str = None) -> str:
 35 |         # Implement resource creation
 36 |         pass
 37 |         
 38 |     # Prompts
 39 |     @mcp_instance.prompt()
 40 |     def configuration_help() -> str:
 41 |         # Return helpful prompt text
 42 |         return """Help text here"""
 43 | ```
 44 | 
 45 | ## Key Design Principles
 46 | 
 47 | ### 1. Client Management
 48 | 
 49 | Clients are accessed through a central `client_instances` module:
 50 | 
 51 | ```python
 52 | client = client_instances.get_clients().service_name
 53 | ```
 54 | 
 55 | This approach:
 56 | - Centralizes client creation and authentication
 57 | - Avoids duplicating client instantiation logic
 58 | - Enables consistent credential management
 59 | 
 60 | ### 2. Parameter Defaults
 61 | 
 62 | Required parameters come first, followed by optional parameters with sensible defaults:
 63 | 
 64 | ```python
 65 | def create_resource(
 66 |     resource_name: str,     # Required parameter first
 67 |     project_id: str = None, # Optional parameters with defaults
 68 |     location: str = None
 69 | ) -> str:
 70 |     # Use defaults from client_instances when not provided
 71 |     project_id = project_id or client_instances.get_project_id()
 72 |     location = location or client_instances.get_location()
 73 | ```
 74 | 
 75 | ### 3. Error Handling
 76 | 
 77 | All functions include consistent error handling:
 78 | 
 79 | ```python
 80 | try:
 81 |     # Implementation code
 82 |     return json.dumps(result, indent=2)
 83 | except Exception as e:
 84 |     return json.dumps({"error": str(e)}, indent=2)
 85 | ```
 86 | 
 87 | ### 4. JSON Responses
 88 | 
 89 | All responses are consistently formatted JSON strings with proper indentation:
 90 | 
 91 | ```python
 92 | return json.dumps(
 93 |     {
 94 |         "status": "success",
 95 |         "resources": result,
 96 |         "count": len(result)
 97 |     },
 98 |     indent=2
 99 | )
100 | ```
101 | 
102 | ### 5. Defensive Coding
103 | 
104 | All implementations use defensive coding practices to handle potentially missing or null values:
105 | 
106 | ```python
107 | "labels": dict(resource.labels) if resource.labels else {}
108 | ```
109 | 
110 | ## Available Services
111 | 
112 | The following GCP services are implemented:
113 | 
114 | 1. **BigQuery** - Data warehouse operations
115 | 2. **Cloud Storage** - Object storage operations 
116 | 3. **Cloud Run** - Serverless container management
117 | 4. **Cloud Build** - CI/CD pipeline management
118 | 5. **Artifact Registry** - Container and package registry
119 | 6. **Compute Engine** - VM instance management
120 | 7. **Cloud Monitoring** - Metrics and alerting
121 | 8. **Cloud Audit Logs** - Logging and auditing
122 | 
123 | ## Service-Specific Features
124 | 
125 | ### BigQuery
126 | 
127 | - Dataset and table management
128 | - Query execution
129 | - Data import/export
130 | 
131 | ### Cloud Storage
132 | 
133 | - Bucket management
134 | - Object operations (upload, download, delete)
135 | - Bucket configuration
136 | 
137 | ### Cloud Run
138 | 
139 | - Service deployment
140 | - Service configuration
141 | - Traffic management
142 | 
143 | ### Cloud Build
144 | 
145 | - Trigger management
146 | - Build execution
147 | - Build monitoring
148 | 
149 | ### Artifact Registry
150 | 
151 | - Repository management
152 | - Package operations
153 | 
154 | ### Compute Engine
155 | 
156 | - Instance lifecycle management
157 | - Instance configuration
158 | 
159 | ### Cloud Monitoring
160 | 
161 | - Metric exploration
162 | - Alert policy management
163 | - Notification channel configuration
164 | 
165 | ### Cloud Audit Logs
166 | 
167 | - Log querying
168 | - Log sink management
169 | 
170 | ## Usage Example
171 | 
172 | To register all services with an MCP instance:
173 | 
174 | ```python
175 | from mcp.server.fastmcp import FastMCP
176 | from services import (
177 |     bigquery,
178 |     storage,
179 |     cloudrun,
180 |     cloudbuild,
181 |     artifactregistry,
182 |     compute,
183 |     monitoring,
184 |     auditlogs
185 | )
186 | 
187 | # Create MCP instance
188 | mcp = FastMCP("GCP Services")
189 | 
190 | # Register all services
191 | bigquery.register(mcp)
192 | storage.register(mcp)
193 | cloudrun.register(mcp)
194 | cloudbuild.register(mcp)
195 | artifactregistry.register(mcp)
196 | compute.register(mcp)
197 | monitoring.register(mcp)
198 | auditlogs.register(mcp)
199 | ```
200 | 
201 | ## Important Notes
202 | 
203 | 1. **Default Project and Location**:
204 |    - Services default to the project and location configured in `client_instances`
205 |    - Always allow these to be overridden by parameters
206 | 
207 | 2. **Authentication**:
208 |    - Authentication is handled by the `client_instances` module
209 |    - No credentials should be hardcoded in service implementations
210 | 
211 | 3. **Error Handling**:
212 |    - All operations should include robust error handling
213 |    - Error responses should be informative but not expose sensitive information
214 | 
215 | 4. **Response Formatting**:
216 |    - All responses should be valid JSON with proper indentation
217 |    - Success responses should include a "status": "success" field
218 |    - Error responses should include a "status": "error" field with a "message"
219 | 
220 | ## Best Practices for MCP Integration
221 | 
222 | 1. **Resource Naming**:
223 |    - Use consistent URI patterns for resources (gcp://service/{project_id}/resource)
224 |    - Group related resources logically
225 | 
226 | 2. **Tool Design**:
227 |    - Design tools around specific user intents
228 |    - Order parameters with required ones first, followed by optional ones
229 |    - Provide sensible defaults for optional parameters
230 | 
231 | 3. **Prompt Design**:
232 |    - Create prompts for common user scenarios
233 |    - Include enough context for the AI to provide helpful responses
234 |    - Structure prompts as guiding questions or templates
235 | 
236 | 4. **Documentation**:
237 |    - Include descriptive docstrings for all resources, tools, and prompts
238 |    - Document all parameters and return values
239 |    - Include usage examples where appropriate
240 | 
241 | ## Extending the Services
242 | 
243 | To add a new GCP service:
244 | 
245 | 1. Create a new module following the pattern above
246 | 2. Implement resources for retrieving information
247 | 3. Implement tools for operations
248 | 4. Create prompts for common user tasks
249 | 5. Register the new service in your MCP initialization
250 | 
251 | ## Troubleshooting
252 | 
253 | If you encounter issues:
254 | 
255 | 1. Check client initialization in `client_instances`
256 | 2. Verify required permissions for the service operations
257 | 3. Look for additional error details in the returned JSON
258 | 4. Add print statements for debugging complex operations
```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
  1 | # This is not a Ready MCP Server
  2 | 
  3 | 
  4 | 
  5 | # GCP MCP Server
  6 | 
  7 | A comprehensive Model Context Protocol (MCP) server implementation for Google Cloud Platform (GCP) services, enabling AI assistants to interact with and manage GCP resources through a standardized interface.
  8 | 
  9 | ## Overview
 10 | 
 11 | GCP MCP Server provides AI assistants with capabilities to:
 12 | 
 13 | - **Query GCP Resources**: Get information about your cloud infrastructure
 14 | - **Manage Cloud Resources**: Create, configure, and manage GCP services
 15 | - **Receive Assistance**: Get AI-guided help with GCP configurations and best practices
 16 | 
 17 | The implementation follows the MCP specification to enable AI systems to interact with GCP services in a secure, controlled manner.
 18 | 
 19 | ## Supported GCP Services
 20 | 
 21 | This implementation includes support for the following GCP services:
 22 | 
 23 | - **Artifact Registry**: Container and package management
 24 | - **BigQuery**: Data warehousing and analytics
 25 | - **Cloud Audit Logs**: Logging and audit trail analysis
 26 | - **Cloud Build**: CI/CD pipeline management
 27 | - **Cloud Compute Engine**: Virtual machine instances
 28 | - **Cloud Monitoring**: Metrics, alerting, and dashboards
 29 | - **Cloud Run**: Serverless container deployments
 30 | - **Cloud Storage**: Object storage management
 31 | 
 32 | ## Architecture
 33 | 
 34 | The project is structured as follows:
 35 | 
 36 | ```
 37 | gcp-mcp-server/
 38 | ├── core/            # Core MCP server functionality auth context logging_handler security 
 39 | ├── prompts/         # AI assistant prompts for GCP operations
 40 | ├── services/        # GCP service implementations
 41 | │   ├── README.md    # Service implementation details
 42 | │   └── ...          # Individual service modules
 43 | ├── main.py          # Main server entry point
 44 | └── ...
 45 | ```
 46 | 
 47 | Key components:
 48 | 
 49 | - **Service Modules**: Each GCP service has its own module with resources, tools, and prompts
 50 | - **Client Instances**: Centralized client management for authentication and resource access
 51 | - **Core Components**: Base functionality for the MCP server implementation
 52 | 
 53 | ## Getting Started
 54 | 
 55 | ### Prerequisites
 56 | 
 57 | - Python 3.10+
 58 | - GCP project with enabled APIs for the services you want to use
 59 | - Authenticated GCP credentials (Application Default Credentials recommended)
 60 | 
 61 | ### Installation
 62 | 
 63 | 1. Clone the repository:
 64 |    ```bash
 65 |    git clone https://github.com/yourusername/gcp-mcp-server.git
 66 |    cd gcp-mcp-server
 67 |    ```
 68 | 
 69 | 2. Set up a virtual environment:
 70 |    ```bash
 71 |    python -m venv venv
 72 |    source venv/bin/activate  # On Windows: venv\Scripts\activate
 73 |    ```
 74 | 
 75 | 3. Install dependencies:
 76 |    ```bash
 77 |    pip install -r requirements.txt
 78 |    ```
 79 | 
 80 | 4. Configure your GCP credentials:
 81 |    ```bash
 82 |    # Using gcloud
 83 |    gcloud auth application-default login
 84 |    
 85 |    # Or set GOOGLE_APPLICATION_CREDENTIALS
 86 |    export GOOGLE_APPLICATION_CREDENTIALS="/path/to/service-account-key.json"
 87 |    ```
 88 | 
 89 | 5. Set up environment variables:
 90 |    ```bash
 91 |    cp .env.example .env
 92 |    # Edit .env with your configuration
 93 |    ```
 94 | 
 95 | ### Running the Server
 96 | 
 97 | Start the MCP server:
 98 | 
 99 | ```bash
100 | python main.py
101 | ```
102 | 
103 | For development and testing:
104 | 
105 | ```bash
106 | # Development mode with auto-reload
107 | python main.py --dev
108 | 
109 | # Run with specific configuration
110 | python main.py --config config.yaml
111 | ```
112 | 
113 | ## Docker Deployment
114 | 
115 | Build and run with Docker:
116 | 
117 | ```bash
118 | # Build the image
119 | docker build -t gcp-mcp-server .
120 | 
121 | # Run the container
122 | docker run -p 8080:8080 -v ~/.config/gcloud:/root/.config/gcloud gcp-mcp-server
123 | ```
124 | 
125 | ## Configuration
126 | 
127 | The server can be configured through environment variables or a configuration file:
128 | 
129 | | Environment Variable | Description | Default |
130 | |----------------------|-------------|---------|
131 | | `GCP_PROJECT_ID` | Default GCP project ID | None (required) |
132 | | `GCP_DEFAULT_LOCATION` | Default region/zone | `us-central1` |
133 | | `MCP_SERVER_PORT` | Server port | `8080` |
134 | | `LOG_LEVEL` | Logging level | `INFO` |
135 | 
136 | See `.env.example` for a complete list of configuration options.
137 |  
138 | ## Development
139 | 
140 | ### Adding a New GCP Service
141 | 
142 | 1. Create a new file in the `services/` directory
143 | 2. Implement the service following the pattern in existing services
144 | 3. Register the service in `main.py`
145 | 
146 | See the [services README](services/README.md) for detailed implementation guidance.
147 |  
148 | 
149 | ## Security Considerations
150 | 
151 | - The server uses Application Default Credentials for authentication
152 | - Authorization is determined by the permissions of the authenticated identity
153 | - No credentials are hardcoded in the service implementations
154 | - Consider running with a service account with appropriate permissions
155 | 
156 | ## Contributing
157 | 
158 | Contributions are welcome! Please feel free to submit a Pull Request.
159 | 
160 | 1. Fork the repository
161 | 2. Create your feature branch (`git checkout -b feature/amazing-feature`)
162 | 3. Commit your changes (`git commit -m 'Add some amazing feature'`)
163 | 4. Push to the branch (`git push origin feature/amazing-feature`)
164 | 5. Open a Pull Request
165 | 
166 | ## License
167 | 
168 | This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.
169 | 
170 | ## Acknowledgments
171 | 
172 | - Google Cloud Platform team for their comprehensive APIs
173 | - Model Context Protocol for providing a standardized way for AI to interact with services
174 | 
175 | ### Using the Server
176 | 
177 | To use this server:
178 | 
179 | 1. Place your GCP service account key file as `service-account.json` in the same directory
180 | 2. Install the MCP package: `pip install "mcp[cli]"`
181 | 3. Install the required GCP package: `pip install google-cloud-run`
182 | 4. Run: `mcp dev gcp_cloudrun_server.py`
183 | 
184 | Or install it in Claude Desktop:
185 | ```
186 | mcp install gcp_cloudrun_server.py --name "GCP Cloud Run Manager"
187 | ```
188 | 
189 | 
190 | ## MCP Server Configuration
191 | 
192 | The following configuration can be added to your configuration file for GCP Cloud Tools:
193 | 
194 | ```json
195 | "mcpServers": {
196 |   "GCP Cloud Tools": {
197 |     "command": "uv",
198 |     "args": [
199 |       "run",
200 |       "--with",
201 |       "google-cloud-artifact-registry>=1.10.0",
202 |       "--with",
203 |       "google-cloud-bigquery>=3.27.0",
204 |       "--with",
205 |       "google-cloud-build>=3.0.0",
206 |       "--with",
207 |       "google-cloud-compute>=1.0.0",
208 |       "--with",
209 |       "google-cloud-logging>=3.5.0",
210 |       "--with",
211 |       "google-cloud-monitoring>=2.0.0",
212 |       "--with",
213 |       "google-cloud-run>=0.9.0",
214 |       "--with",
215 |       "google-cloud-storage>=2.10.0",
216 |       "--with",
217 |       "mcp[cli]",
218 |       "--with",
219 |       "python-dotenv>=1.0.0",
220 |       "mcp",
221 |       "run",
222 |       "C:\\Users\\enes_\\Desktop\\mcp-repo-final\\gcp-mcp\\src\\gcp-mcp-server\\main.py"
223 |     ],
224 |     "env": {
225 |       "GOOGLE_APPLICATION_CREDENTIALS": "C:/Users/enes_/Desktop/mcp-repo-final/gcp-mcp/service-account.json",
226 |       "GCP_PROJECT_ID": "gcp-mcp-cloud-project",
227 |       "GCP_LOCATION": "us-east1"
228 |     }
229 |   }
230 | }
231 | ```
232 | 
233 | ### Configuration Details
234 | 
235 | This configuration sets up an MCP server for Google Cloud Platform tools with the following:
236 | 
237 | - **Command**: Uses `uv` package manager to run the server
238 | - **Dependencies**: Includes various Google Cloud libraries (Artifact Registry, BigQuery, Cloud Build, etc.)
239 | - **Environment Variables**:
240 |   - `GOOGLE_APPLICATION_CREDENTIALS`: Path to your GCP service account credentials
241 |   - `GCP_PROJECT_ID`: Your Google Cloud project ID
242 |   - `GCP_LOCATION`: GCP region (us-east1)
243 | 
244 | ### Usage
245 | 
246 | Add this configuration to your MCP configuration file to enable GCP Cloud Tools functionality.
247 | 
```

--------------------------------------------------------------------------------
/src/gcp-mcp-server/prompts/common.py:
--------------------------------------------------------------------------------

```python
1 | 
```

--------------------------------------------------------------------------------
/src/gcp-mcp-server/core/__init__.py:
--------------------------------------------------------------------------------

```python
1 | """Core package for MCP server components."""
2 | 
```

--------------------------------------------------------------------------------
/src/gcp-mcp-server/__init__.py:
--------------------------------------------------------------------------------

```python
1 | """GCP MCP server package."""
2 | 
3 | __version__ = "0.1.0"
4 | 
```

--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------

```
 1 | mcp>=1.0.0
 2 | google-cloud-bigquery
 3 | google-cloud-storage
 4 | google-cloud-run
 5 | google-cloud-artifact-registry
 6 | google-cloud-logging
 7 | python-dotenv
 8 | google-cloud-monitoring
 9 | google-cloud-compute
10 | google-cloud-build
11 | 
```

--------------------------------------------------------------------------------
/utils/helpers.py:
--------------------------------------------------------------------------------

```python
1 | """Common utility functions."""
2 | 
3 | 
4 | def format_resource_name(project_id: str, resource_name: str) -> str:
5 |     """Format a resource name with project ID."""
6 |     return f"projects/{project_id}/resources/{resource_name}"
7 | 
```

--------------------------------------------------------------------------------
/src/gcp-mcp-server/core/security.py:
--------------------------------------------------------------------------------

```python
 1 | import re
 2 | 
 3 | 
 4 | class DataSanitizer:
 5 |     PATTERNS = [
 6 |         r"(?i)(token|key|secret|password)",
 7 |         r"\b\d{4}-\d{4}-\d{4}-\d{4}\b",  # CC-like numbers
 8 |     ]
 9 | 
10 |     @classmethod
11 |     def sanitize(cls, text: str) -> str:
12 |         for pattern in cls.PATTERNS:
13 |             text = re.sub(pattern, "[REDACTED]", text)
14 |         return text
15 | 
```

--------------------------------------------------------------------------------
/src/gcp-mcp-server/services/__init__.py:
--------------------------------------------------------------------------------

```python
 1 | """
 2 | GCP MCP Server services package.
 3 | This package contains service modules that register tools and resources with the MCP server.
 4 | """
 5 | 
 6 | # The following allows these modules to be imported directly from the services package
 7 | from . import (
 8 |     artifact_registry,
 9 |     client_instances,
10 |     cloud_audit_logs,
11 |     cloud_bigquery,
12 |     cloud_build,
13 |     cloud_compute_engine,
14 |     cloud_monitoring,
15 |     cloud_run,
16 |     cloud_storage,
17 | )
18 | 
```

--------------------------------------------------------------------------------
/utils/__init__.py:
--------------------------------------------------------------------------------

```python
 1 | """Utility functions for MCP server."""
 2 | 
 3 | import json
 4 | from typing import Any
 5 | 
 6 | 
 7 | def format_json_response(data: Any) -> str:
 8 |     """Format data as a JSON string with consistent styling."""
 9 |     return json.dumps({"status": "success", "data": data}, indent=2)
10 | 
11 | 
12 | def format_error_response(message: str) -> str:
13 |     """Format error message as a JSON string."""
14 |     return json.dumps({"status": "error", "message": message}, indent=2)
15 | 
```

--------------------------------------------------------------------------------
/src/gcp-mcp-server/core/logging_handler.py:
--------------------------------------------------------------------------------

```python
 1 | import logging
 2 | 
 3 | 
 4 | class MCPLogger:
 5 |     """Simple logger for MCP server"""
 6 | 
 7 |     def __init__(self, name):
 8 |         self.logger = logging.getLogger(f"mcp.{name}")
 9 | 
10 |     def info(self, message):
11 |         self.logger.info(message)
12 | 
13 |     def warning(self, message):
14 |         self.logger.warning(message)
15 | 
16 |     def error(self, message):
17 |         self.logger.error(message)
18 | 
19 |     def critical(self, message):
20 |         self.logger.critical(message)
21 | 
22 |     def debug(self, message):
23 |         self.logger.debug(message)
24 | 
25 |     def audit_log(self, action, resource, details=None):
26 |         """Simple audit logging"""
27 |         self.logger.info(f"AUDIT: {action} on {resource}")
28 | 
```

--------------------------------------------------------------------------------
/src/gcp-mcp-server/prompts/__init__.py:
--------------------------------------------------------------------------------

```python
 1 | """Common prompts for GCP operations."""
 2 | 
 3 | from ..core import mcp
 4 | 
 5 | 
 6 | @mcp.prompt()
 7 | def gcp_service_help(service_name: str) -> str:
 8 |     """Get help for using a GCP service."""
 9 |     return f"""
10 |     I need help with using {service_name} in Google Cloud Platform.
11 |     
12 |     Please help me understand:
13 |     1. Common operations and best practices
14 |     2. Required parameters and configuration
15 |     3. Security considerations
16 |     4. Recommended patterns for {service_name}
17 |     """
18 | 
19 | 
20 | @mcp.prompt()
21 | def error_analysis(error_message: str) -> str:
22 |     """Analyze a GCP error message."""
23 |     return f"""
24 |     I received this error from GCP:
25 |     {error_message}
26 |     
27 |     Please help me:
28 |     1. Understand what caused this error
29 |     2. Find potential solutions
30 |     3. Prevent similar errors in the future
31 |     """
32 | 
```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
 1 | [project]
 2 | name = "gcp-mcp-server"
 3 | version = "0.1.0"
 4 | description = "A Model Context Protocol server that provides access to Google Cloud Platform services, enabling LLMs to manage and interact with GCP resources."
 5 | readme = "README.md"
 6 | requires-python = ">=3.13"
 7 | 
 8 | dependencies = [
 9 |     "mcp[cli]>=1.0.0",
10 |     "google-cloud-bigquery",
11 |     "google-cloud-storage",
12 |     "google-cloud-run",
13 |     "google-cloud-artifact-registry",
14 |     "google-cloud-logging",
15 |     "python-dotenv",
16 |     "google-cloud-monitoring",
17 |     "google-cloud-compute",
18 |     "google-cloud-build",
19 | ]
20 | [[project.authors]]
21 | name = "Enes Bol"
22 | email = "[email protected]"
23 | 
24 | [build-system]
25 | requires = ["hatchling"]
26 | build-backend = "hatchling.build"
27 | 
28 | [tool.hatch.build.targets.wheel]
29 | packages = ["src/gcp_mcp"]
30 | 
31 | [project.scripts]
32 | gcp-mcp-server = "gcp_mcp.main:main"
33 | 
```

--------------------------------------------------------------------------------
/src/gcp-mcp-server/services/client_instances.py:
--------------------------------------------------------------------------------

```python
 1 | import logging
 2 | 
 3 | from core.context import GCPClients
 4 | 
 5 | # Set up logging
 6 | logger = logging.getLogger(__name__)
 7 | 
 8 | # Global client instance
 9 | _gcp_clients = None
10 | _project_id = None
11 | _location = None
12 | 
13 | 
14 | def initialize_clients(credentials=None):
15 |     """Initialize GCP clients with credentials."""
16 |     global _gcp_clients, _project_id, _location
17 | 
18 |     try:
19 |         # Initialize GCP clients
20 |         _gcp_clients = GCPClients(credentials)
21 |         _project_id = _gcp_clients.project_id
22 |         _location = _gcp_clients.location
23 |         logger.info(f"GCP clients initialized with project: {_project_id}")
24 |         return True
25 |     except Exception as e:
26 |         logger.error(f"Failed to initialize GCP clients: {str(e)}")
27 |         return False
28 | 
29 | 
30 | def get_clients():
31 |     """Get the initialized GCP clients."""
32 |     if _gcp_clients is None:
33 |         logger.warning("GCP clients not initialized. Attempting to initialize...")
34 |         initialize_clients()
35 |     return _gcp_clients
36 | 
37 | 
38 | def get_project_id():
39 |     """Get the GCP project ID."""
40 |     return _project_id
41 | 
42 | 
43 | def get_location():
44 |     """Get the GCP location."""
45 |     return _location
46 | 
```

--------------------------------------------------------------------------------
/smithery.yaml:
--------------------------------------------------------------------------------

```yaml
 1 | # Smithery.ai configuration
 2 | startCommand:
 3 |   type: stdio
 4 |   configSchema:
 5 |     type: object
 6 |     properties:
 7 |       project_id:
 8 |         type: string
 9 |         description: "Google Cloud Project ID"
10 |       region:
11 |         type: string
12 |         description: "Default GCP region"
13 |         default: "us-central1"
14 |       timeout:
15 |         type: integer
16 |         description: "Default timeout for operations in seconds"
17 |         default: 300
18 |       service_account_json:
19 |         type: string
20 |         description: "GCP Service Account key JSON (as string)"
21 |       service_account_path:
22 |         type: string
23 |         description: "Path to GCP Service Account key file"
24 |     required:
25 |       - project_id
26 |   commandFunction: |-
27 |     (config) => ({
28 |       "command": "python",
29 |       "args": [
30 |         "main.py"
31 |       ],
32 |       "env": {
33 |         "GCP_PROJECT_ID": config.project_id,
34 |         "GCP_REGION": config.region || "us-central1",
35 |         "GCP_TIMEOUT": String(config.timeout || 300),
36 |         "GCP_SERVICE_ACCOUNT_JSON": config.service_account_json || "",
37 |         "GCP_SERVICE_ACCOUNT_KEY_PATH": config.service_account_path || ""
38 |       }
39 |     })
```

--------------------------------------------------------------------------------
/Dockerfile:
--------------------------------------------------------------------------------

```dockerfile
 1 | # Generated by https://smithery.ai. See: https://smithery.ai/docs/config#dockerfile
 2 | # Use a Python image with uv pre-installed
 3 | FROM ghcr.io/astral-sh/uv:python3.13-bookworm-slim AS uv
 4 | 
 5 | # Set working directory
 6 | WORKDIR /app
 7 | 
 8 | # Enable bytecode compilation
 9 | ENV UV_COMPILE_BYTECODE=1
10 | 
11 | # Copy pyproject.toml and lock file for dependencies
12 | COPY pyproject.toml uv.lock ./
13 | 
14 | # Install the project's dependencies
15 | RUN --mount=type=cache,target=/root/.cache/uv \
16 |     uv sync --frozen --no-install-project --no-dev --no-editable
17 | 
18 | # Add the rest of the project source code and install it
19 | ADD src /app/src
20 | 
21 | # Sync and install the project
22 | RUN --mount=type=cache,target=/root/.cache/uv \
23 |     uv sync --frozen --no-dev --no-editable
24 | 
25 | FROM python:3.13-slim-bookworm
26 | 
27 | # Set working directory
28 | WORKDIR /app
29 | 
30 | # Copy virtual environment from the builder
31 | COPY --from=uv /root/.local /root/.local
32 | COPY --from=uv --chown=app:app /app/.venv /app/.venv
33 | 
34 | # Place executables in the environment at the front of the path
35 | ENV PATH="/app/.venv/bin:$PATH"
36 | 
37 | # Define the entry point
38 | ENTRYPOINT ["gcp-mcp-server"]
39 | 
40 | # Example command
41 | # CMD ["--project", "your-gcp-project-id", "--location", "your-gcp-location"]
```

--------------------------------------------------------------------------------
/src/gcp-mcp-server/core/server.py:
--------------------------------------------------------------------------------

```python
  1 | # import logging
  2 | # import os
  3 | # from contextlib import asynccontextmanager
  4 | # from typing import Any, AsyncIterator, Dict
  5 | 
  6 | # from auth import get_credentials
  7 | # from mcp.server.fastmcp import FastMCP
  8 | 
  9 | # from ..services import bigquery, compute, iam, storage
 10 | 
 11 | # logging.basicConfig(level=logging.INFO)
 12 | # logger = logging.getLogger(__name__)
 13 | 
 14 | 
 15 | # @asynccontextmanager
 16 | # async def gcp_lifespan(server) -> AsyncIterator[Dict[str, Any]]:
 17 | #     """Set up GCP context with credentials."""
 18 | #     logger.info("Initializing GCP MCP server...")
 19 | #     try:
 20 | #         # Get GCP credentials
 21 | #         credentials = get_credentials()
 22 | #         project_id = os.environ.get("GCP_PROJECT_ID")
 23 | 
 24 | #         if not project_id:
 25 | #             logger.warning(
 26 | #                 "GCP_PROJECT_ID not set in environment. Some features may not work correctly."
 27 | #             )
 28 | 
 29 | #         logger.info(f"Server initialized with project: {project_id or 'Not set'}")
 30 | 
 31 | #         # Yield context to be used by handlers
 32 | #         yield {"credentials": credentials, "project_id": project_id}
 33 | #     except Exception as e:
 34 | #         logger.error(f"Failed to initialize GCP context: {str(e)}")
 35 | #         raise
 36 | #     finally:
 37 | #         logger.info("Shutting down GCP MCP server...")
 38 | 
 39 | 
 40 | # # Create main server FIRST
 41 | # mcp = FastMCP(
 42 | #     "GCP Manager",
 43 | #     description="Manage Google Cloud Platform Resources",
 44 | #     lifespan=gcp_lifespan,
 45 | # )
 46 | 
 47 | # # Register all services
 48 | # compute.register(mcp)
 49 | # storage.register(mcp)
 50 | # bigquery.register(mcp)
 51 | # iam.register(mcp)
 52 | 
 53 | 
 54 | # # THEN define resources and tools
 55 | # @mcp.resource("gcp://project")
 56 | # def get_project_info():
 57 | #     """Get information about the current GCP project"""
 58 | #     project_id = os.environ.get("GCP_PROJECT_ID")
 59 | #     return f"Project ID: {project_id}"
 60 | 
 61 | 
 62 | # @mcp.resource("gcp://storage/buckets")
 63 | # def list_buckets():
 64 | #     """List GCP storage buckets"""
 65 | #     from google.cloud import storage
 66 | 
 67 | #     client = storage.Client()
 68 | #     buckets = list(client.list_buckets(max_results=10))
 69 | #     return "\n".join([f"- {bucket.name}" for bucket in buckets])
 70 | 
 71 | 
 72 | # @mcp.resource("test://hello")
 73 | # def hello_resource():
 74 | #     """A simple test resource"""
 75 | #     return "Hello World"
 76 | 
 77 | 
 78 | # @mcp.tool()
 79 | # def list_gcp_instances(region: str = "us-central1") -> str:
 80 | #     """List GCP compute instances in a region"""
 81 | #     # Use your credentials to list instances
 82 | #     return f"Instances in {region}: [instance list would go here]"
 83 | 
 84 | 
 85 | # @mcp.tool()
 86 | # def test_gcp_auth() -> str:
 87 | #     """Test GCP authentication"""
 88 | #     try:
 89 | #         from google.cloud import storage
 90 | 
 91 | #         client = storage.Client()
 92 | #         buckets = list(client.list_buckets(max_results=5))
 93 | #         return f"Authentication successful. Found {len(buckets)} buckets."
 94 | #     except Exception as e:
 95 | #         return f"Authentication failed: {str(e)}"
 96 | 
 97 | 
 98 | # if __name__ == "__main__":
 99 | #     mcp.run()
100 | 
```

--------------------------------------------------------------------------------
/src/gcp-mcp-server/core/auth.py:
--------------------------------------------------------------------------------

```python
  1 | import json
  2 | import os
  3 | 
  4 | import google.auth
  5 | from google.auth.exceptions import DefaultCredentialsError
  6 | from google.auth.transport.requests import Request
  7 | from google.oauth2 import service_account
  8 | 
  9 | from .logging_handler import MCPLogger
 10 | 
 11 | # Define required scopes for GCP APIs
 12 | REQUIRED_SCOPES = [
 13 |     "https://www.googleapis.com/auth/cloud-platform",
 14 | ]
 15 | 
 16 | logger = MCPLogger("auth")
 17 | 
 18 | 
 19 | def get_credentials():
 20 |     """
 21 |     Get Google Cloud credentials from environment.
 22 |     Attempts to load credentials in the following order:
 23 |     1. From GOOGLE_APPLICATION_CREDENTIALS environment variable
 24 |     2. From GCP_SERVICE_ACCOUNT_JSON environment variable containing JSON
 25 |     3. Default application credentials
 26 |     """
 27 |     try:
 28 |         # Check for credentials file path
 29 |         creds_file = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")
 30 |         if creds_file and os.path.exists(creds_file):
 31 |             logger.audit_log(
 32 |                 action="credential_load",
 33 |                 resource="service_account",
 34 |                 details={"method": "file", "file": creds_file},
 35 |             )
 36 |             logger.info(f"Loading credentials from file: {creds_file}")
 37 |             credentials = service_account.Credentials.from_service_account_file(
 38 |                 creds_file, scopes=REQUIRED_SCOPES
 39 |             )
 40 |             return validate_credentials(credentials)
 41 | 
 42 |         # Check for service account JSON in environment
 43 |         sa_json = os.environ.get("GCP_SERVICE_ACCOUNT_JSON")
 44 |         if sa_json:
 45 |             try:
 46 |                 service_account_info = json.loads(sa_json)
 47 |                 logger.info(
 48 |                     "Loading credentials from GCP_SERVICE_ACCOUNT_JSON environment variable"
 49 |                 )
 50 |                 credentials = service_account.Credentials.from_service_account_info(
 51 |                     service_account_info, scopes=REQUIRED_SCOPES
 52 |                 )
 53 |                 logger.audit_log(
 54 |                     action="credential_load",
 55 |                     resource="service_account",
 56 |                     details={"method": "environment_json"},
 57 |                 )
 58 |                 return validate_credentials(credentials)
 59 |             except json.JSONDecodeError as e:
 60 |                 logger.error(f"Failed to parse GCP_SERVICE_ACCOUNT_JSON: {str(e)}")
 61 | 
 62 |         # Fall back to default credentials
 63 |         try:
 64 |             logger.audit_log(
 65 |                 action="credential_load",
 66 |                 resource="application_default",
 67 |                 details={"method": "default"},
 68 |             )
 69 |             logger.info("Loading default application credentials")
 70 |             credentials, project_id = google.auth.default(scopes=REQUIRED_SCOPES)
 71 |             if project_id and not os.environ.get("GCP_PROJECT_ID"):
 72 |                 # Set project ID from default credentials if not already set
 73 |                 os.environ["GCP_PROJECT_ID"] = project_id
 74 |                 logger.info(f"Using project ID from default credentials: {project_id}")
 75 |             return validate_credentials(credentials)
 76 |         except DefaultCredentialsError as e:
 77 |             logger.error(f"Failed to load GCP credentials: {str(e)}")
 78 |             raise AuthenticationError("Failed to obtain valid credentials")
 79 | 
 80 |     except Exception as e:
 81 |         logger.critical(f"Authentication failure: {str(e)}")
 82 |         raise AuthenticationError(f"Failed to obtain valid credentials: {str(e)}")
 83 | 
 84 | 
 85 | def validate_credentials(credentials):
 86 |     """Validate credential permissions and expiration"""
 87 |     # Some credentials don't have a valid attribute or may not need refresh
 88 |     if hasattr(credentials, "valid") and not credentials.valid:
 89 |         credentials.refresh(Request())
 90 | 
 91 |     # Check expiration
 92 |     if hasattr(credentials, "expiry"):
 93 |         if hasattr(credentials, "expired") and credentials.expired:
 94 |             try:
 95 |                 credentials.refresh(Request())
 96 |             except Exception as e:
 97 |                 raise AuthenticationError(
 98 |                     f"Failed to refresh expired credentials: {str(e)}"
 99 |                 )
100 | 
101 |     return credentials
102 | 
103 | 
104 | class AuthenticationError(Exception):
105 |     """Custom exception for authentication failures"""
106 | 
107 |     pass
108 | 
```

--------------------------------------------------------------------------------
/src/gcp-mcp-server/main.py:
--------------------------------------------------------------------------------

```python
  1 | import os
  2 | 
  3 | # At the top of main.py, add these imports and logging statements
  4 | import sys
  5 | 
  6 | # Get the directory containing main.py
  7 | current_dir = os.path.dirname(os.path.abspath(__file__))
  8 | # Add it to Python's module search path
  9 | sys.path.append(current_dir)
 10 | # Also add the parent directory if needed
 11 | parent_dir = os.path.dirname(current_dir)
 12 | sys.path.append(parent_dir)
 13 | 
 14 | 
 15 | import inspect
 16 | import logging
 17 | from contextlib import asynccontextmanager
 18 | from typing import Any, AsyncIterator, Dict
 19 | 
 20 | # Import the GCP Clients and Context
 21 | from mcp.server.fastmcp import FastMCP
 22 | from services import client_instances
 23 | 
 24 | # Set up logging
 25 | logging.basicConfig(level=logging.INFO)
 26 | logger = logging.getLogger(__name__)
 27 | 
 28 | # After adding paths, print them to see what's happening
 29 | print(f"Current working directory: {os.getcwd()}")
 30 | print(f"Updated Python path: {sys.path}")
 31 | 
 32 | 
 33 | @asynccontextmanager
 34 | async def gcp_lifespan(server) -> AsyncIterator[Dict[str, Any]]:
 35 |     """Set up GCP context with credentials."""
 36 |     logger.info("Initializing GCP MCP server...")
 37 |     try:
 38 |         # Initialize GCP credentials
 39 |         credentials = None
 40 |         # Check for service account key file path
 41 |         sa_path = os.environ.get("GCP_SERVICE_ACCOUNT_KEY_PATH")
 42 |         if sa_path and os.path.exists(sa_path):
 43 |             logger.info(f"Using service account key from: {sa_path}")
 44 | 
 45 |         # Initialize GCP clients using the client_instances module
 46 |         client_instances.initialize_clients(credentials)
 47 |         clients = client_instances.get_clients()
 48 |         project_id = client_instances.get_project_id()
 49 |         location = client_instances.get_location()
 50 | 
 51 |         logger.info(f"Server initialized with project: {project_id}")
 52 |         # Yield context with clients and credentials
 53 |         yield {
 54 |             "clients": clients,
 55 |             "project_id": project_id,
 56 |             "credentials": credentials,
 57 |             "location": location,
 58 |         }
 59 |     except Exception as e:
 60 |         logger.error(f"Failed to initialize GCP context: {str(e)}")
 61 |         raise
 62 |     finally:
 63 |         logger.info("Shutting down GCP MCP server...")
 64 | 
 65 | 
 66 | # Create the global MCP instance
 67 | mcp = FastMCP(
 68 |     "GCP Manager",
 69 |     description="Manage Google Cloud Platform Resources",
 70 |     lifespan=gcp_lifespan,
 71 |     dependencies=[
 72 |         "mcp>=1.0.0",
 73 |         "google-cloud-bigquery",
 74 |         "google-cloud-storage",
 75 |         "google-cloud-run",
 76 |         "google-cloud-artifact-registry",
 77 |         "google-cloud-logging",
 78 |         "python-dotenv",
 79 |         "google-cloud-monitoring",
 80 |         "google-cloud-compute",
 81 |         "google-cloud-build",
 82 |     ],
 83 | )
 84 | 
 85 | 
 86 | # Basic resources and tools
 87 | @mcp.resource("test://hello")
 88 | def hello_resource():
 89 |     """A simple test resource"""
 90 |     return "Hello World"
 91 | 
 92 | 
 93 | @mcp.tool()
 94 | def test_gcp_auth() -> str:
 95 |     """Test GCP authentication"""
 96 |     try:
 97 |         # Get clients from client_instances module instead of context
 98 |         clients = client_instances.get_clients()
 99 | 
100 |         # Test if we can list storage buckets
101 |         if hasattr(clients, "storage"):
102 |             try:
103 |                 buckets = list(clients.storage.list_buckets(max_results=5))
104 |                 return f"Authentication successful. Found {len(buckets)} buckets. {buckets}"
105 |             except Exception as e:
106 |                 return f"Storage authentication failed: {str(e)}"
107 |     except Exception as e:
108 |         return f"Authentication failed: {str(e)}"
109 | 
110 | 
111 | # # Function to register services
112 | # def register_services():
113 | #     """Register all service modules with the MCP instance."""
114 | #     services_dir = os.path.join(os.path.dirname(__file__), "services")
115 | #     logger.info(f"Loading services from {services_dir}")
116 | #     if not os.path.exists(services_dir):
117 | #         logger.warning(f"Services directory not found: {services_dir}")
118 | #         return
119 | #     # Get all Python files in the services directory
120 | #     for filename in os.listdir(services_dir):
121 | #         if (
122 | #             filename.endswith(".py")
123 | #             and filename != "__init__.py"
124 | #             and filename != "client_instances.py"
125 | #         ):
126 | #             module_name = filename[:-3]  # Remove .py extension
127 | #             try:
128 | #                 # Load the module using importlib
129 | #                 module_path = os.path.join(services_dir, filename)
130 | #                 spec = importlib.util.spec_from_file_location(
131 | #                     f"services.{module_name}", module_path
132 | #                 )
133 | #                 module = importlib.util.module_from_spec(spec)
134 | #                 # Inject mcp instance into the module
135 | #                 module.mcp = mcp
136 | #                 # Execute the module
137 | #                 spec.loader.exec_module(module)
138 | #                 logger.info(f"Loaded service module: {module_name}")
139 | #                 # If the module has a register function, call it
140 | #                 if hasattr(module, "register"):
141 | #                     # Pass the mcp instance and the server's request_context to register
142 | #                     module.register(mcp)
143 | #                     logger.info(f"Registered service: {module_name}")
144 | #             except Exception as e:
145 | #                 logger.error(f"Error loading service {module_name}: {e}")
146 | 
147 | 
148 | def register_services():
149 |     """Register all service modules with the MCP instance."""
150 |     logger.info("Explicitly registering service modules")
151 | 
152 |     # Print diagnostic information
153 |     logger.info(f"Python sys.path: {sys.path}")
154 |     logger.info(f"Current working directory: {os.getcwd()}")
155 |     logger.info(f"Script location: {os.path.abspath(__file__)}")
156 |     logger.info(f"Parent directory: {os.path.dirname(os.path.abspath(__file__))}")
157 | 
158 |     try:
159 |         # Try importing a service module to check if imports work
160 |         logger.info("Attempting to import artifact_registry")
161 |         import services.artifact_registry as artifact_registry
162 | 
163 |         logger.info(f"Module location: {inspect.getfile(artifact_registry)}")
164 |         from services import (
165 |             artifact_registry,
166 |             cloud_audit_logs,
167 |             cloud_bigquery,
168 |             cloud_build,
169 |             cloud_compute_engine,
170 |             cloud_monitoring,
171 |             cloud_run,
172 |             cloud_storage,
173 |         )
174 | 
175 |         # Register each service module
176 |         artifact_registry.register(mcp)
177 |         cloud_audit_logs.register(mcp)
178 |         cloud_bigquery.register(mcp)
179 |         cloud_build.register(mcp)
180 |         cloud_compute_engine.register(mcp)
181 |         cloud_monitoring.register(mcp)
182 |         cloud_run.register(mcp)
183 |         cloud_storage.register(mcp)
184 | 
185 |         logger.info("All service modules registered successfully")
186 |     except Exception as e:
187 |         logger.error(f"Error registering service modules: {e}")
188 |         # Add detailed error logging
189 |         import traceback
190 | 
191 |         logger.error(traceback.format_exc())
192 | 
193 | 
194 | if __name__ == "__main__":
195 |     logger.info("Starting GCP MCP server")
196 |     # Register services
197 |     register_services()
198 |     # Run the server
199 |     mcp.run()
200 | 
```

--------------------------------------------------------------------------------
/src/gcp-mcp-server/core/context.py:
--------------------------------------------------------------------------------

```python
  1 | # context.py
  2 | import json
  3 | import os
  4 | from typing import Any, Optional, Type
  5 | 
  6 | import google.auth
  7 | 
  8 | # Import other services as needed
  9 | from google.cloud import (
 10 |     artifactregistry_v1,
 11 |     bigquery,
 12 |     compute_v1,
 13 |     monitoring_v3,
 14 |     storage,
 15 | )
 16 | from google.oauth2 import service_account
 17 | 
 18 | 
 19 | class GCPClients:
 20 |     """Client manager for GCP services"""
 21 | 
 22 |     def __init__(self, credentials=None):
 23 |         self.credentials = self._get_credentials(credentials)
 24 |         self.project_id = self._get_project_id()
 25 |         self.location = self._get_location()
 26 |         self._clients = {}
 27 | 
 28 |         # Initialize clients
 29 |         self._storage_client = None
 30 |         self._bigquery_client = None
 31 |         self._run_client = None
 32 |         self._logging_client = None
 33 |         self._monitoring_client = None
 34 |         self._compute_client = None
 35 |         self._sql_client = None
 36 |         self._cloudbuild_client = None
 37 |         self._artifactregistry_client = None
 38 | 
 39 |     def _get_credentials(self, credentials=None):
 40 |         """Get credentials from various sources"""
 41 |         # If credentials are directly provided
 42 |         if credentials:
 43 |             return credentials
 44 | 
 45 |         # Check for service account JSON in environment variable
 46 |         sa_json = os.environ.get("GCP_SERVICE_ACCOUNT_JSON")
 47 |         if sa_json:
 48 |             try:
 49 |                 sa_info = json.loads(sa_json)
 50 |                 return service_account.Credentials.from_service_account_info(sa_info)
 51 |             except Exception as e:
 52 |                 print(f"Error loading service account JSON: {e}")
 53 | 
 54 |         # Check for service account key file path
 55 |         sa_path = os.environ.get("GCP_SERVICE_ACCOUNT_KEY_PATH")
 56 |         if sa_path:
 57 |             try:
 58 |                 return service_account.Credentials.from_service_account_file(sa_path)
 59 |             except Exception as e:
 60 |                 print(f"Error loading service account key file: {e}")
 61 | 
 62 |         # Fall back to application default credentials
 63 |         try:
 64 |             credentials, project = google.auth.default()
 65 |             return credentials
 66 |         except Exception as e:
 67 |             print(f"Error getting default credentials: {e}")
 68 |             raise RuntimeError("No valid GCP credentials found")
 69 | 
 70 |     def _get_project_id(self) -> str:
 71 |         """Get project ID from environment or credentials"""
 72 |         project_id = os.environ.get("GCP_PROJECT_ID")
 73 |         if project_id:
 74 |             return project_id
 75 | 
 76 |         if hasattr(self.credentials, "project_id"):
 77 |             return self.credentials.project_id
 78 | 
 79 |         try:
 80 |             _, project_id = google.auth.default()
 81 |             return project_id
 82 |         except Exception:
 83 |             raise RuntimeError("Unable to determine GCP project ID")
 84 | 
 85 |     def _get_location(self) -> str:
 86 |         """Get default location/region from environment"""
 87 |         return os.environ.get("GCP_LOCATION", "us-central1")
 88 | 
 89 |     # def _init_client(
 90 |     #     self, client_class: Type, current_client: Optional[Any], **kwargs
 91 |     # ) -> Any:
 92 |     #     """Helper method to initialize clients with error handling"""
 93 |     #     if not current_client:
 94 |     #         try:
 95 |     #             return client_class(
 96 |     #                 project=self.project_id,  # <-- This adds project automatically
 97 |     #                 credentials=self.credentials,  # <-- This adds credentials automatically
 98 |     #                 **kwargs,  # <-- This adds any extra params (like database)
 99 |     #             )
100 |     #         except Exception as e:
101 |     #             raise RuntimeError(
102 |     #                 f"Failed to initialize {client_class.__name__}: {str(e)}"
103 |     #             )
104 |     #     return current_client
105 | 
106 |     def _init_client(
107 |         self, client_class: Type, current_client: Optional[Any], **kwargs
108 |     ) -> Any:
109 |         """Helper method to initialize clients with error handling"""
110 |         if not current_client:
111 |             try:
112 |                 # Check if this client accepts project parameter
113 |                 if client_class in [
114 |                     storage.Client
115 |                 ]:  # Add other clients that accept project
116 |                     kwargs["project"] = self.project_id
117 | 
118 |                 return client_class(credentials=self.credentials, **kwargs)
119 |             except Exception as e:
120 |                 raise RuntimeError(
121 |                     f"Failed to initialize {client_class.__name__}: {str(e)}"
122 |                 )
123 |         return current_client
124 | 
125 |     @property
126 |     def storage(self) -> storage.Client:
127 |         self._storage_client = self._init_client(storage.Client, self._storage_client)
128 |         return self._storage_client
129 | 
130 |     @property
131 |     def bigquery(self) -> bigquery.Client:
132 |         self._bigquery_client = self._init_client(
133 |             bigquery.Client, self._bigquery_client
134 |         )
135 |         return self._bigquery_client
136 | 
137 |     @property
138 |     def artifactregistry(self) -> artifactregistry_v1.ArtifactRegistryClient:
139 |         """Get the Artifact Registry client."""
140 |         if not self._artifactregistry_client:
141 |             try:
142 |                 # ArtifactRegistryClient doesn't accept project parameter
143 |                 self._artifactregistry_client = (
144 |                     artifactregistry_v1.ArtifactRegistryClient(
145 |                         credentials=self.credentials
146 |                     )
147 |                 )
148 |             except Exception as e:
149 |                 raise RuntimeError(
150 |                     f"Failed to initialize ArtifactRegistryClient: {str(e)}"
151 |                 )
152 |         return self._artifactregistry_client
153 | 
154 |     # Uncomment and implement other client properties as needed
155 |     # @property
156 |     # def storage(self) -> storage.Client:
157 |     #     self._storage_client = self._init_client(storage.Client, self._storage_client)
158 |     #     return self._storage_client
159 | 
160 |     def close_all(self):
161 |         """Close all open clients"""
162 |         for client in self._clients.values():
163 |             if hasattr(client, "transport") and hasattr(client.transport, "close"):
164 |                 client.transport.close()
165 |             elif hasattr(client, "close"):
166 |                 client.close()
167 |         self._clients.clear()
168 | 
169 | 
170 | # Update Context class to handle credentials properly
171 | class Context:
172 |     def __init__(self, request_context):
173 |         self.request_context = request_context
174 |         # Get credentials from lifespan context
175 |         credentials = request_context.lifespan_context.get("credentials")
176 |         # Initialize GCPClients with those credentials
177 |         self.clients = GCPClients(credentials)
178 | 
179 |     def close(self):
180 |         """Clean up when request ends"""
181 |         if hasattr(self, "clients"):
182 |             self.clients.close_all()
183 | 
184 |     # @property
185 |     # def run(self) -> run_v2.CloudRunClient:
186 |     #     self._run_client = self._init_client(run_v2.CloudRunClient, self._run_client)
187 |     #     return self._run_client
188 | 
189 |     # @property
190 |     # def logging(self) -> logging_v2.LoggingServiceV2Client:
191 |     #     self._logging_client = self._init_client(
192 |     #         logging_v2.LoggingServiceV2Client, self._logging_client
193 |     #     )
194 |     #     return self._logging_client
195 | 
196 |     @property
197 |     def monitoring(self) -> monitoring_v3.MetricServiceClient:
198 |         self._monitoring_client = self._init_client(
199 |             monitoring_v3.MetricServiceClient, self._monitoring_client
200 |         )
201 |         return self._monitoring_client
202 | 
203 |     @property
204 |     def compute(self) -> compute_v1.InstancesClient:
205 |         self._compute_client = self._init_client(
206 |             compute_v1.InstancesClient, self._compute_client
207 |         )
208 |         return self._compute_client
209 | 
210 |     # @property
211 |     # def sql(self) -> sql_v1.InstancesClient:
212 |     #     self._sql_client = self._init_client(sql_v1.InstancesClient, self._sql_client)
213 |     #     return self._sql_client
214 | 
215 |     # @property
216 |     # def cloudbuild(self) -> cloudbuild_v1.CloudBuildClient:
217 |     #     self._cloudbuild_client = self._init_client(
218 |     #         cloudbuild_v1.CloudBuildClient, self._cloudbuild_client
219 |     #     )
220 |     #     return self._cloudbuild_client
221 | 
```

--------------------------------------------------------------------------------
/src/gcp-mcp-server/services/cloud_build.py:
--------------------------------------------------------------------------------

```python
  1 | import json
  2 | from typing import Dict, Optional
  3 | 
  4 | from services import client_instances
  5 | 
  6 | 
  7 | def register(mcp_instance):
  8 |     """Register all Cloud Build resources and tools with the MCP instance."""
  9 | 
 10 |     # Resources
 11 |     @mcp_instance.resource("gcp://cloudbuild/{project_id}/builds")
 12 |     def list_builds_resource(project_id: str = None) -> str:
 13 |         """List all Cloud Build executions for a project"""
 14 |         try:
 15 |             # Get client from client_instances
 16 |             client = client_instances.get_clients().cloudbuild
 17 |             project_id = project_id or client_instances.get_project_id()
 18 | 
 19 |             builds = client.list_builds(project_id=project_id)
 20 |             result = []
 21 |             for build in builds:
 22 |                 result.append(
 23 |                     {
 24 |                         "id": build.id,
 25 |                         "status": build.status.name if build.status else None,
 26 |                         "source": build.source.repo_source.commit_sha
 27 |                         if build.source and build.source.repo_source
 28 |                         else None,
 29 |                         "create_time": build.create_time.isoformat()
 30 |                         if build.create_time
 31 |                         else None,
 32 |                         "logs_url": build.logs_url,
 33 |                         "substitutions": dict(build.substitutions)
 34 |                         if build.substitutions
 35 |                         else {},
 36 |                     }
 37 |                 )
 38 |             return json.dumps(result, indent=2)
 39 |         except Exception as e:
 40 |             return json.dumps({"error": str(e)}, indent=2)
 41 | 
 42 |     @mcp_instance.resource("gcp://cloudbuild/{project_id}/triggers")
 43 |     def list_triggers_resource(project_id: str = None) -> str:
 44 |         """List all Cloud Build triggers for a project"""
 45 |         try:
 46 |             # Get client from client_instances
 47 |             client = client_instances.get_clients().cloudbuild
 48 |             project_id = project_id or client_instances.get_project_id()
 49 | 
 50 |             triggers = client.list_triggers(project_id=project_id)
 51 |             result = []
 52 |             for trigger in triggers:
 53 |                 result.append(
 54 |                     {
 55 |                         "id": trigger.id,
 56 |                         "name": trigger.name,
 57 |                         "description": trigger.description,
 58 |                         "trigger_template": {
 59 |                             "repo_name": trigger.trigger_template.repo_name,
 60 |                             "branch_name": trigger.trigger_template.branch_name,
 61 |                         }
 62 |                         if trigger.trigger_template
 63 |                         else None,
 64 |                     }
 65 |                 )
 66 |             return json.dumps(result, indent=2)
 67 |         except Exception as e:
 68 |             return json.dumps({"error": str(e)}, indent=2)
 69 | 
 70 |     # Tools
 71 |     @mcp_instance.tool()
 72 |     def trigger_build(
 73 |         build_config: Dict,
 74 |         project_id: str = None,
 75 |     ) -> str:
 76 |         """
 77 |         Trigger a new Cloud Build execution
 78 | 
 79 |         Args:
 80 |             build_config: Dictionary containing the build configuration
 81 |             project_id: GCP project ID (defaults to configured project)
 82 |         """
 83 |         try:
 84 |             # Get client from client_instances
 85 |             client = client_instances.get_clients().cloudbuild
 86 |             project_id = project_id or client_instances.get_project_id()
 87 | 
 88 |             # Convert config dict to Build object
 89 |             build = cloudbuild_v1.Build.from_json(json.dumps(build_config))
 90 | 
 91 |             print(f"Triggering build in project {project_id}...")
 92 |             operation = client.create_build(project_id=project_id, build=build)
 93 |             response = operation.result()
 94 | 
 95 |             return json.dumps(
 96 |                 {
 97 |                     "status": "success",
 98 |                     "build_id": response.id,
 99 |                     "status": response.status.name if response.status else None,
100 |                     "logs_url": response.logs_url,
101 |                     "build_name": response.name,
102 |                 },
103 |                 indent=2,
104 |             )
105 |         except Exception as e:
106 |             return json.dumps({"status": "error", "message": str(e)}, indent=2)
107 | 
108 |     @mcp_instance.tool()
109 |     def list_builds(
110 |         project_id: str = None,
111 |         filter: Optional[str] = None,
112 |         page_size: Optional[int] = 100,
113 |     ) -> str:
114 |         """
115 |         List Cloud Build executions with optional filtering
116 | 
117 |         Args:
118 |             project_id: GCP project ID (defaults to configured project)
119 |             filter: Filter expression for builds
120 |             page_size: Maximum number of results to return
121 |         """
122 |         try:
123 |             # Get client from client_instances
124 |             client = client_instances.get_clients().cloudbuild
125 |             project_id = project_id or client_instances.get_project_id()
126 | 
127 |             request = cloudbuild_v1.ListBuildsRequest(
128 |                 project_id=project_id, filter=filter, page_size=page_size
129 |             )
130 | 
131 |             builds = []
132 |             for build in client.list_builds(request=request):
133 |                 builds.append(
134 |                     {
135 |                         "id": build.id,
136 |                         "status": build.status.name if build.status else None,
137 |                         "create_time": build.create_time.isoformat()
138 |                         if build.create_time
139 |                         else None,
140 |                         "source": build.source.repo_source.commit_sha
141 |                         if build.source and build.source.repo_source
142 |                         else None,
143 |                         "logs_url": build.logs_url,
144 |                     }
145 |                 )
146 | 
147 |             return json.dumps(
148 |                 {"status": "success", "builds": builds, "count": len(builds)}, indent=2
149 |             )
150 |         except Exception as e:
151 |             return json.dumps({"status": "error", "message": str(e)}, indent=2)
152 | 
153 |     @mcp_instance.tool()
154 |     def create_build_trigger(
155 |         trigger_config: Dict,
156 |         project_id: str = None,
157 |     ) -> str:
158 |         """
159 |         Create a new Cloud Build trigger
160 | 
161 |         Args:
162 |             trigger_config: Dictionary containing the trigger configuration
163 |             project_id: GCP project ID (defaults to configured project)
164 |         """
165 |         try:
166 |             # Get client from client_instances
167 |             client = client_instances.get_clients().cloudbuild
168 |             project_id = project_id or client_instances.get_project_id()
169 | 
170 |             # Convert config dict to Trigger object
171 |             trigger = cloudbuild_v1.BuildTrigger.from_json(json.dumps(trigger_config))
172 | 
173 |             print(f"Creating trigger in project {project_id}...")
174 |             response = client.create_trigger(project_id=project_id, trigger=trigger)
175 | 
176 |             return json.dumps(
177 |                 {
178 |                     "status": "success",
179 |                     "trigger_id": response.id,
180 |                     "name": response.name,
181 |                     "description": response.description,
182 |                     "create_time": response.create_time.isoformat()
183 |                     if response.create_time
184 |                     else None,
185 |                 },
186 |                 indent=2,
187 |             )
188 |         except Exception as e:
189 |             return json.dumps({"status": "error", "message": str(e)}, indent=2)
190 | 
191 |     # Prompts
192 |     @mcp_instance.prompt()
193 |     def build_config_prompt(service_type: str = "docker") -> str:
194 |         """Prompt for creating a Cloud Build configuration"""
195 |         return f"""
196 | I need to create a Cloud Build configuration for {service_type} deployments. Please help with:
197 | 
198 | 1. Recommended build steps for {service_type}
199 | 2. Proper use of substitutions
200 | 3. Caching strategies
201 | 4. Security best practices
202 | 5. Integration with other GCP services
203 | """
204 | 
205 |     @mcp_instance.prompt()
206 |     def trigger_setup_prompt(repo_type: str = "github") -> str:
207 |         """Prompt for configuring build triggers"""
208 |         return f"""
209 | I want to set up a {repo_type} trigger for Cloud Build. Please explain:
210 | 
211 | 1. Required permissions and connections
212 | 2. Event types (push, PR, etc.)
213 | 3. File pattern matching
214 | 4. Branch/tag filtering
215 | 5. Approval workflows
216 | """
217 | 
```

--------------------------------------------------------------------------------
/src/gcp-mcp-server/services/cloud_compute_engine.py:
--------------------------------------------------------------------------------

```python
  1 | import json
  2 | from typing import Optional
  3 | 
  4 | from services import client_instances
  5 | 
  6 | 
  7 | def register(mcp_instance):
  8 |     """Register all Compute Engine resources and tools with the MCP instance."""
  9 | 
 10 |     # Resources
 11 |     @mcp_instance.resource("gcp://compute/{project_id}/{zone}/instances")
 12 |     def list_instances_resource(project_id: str = None, zone: str = None) -> str:
 13 |         """List all Compute Engine instances in a zone"""
 14 |         try:
 15 |             # Get client from client_instances
 16 |             client = client_instances.get_clients().compute
 17 |             project_id = project_id or client_instances.get_project_id()
 18 |             zone = zone or client_instances.get_location()
 19 | 
 20 |             instance_list = client.list(project=project_id, zone=zone)
 21 |             result = []
 22 |             for instance in instance_list:
 23 |                 result.append(
 24 |                     {
 25 |                         "name": instance.name,
 26 |                         "status": instance.status,
 27 |                         "machine_type": instance.machine_type.split("/")[-1]
 28 |                         if instance.machine_type
 29 |                         else None,
 30 |                         "internal_ip": instance.network_interfaces[0].network_i_p
 31 |                         if instance.network_interfaces
 32 |                         and len(instance.network_interfaces) > 0
 33 |                         else None,
 34 |                         "creation_timestamp": instance.creation_timestamp,
 35 |                     }
 36 |                 )
 37 |             return json.dumps(result, indent=2)
 38 |         except Exception as e:
 39 |             return json.dumps({"error": str(e)}, indent=2)
 40 | 
 41 |     # Tools
 42 |     @mcp_instance.tool()
 43 |     def create_instance(
 44 |         instance_name: str,
 45 |         machine_type: str,
 46 |         project_id: str = None,
 47 |         zone: str = None,
 48 |         image: str = "projects/debian-cloud/global/images/family/debian-11",
 49 |         network: str = "global/networks/default",
 50 |         disk_size_gb: int = 10,
 51 |     ) -> str:
 52 |         """
 53 |         Create a new Compute Engine instance
 54 | 
 55 |         Args:
 56 |             instance_name: Name for the new instance
 57 |             machine_type: Machine type (e.g., n2-standard-2)
 58 |             project_id: GCP project ID (defaults to configured project)
 59 |             zone: GCP zone (defaults to configured location)
 60 |             image: Source image for boot disk
 61 |             network: Network to connect to
 62 |             disk_size_gb: Size of boot disk in GB
 63 |         """
 64 |         try:
 65 |             # Get client from client_instances
 66 |             client = client_instances.get_clients().compute
 67 |             project_id = project_id or client_instances.get_project_id()
 68 |             zone = zone or client_instances.get_location()
 69 | 
 70 |             # Build instance configuration
 71 |             instance_config = {
 72 |                 "name": instance_name,
 73 |                 "machine_type": f"zones/{zone}/machineTypes/{machine_type}",
 74 |                 "disks": [
 75 |                     {
 76 |                         "boot": True,
 77 |                         "auto_delete": True,
 78 |                         "initialize_params": {
 79 |                             "source_image": image,
 80 |                             "disk_size_gb": disk_size_gb,
 81 |                         },
 82 |                     }
 83 |                 ],
 84 |                 "network_interfaces": [
 85 |                     {
 86 |                         "network": network,
 87 |                         "access_configs": [
 88 |                             {"type": "ONE_TO_ONE_NAT", "name": "External NAT"}
 89 |                         ],
 90 |                     }
 91 |                 ],
 92 |             }
 93 | 
 94 |             print(f"Creating instance {instance_name} in {zone}...")
 95 |             operation = client.insert(
 96 |                 project=project_id, zone=zone, instance_resource=instance_config
 97 |             )
 98 | 
 99 |             # Wait for operation to complete
100 |             operation.result()
101 |             return json.dumps(
102 |                 {
103 |                     "status": "success",
104 |                     "instance_name": instance_name,
105 |                     "zone": zone,
106 |                     "operation_type": "insert",
107 |                     "operation_status": "DONE",
108 |                 },
109 |                 indent=2,
110 |             )
111 |         except Exception as e:
112 |             return json.dumps({"status": "error", "message": str(e)}, indent=2)
113 | 
114 |     @mcp_instance.tool()
115 |     def list_instances(
116 |         project_id: str = None,
117 |         zone: str = None,
118 |         filter: Optional[str] = None,
119 |     ) -> str:
120 |         """
121 |         List Compute Engine instances in a zone
122 | 
123 |         Args:
124 |             project_id: GCP project ID (defaults to configured project)
125 |             zone: GCP zone (defaults to configured location)
126 |             filter: Optional filter expression (e.g., "status=RUNNING")
127 |         """
128 |         try:
129 |             # Get client from client_instances
130 |             client = client_instances.get_clients().compute
131 |             project_id = project_id or client_instances.get_project_id()
132 |             zone = zone or client_instances.get_location()
133 | 
134 |             instance_list = client.list(project=project_id, zone=zone, filter=filter)
135 | 
136 |             instances = []
137 |             for instance in instance_list:
138 |                 instances.append(
139 |                     {
140 |                         "name": instance.name,
141 |                         "status": instance.status,
142 |                         "machine_type": instance.machine_type.split("/")[-1]
143 |                         if instance.machine_type
144 |                         else None,
145 |                         "internal_ip": instance.network_interfaces[0].network_i_p
146 |                         if instance.network_interfaces
147 |                         and len(instance.network_interfaces) > 0
148 |                         else None,
149 |                         "creation_timestamp": instance.creation_timestamp,
150 |                     }
151 |                 )
152 | 
153 |             return json.dumps(
154 |                 {"status": "success", "instances": instances, "count": len(instances)},
155 |                 indent=2,
156 |             )
157 |         except Exception as e:
158 |             return json.dumps({"status": "error", "message": str(e)}, indent=2)
159 | 
160 |     @mcp_instance.tool()
161 |     def start_instance(
162 |         instance_name: str,
163 |         project_id: str = None,
164 |         zone: str = None,
165 |     ) -> str:
166 |         """
167 |         Start a stopped Compute Engine instance
168 | 
169 |         Args:
170 |             instance_name: Name of the instance to start
171 |             project_id: GCP project ID (defaults to configured project)
172 |             zone: GCP zone (defaults to configured location)
173 |         """
174 |         try:
175 |             # Get client from client_instances
176 |             client = client_instances.get_clients().compute
177 |             project_id = project_id or client_instances.get_project_id()
178 |             zone = zone or client_instances.get_location()
179 | 
180 |             print(f"Starting instance {instance_name}...")
181 |             operation = client.start(
182 |                 project=project_id, zone=zone, instance=instance_name
183 |             )
184 |             operation.result()
185 | 
186 |             return json.dumps(
187 |                 {
188 |                     "status": "success",
189 |                     "instance_name": instance_name,
190 |                     "operation_status": "DONE",
191 |                 },
192 |                 indent=2,
193 |             )
194 |         except Exception as e:
195 |             return json.dumps({"status": "error", "message": str(e)}, indent=2)
196 | 
197 |     @mcp_instance.tool()
198 |     def stop_instance(
199 |         instance_name: str,
200 |         project_id: str = None,
201 |         zone: str = None,
202 |     ) -> str:
203 |         """
204 |         Stop a running Compute Engine instance
205 | 
206 |         Args:
207 |             instance_name: Name of the instance to stop
208 |             project_id: GCP project ID (defaults to configured project)
209 |             zone: GCP zone (defaults to configured location)
210 |         """
211 |         try:
212 |             # Get client from client_instances
213 |             client = client_instances.get_clients().compute
214 |             project_id = project_id or client_instances.get_project_id()
215 |             zone = zone or client_instances.get_location()
216 | 
217 |             print(f"Stopping instance {instance_name}...")
218 |             operation = client.stop(
219 |                 project=project_id, zone=zone, instance=instance_name
220 |             )
221 |             operation.result()
222 | 
223 |             return json.dumps(
224 |                 {
225 |                     "status": "success",
226 |                     "instance_name": instance_name,
227 |                     "operation_status": "DONE",
228 |                 },
229 |                 indent=2,
230 |             )
231 |         except Exception as e:
232 |             return json.dumps({"status": "error", "message": str(e)}, indent=2)
233 | 
234 |     @mcp_instance.tool()
235 |     def delete_instance(
236 |         instance_name: str,
237 |         project_id: str = None,
238 |         zone: str = None,
239 |     ) -> str:
240 |         """
241 |         Delete a Compute Engine instance
242 | 
243 |         Args:
244 |             instance_name: Name of the instance to delete
245 |             project_id: GCP project ID (defaults to configured project)
246 |             zone: GCP zone (defaults to configured location)
247 |         """
248 |         try:
249 |             # Get client from client_instances
250 |             client = client_instances.get_clients().compute
251 |             project_id = project_id or client_instances.get_project_id()
252 |             zone = zone or client_instances.get_location()
253 | 
254 |             print(f"Deleting instance {instance_name}...")
255 |             operation = client.delete(
256 |                 project=project_id, zone=zone, instance=instance_name
257 |             )
258 |             operation.result()
259 | 
260 |             return json.dumps(
261 |                 {
262 |                     "status": "success",
263 |                     "instance_name": instance_name,
264 |                     "operation_status": "DONE",
265 |                 },
266 |                 indent=2,
267 |             )
268 |         except Exception as e:
269 |             return json.dumps({"status": "error", "message": str(e)}, indent=2)
270 | 
271 |     # Prompts
272 |     @mcp_instance.prompt()
273 |     def instance_config_prompt(workload_type: str = "web-server") -> str:
274 |         """Prompt for creating Compute Engine configurations"""
275 |         return f"""
276 | I need to configure a Compute Engine instance for {workload_type}. Please help with:
277 | 
278 | 1. Recommended machine types for {workload_type}
279 | 2. Disk type and size recommendations
280 | 3. Network configuration best practices
281 | 4. Security considerations (service accounts, firewall rules)
282 | 5. Cost optimization strategies
283 | """
284 | 
285 |     @mcp_instance.prompt()
286 |     def troubleshooting_prompt(issue: str = "instance not responding") -> str:
287 |         """Prompt for troubleshooting Compute Engine issues"""
288 |         return f"""
289 | I'm experiencing {issue} with my Compute Engine instance. Please guide me through:
290 | 
291 | 1. Common causes for this issue
292 | 2. Diagnostic steps using Cloud Console and CLI
293 | 3. Log analysis techniques
294 | 4. Recovery procedures
295 | 5. Prevention strategies
296 | """
297 | 
```

--------------------------------------------------------------------------------
/src/gcp-mcp-server/services/cloud_audit_logs.py:
--------------------------------------------------------------------------------

```python
  1 | import json
  2 | 
  3 | from google.cloud import logging_v2
  4 | from services import client_instances
  5 | 
  6 | 
  7 | def register(mcp_instance):
  8 |     """Register all Cloud Audit Logs resources and tools with the MCP instance."""
  9 | 
 10 |     # Resources
 11 |     @mcp_instance.resource("gcp://audit-logs/{project_id}")
 12 |     def list_audit_logs_resource(project_id: str = None) -> str:
 13 |         """List recent audit logs from a specific project"""
 14 |         try:
 15 |             # Get client from client_instances
 16 |             client = client_instances.get_clients().logging
 17 |             project_id = project_id or client_instances.get_project_id()
 18 | 
 19 |             # Filter for audit logs only
 20 |             filter_str = 'logName:"cloudaudit.googleapis.com"'
 21 | 
 22 |             # List log entries
 23 |             entries = client.list_log_entries(
 24 |                 request={
 25 |                     "resource_names": [f"projects/{project_id}"],
 26 |                     "filter": filter_str,
 27 |                     "page_size": 10,  # Limiting to 10 for responsiveness
 28 |                 }
 29 |             )
 30 | 
 31 |             result = []
 32 |             for entry in entries:
 33 |                 log_data = {
 34 |                     "timestamp": entry.timestamp.isoformat()
 35 |                     if entry.timestamp
 36 |                     else None,
 37 |                     "severity": entry.severity.name if entry.severity else None,
 38 |                     "log_name": entry.log_name,
 39 |                     "resource": {
 40 |                         "type": entry.resource.type,
 41 |                         "labels": dict(entry.resource.labels)
 42 |                         if entry.resource.labels
 43 |                         else {},
 44 |                     }
 45 |                     if entry.resource
 46 |                     else {},
 47 |                 }
 48 | 
 49 |                 # Handle payload based on type
 50 |                 if hasattr(entry, "json_payload") and entry.json_payload:
 51 |                     log_data["payload"] = dict(entry.json_payload)
 52 |                 elif hasattr(entry, "proto_payload") and entry.proto_payload:
 53 |                     log_data["payload"] = "Proto payload (details omitted)"
 54 |                 elif hasattr(entry, "text_payload") and entry.text_payload:
 55 |                     log_data["payload"] = entry.text_payload
 56 | 
 57 |                 result.append(log_data)
 58 | 
 59 |             return json.dumps(result, indent=2)
 60 |         except Exception as e:
 61 |             return json.dumps({"error": str(e)}, indent=2)
 62 | 
 63 |     @mcp_instance.resource("gcp://audit-logs/{project_id}/sinks")
 64 |     def list_log_sinks_resource(project_id: str = None) -> str:
 65 |         """List log sinks configured for a project"""
 66 |         try:
 67 |             # Get client from client_instances
 68 |             client = client_instances.get_clients().logging
 69 |             project_id = project_id or client_instances.get_project_id()
 70 | 
 71 |             parent = f"projects/{project_id}"
 72 |             sinks = client.list_sinks(parent=parent)
 73 | 
 74 |             result = []
 75 |             for sink in sinks:
 76 |                 result.append(
 77 |                     {
 78 |                         "name": sink.name,
 79 |                         "destination": sink.destination,
 80 |                         "filter": sink.filter,
 81 |                         "description": sink.description,
 82 |                         "disabled": sink.disabled,
 83 |                     }
 84 |                 )
 85 | 
 86 |             return json.dumps(result, indent=2)
 87 |         except Exception as e:
 88 |             return json.dumps({"error": str(e)}, indent=2)
 89 | 
 90 |     # Tools
 91 |     @mcp_instance.tool()
 92 |     def list_audit_logs(
 93 |         project_id: str = None,
 94 |         filter_str: str = 'logName:"cloudaudit.googleapis.com"',
 95 |         page_size: int = 20,
 96 |     ) -> str:
 97 |         """
 98 |         List Google Cloud Audit logs from a project
 99 | 
100 |         Args:
101 |             project_id: GCP project ID (defaults to configured project)
102 |             filter_str: Log filter expression (defaults to audit logs)
103 |             page_size: Maximum number of entries to return
104 |         """
105 |         try:
106 |             # Get client from client_instances
107 |             client = client_instances.get_clients().logging
108 |             project_id = project_id or client_instances.get_project_id()
109 | 
110 |             print(f"Retrieving audit logs from project {project_id}...")
111 | 
112 |             # List log entries
113 |             entries = client.list_log_entries(
114 |                 request={
115 |                     "resource_names": [f"projects/{project_id}"],
116 |                     "filter": filter_str,
117 |                     "page_size": page_size,
118 |                 }
119 |             )
120 | 
121 |             result = []
122 |             for entry in entries:
123 |                 log_data = {
124 |                     "timestamp": entry.timestamp.isoformat()
125 |                     if entry.timestamp
126 |                     else None,
127 |                     "severity": entry.severity.name if entry.severity else None,
128 |                     "log_name": entry.log_name,
129 |                     "resource": {
130 |                         "type": entry.resource.type,
131 |                         "labels": dict(entry.resource.labels)
132 |                         if entry.resource.labels
133 |                         else {},
134 |                     }
135 |                     if entry.resource
136 |                     else {},
137 |                 }
138 | 
139 |                 # Handle payload based on type
140 |                 if hasattr(entry, "json_payload") and entry.json_payload:
141 |                     log_data["payload"] = dict(entry.json_payload)
142 |                 elif hasattr(entry, "proto_payload") and entry.proto_payload:
143 |                     log_data["payload"] = "Proto payload (details omitted)"
144 |                 elif hasattr(entry, "text_payload") and entry.text_payload:
145 |                     log_data["payload"] = entry.text_payload
146 | 
147 |                 result.append(log_data)
148 | 
149 |             return json.dumps(
150 |                 {
151 |                     "status": "success",
152 |                     "entries": result,
153 |                     "count": len(result),
154 |                 },
155 |                 indent=2,
156 |             )
157 |         except Exception as e:
158 |             return json.dumps({"status": "error", "message": str(e)}, indent=2)
159 | 
160 |     @mcp_instance.tool()
161 |     def filter_admin_activity_logs(
162 |         project_id: str = None,
163 |         service_name: str = "",
164 |         resource_type: str = "",
165 |         time_range: str = "1h",
166 |         page_size: int = 20,
167 |     ) -> str:
168 |         """
169 |         Filter Admin Activity audit logs for specific services or resource types
170 | 
171 |         Args:
172 |             project_id: GCP project ID (defaults to configured project)
173 |             service_name: Optional service name to filter by (e.g., compute.googleapis.com)
174 |             resource_type: Optional resource type to filter by (e.g., gce_instance)
175 |             time_range: Time range for logs (e.g., 1h, 24h, 7d)
176 |             page_size: Maximum number of entries to return
177 |         """
178 |         try:
179 |             # Get client from client_instances
180 |             client = client_instances.get_clients().logging
181 |             project_id = project_id or client_instances.get_project_id()
182 | 
183 |             # Build filter string
184 |             filter_parts = ['logName:"cloudaudit.googleapis.com%2Factivity"']
185 | 
186 |             if service_name:
187 |                 filter_parts.append(f'protoPayload.serviceName="{service_name}"')
188 | 
189 |             if resource_type:
190 |                 filter_parts.append(f'resource.type="{resource_type}"')
191 | 
192 |             if time_range:
193 |                 filter_parts.append(f'timestamp >= "-{time_range}"')
194 | 
195 |             filter_str = " AND ".join(filter_parts)
196 | 
197 |             print(f"Filtering Admin Activity logs with: {filter_str}")
198 | 
199 |             # List log entries
200 |             entries = client.list_log_entries(
201 |                 request={
202 |                     "resource_names": [f"projects/{project_id}"],
203 |                     "filter": filter_str,
204 |                     "page_size": page_size,
205 |                 }
206 |             )
207 | 
208 |             result = []
209 |             for entry in entries:
210 |                 # Extract relevant fields for admin activity logs
211 |                 log_data = {
212 |                     "timestamp": entry.timestamp.isoformat()
213 |                     if entry.timestamp
214 |                     else None,
215 |                     "method_name": None,
216 |                     "resource_name": None,
217 |                     "service_name": None,
218 |                     "user": None,
219 |                 }
220 | 
221 |                 # Extract data from proto payload
222 |                 if hasattr(entry, "proto_payload") and entry.proto_payload:
223 |                     payload = entry.proto_payload
224 |                     if hasattr(payload, "method_name"):
225 |                         log_data["method_name"] = payload.method_name
226 |                     if hasattr(payload, "resource_name"):
227 |                         log_data["resource_name"] = payload.resource_name
228 |                     if hasattr(payload, "service_name"):
229 |                         log_data["service_name"] = payload.service_name
230 | 
231 |                     # Extract authentication info
232 |                     if hasattr(payload, "authentication_info"):
233 |                         auth_info = payload.authentication_info
234 |                         if hasattr(auth_info, "principal_email"):
235 |                             log_data["user"] = auth_info.principal_email
236 | 
237 |                 result.append(log_data)
238 | 
239 |             return json.dumps(
240 |                 {
241 |                     "status": "success",
242 |                     "entries": result,
243 |                     "count": len(result),
244 |                     "filter": filter_str,
245 |                 },
246 |                 indent=2,
247 |             )
248 |         except Exception as e:
249 |             return json.dumps({"status": "error", "message": str(e)}, indent=2)
250 | 
251 |     @mcp_instance.tool()
252 |     def create_log_sink(
253 |         sink_name: str,
254 |         destination: str,
255 |         project_id: str = None,
256 |         filter_str: str = 'logName:"cloudaudit.googleapis.com"',
257 |         description: str = "",
258 |     ) -> str:
259 |         """
260 |         Create a log sink to export audit logs to a destination
261 | 
262 |         Args:
263 |             sink_name: Name for the new log sink
264 |             destination: Destination for logs (e.g., storage.googleapis.com/my-bucket)
265 |             project_id: GCP project ID (defaults to configured project)
266 |             filter_str: Log filter expression (defaults to audit logs)
267 |             description: Optional description for the sink
268 |         """
269 |         try:
270 |             # Get client from client_instances
271 |             client = client_instances.get_clients().logging
272 |             project_id = project_id or client_instances.get_project_id()
273 | 
274 |             parent = f"projects/{project_id}"
275 | 
276 |             # Create sink configuration
277 |             sink = logging_v2.LogSink(
278 |                 name=sink_name,
279 |                 destination=destination,
280 |                 filter=filter_str,
281 |                 description=description,
282 |             )
283 | 
284 |             print(f"Creating log sink '{sink_name}' to export to {destination}...")
285 | 
286 |             # Create the sink
287 |             response = client.create_sink(
288 |                 request={
289 |                     "parent": parent,
290 |                     "sink": sink,
291 |                 }
292 |             )
293 | 
294 |             # Important: Recommend setting up IAM permissions
295 |             writer_identity = response.writer_identity
296 | 
297 |             return json.dumps(
298 |                 {
299 |                     "status": "success",
300 |                     "name": response.name,
301 |                     "destination": response.destination,
302 |                     "filter": response.filter,
303 |                     "writer_identity": writer_identity,
304 |                     "next_steps": f"Important: Grant {writer_identity} the appropriate permissions on the destination",
305 |                 },
306 |                 indent=2,
307 |             )
308 |         except Exception as e:
309 |             return json.dumps({"status": "error", "message": str(e)}, indent=2)
310 | 
311 |     # Prompts
312 |     @mcp_instance.prompt()
313 |     def audit_log_investigation() -> str:
314 |         """Prompt for investigating security incidents through audit logs"""
315 |         return """
316 | I need to investigate a potential security incident in my Google Cloud project.
317 | 
318 | Please help me:
319 | 1. Determine what types of audit logs I should check (Admin Activity, Data Access, System Event)
320 | 2. Create an effective filter query to find relevant logs
321 | 3. Identify key fields to examine for signs of unusual activity
322 | 4. Suggest common indicators of potential security issues in audit logs
323 | """
324 | 
325 |     @mcp_instance.prompt()
326 |     def log_export_setup() -> str:
327 |         """Prompt for setting up log exports for compliance"""
328 |         return """
329 | I need to set up log exports for compliance requirements.
330 | 
331 | Please help me:
332 | 1. Understand the different destinations available for log exports
333 | 2. Design an effective filter to capture all required audit events
334 | 3. Implement a log sink with appropriate permissions
335 | 4. Verify my setup is correctly capturing and exporting logs
336 | """
337 | 
```

--------------------------------------------------------------------------------
/src/gcp-mcp-server/services/cloud_storage.py:
--------------------------------------------------------------------------------

```python
  1 | import json
  2 | import os
  3 | from typing import Dict, Optional
  4 | 
  5 | from services import client_instances
  6 | 
  7 | 
  8 | def register(mcp_instance):
  9 |     """Register all Cloud Storage resources and tools with the MCP instance."""
 10 | 
 11 |     # Resources
 12 |     @mcp_instance.resource("gcp://storage/{project_id}/buckets")
 13 |     def list_buckets_resource(project_id: str = None) -> str:
 14 |         """List all Cloud Storage buckets in a project"""
 15 |         try:
 16 |             # Get client from client_instances
 17 |             client = client_instances.get_clients().storage
 18 |             project_id = project_id or client_instances.get_project_id()
 19 | 
 20 |             buckets = client.list_buckets()
 21 | 
 22 |             result = []
 23 |             for bucket in buckets:
 24 |                 result.append(
 25 |                     {
 26 |                         "name": bucket.name,
 27 |                         "location": bucket.location,
 28 |                         "storage_class": bucket.storage_class,
 29 |                         "time_created": bucket.time_created.isoformat()
 30 |                         if bucket.time_created
 31 |                         else None,
 32 |                         "versioning_enabled": bucket.versioning_enabled,
 33 |                         "labels": dict(bucket.labels) if bucket.labels else {},
 34 |                     }
 35 |                 )
 36 | 
 37 |             return json.dumps(result, indent=2)
 38 |         except Exception as e:
 39 |             return json.dumps({"error": str(e)}, indent=2)
 40 | 
 41 |     @mcp_instance.resource("gcp://storage/{project_id}/bucket/{bucket_name}")
 42 |     def get_bucket_resource(project_id: str = None, bucket_name: str = None) -> str:
 43 |         """Get details for a specific Cloud Storage bucket"""
 44 |         try:
 45 |             # Get client from client_instances
 46 |             client = client_instances.get_clients().storage
 47 |             project_id = project_id or client_instances.get_project_id()
 48 | 
 49 |             bucket = client.get_bucket(bucket_name)
 50 |             result = {
 51 |                 "name": bucket.name,
 52 |                 "location": bucket.location,
 53 |                 "storage_class": bucket.storage_class,
 54 |                 "time_created": bucket.time_created.isoformat()
 55 |                 if bucket.time_created
 56 |                 else None,
 57 |                 "versioning_enabled": bucket.versioning_enabled,
 58 |                 "requester_pays": bucket.requester_pays,
 59 |                 "lifecycle_rules": bucket.lifecycle_rules,
 60 |                 "cors": bucket.cors,
 61 |                 "labels": dict(bucket.labels) if bucket.labels else {},
 62 |             }
 63 |             return json.dumps(result, indent=2)
 64 |         except Exception as e:
 65 |             return json.dumps({"error": str(e)}, indent=2)
 66 | 
 67 |     @mcp_instance.resource("gcp://storage/{project_id}/bucket/{bucket_name}/objects")
 68 |     def list_objects_resource(project_id: str = None, bucket_name: str = None) -> str:
 69 |         """List objects in a specific Cloud Storage bucket"""
 70 |         prefix = ""  # Move it inside the function with a default value
 71 |         try:
 72 |             # Get client from client_instances
 73 |             client = client_instances.get_clients().storage
 74 |             project_id = project_id or client_instances.get_project_id()
 75 | 
 76 |             bucket = client.get_bucket(bucket_name)
 77 |             blobs = bucket.list_blobs(prefix=prefix)
 78 | 
 79 |             result = []
 80 |             for blob in blobs:
 81 |                 result.append(
 82 |                     {
 83 |                         "name": blob.name,
 84 |                         "size": blob.size,
 85 |                         "updated": blob.updated.isoformat() if blob.updated else None,
 86 |                         "content_type": blob.content_type,
 87 |                         "md5_hash": blob.md5_hash,
 88 |                         "generation": blob.generation,
 89 |                         "metadata": blob.metadata,
 90 |                     }
 91 |                 )
 92 | 
 93 |             return json.dumps(result, indent=2)
 94 |         except Exception as e:
 95 |             return json.dumps({"error": str(e)}, indent=2)
 96 | 
 97 |     # Tools
 98 |     @mcp_instance.tool()
 99 |     def create_bucket(
100 |         bucket_name: str,
101 |         project_id: str = None,
102 |         location: str = "us-central1",
103 |         storage_class: str = "STANDARD",
104 |         labels: Optional[Dict[str, str]] = None,
105 |         versioning_enabled: bool = False,
106 |     ) -> str:
107 |         """
108 |         Create a Cloud Storage bucket
109 | 
110 |         Args:
111 |             bucket_name: Name for the new bucket
112 |             project_id: GCP project ID (defaults to configured project)
113 |             location: GCP region (e.g., us-central1)
114 |             storage_class: Storage class (STANDARD, NEARLINE, COLDLINE, ARCHIVE)
115 |             labels: Optional key-value pairs for bucket labels
116 |             versioning_enabled: Whether to enable object versioning
117 |         """
118 |         try:
119 |             # Get client from client_instances
120 |             client = client_instances.get_clients().storage
121 |             project_id = project_id or client_instances.get_project_id()
122 | 
123 |             # Validate storage class
124 |             valid_storage_classes = ["STANDARD", "NEARLINE", "COLDLINE", "ARCHIVE"]
125 |             if storage_class not in valid_storage_classes:
126 |                 return json.dumps(
127 |                     {
128 |                         "status": "error",
129 |                         "message": f"Invalid storage class: {storage_class}. Valid classes are: {', '.join(valid_storage_classes)}",
130 |                     },
131 |                     indent=2,
132 |                 )
133 | 
134 |             # Log info (similar to ctx.info)
135 |             print(f"Creating bucket {bucket_name} in {location}...")
136 |             bucket = client.bucket(bucket_name)
137 |             bucket.create(location=location, storage_class=storage_class, labels=labels)
138 | 
139 |             # Set versioning if enabled
140 |             if versioning_enabled:
141 |                 bucket.versioning_enabled = True
142 |                 bucket.patch()
143 | 
144 |             return json.dumps(
145 |                 {
146 |                     "status": "success",
147 |                     "name": bucket.name,
148 |                     "location": bucket.location,
149 |                     "storage_class": bucket.storage_class,
150 |                     "time_created": bucket.time_created.isoformat()
151 |                     if bucket.time_created
152 |                     else None,
153 |                     "versioning_enabled": bucket.versioning_enabled,
154 |                     "url": f"gs://{bucket_name}/",
155 |                 },
156 |                 indent=2,
157 |             )
158 |         except Exception as e:
159 |             return json.dumps({"status": "error", "message": str(e)}, indent=2)
160 | 
161 |     @mcp_instance.tool()
162 |     def list_buckets(project_id: str = None, prefix: str = "") -> str:
163 |         """
164 |         List Cloud Storage buckets in a project
165 | 
166 |         Args:
167 |             project_id: GCP project ID (defaults to configured project)
168 |             prefix: Optional prefix to filter bucket names
169 |         """
170 |         try:
171 |             # Get client from client_instances
172 |             client = client_instances.get_clients().storage
173 |             project_id = project_id or client_instances.get_project_id()
174 | 
175 |             # Log info (similar to ctx.info)
176 |             print(f"Listing buckets in project {project_id}...")
177 | 
178 |             # List buckets with optional prefix filter
179 |             if prefix:
180 |                 buckets = [
181 |                     b for b in client.list_buckets() if b.name.startswith(prefix)
182 |                 ]
183 |             else:
184 |                 buckets = list(client.list_buckets())
185 | 
186 |             result = []
187 |             for bucket in buckets:
188 |                 result.append(
189 |                     {
190 |                         "name": bucket.name,
191 |                         "location": bucket.location,
192 |                         "storage_class": bucket.storage_class,
193 |                         "time_created": bucket.time_created.isoformat()
194 |                         if bucket.time_created
195 |                         else None,
196 |                     }
197 |                 )
198 | 
199 |             return json.dumps(
200 |                 {"status": "success", "buckets": result, "count": len(result)}, indent=2
201 |             )
202 |         except Exception as e:
203 |             return json.dumps({"status": "error", "message": str(e)}, indent=2)
204 | 
205 |     @mcp_instance.tool()
206 |     def upload_object(
207 |         bucket_name: str,
208 |         source_file_path: str,
209 |         destination_blob_name: Optional[str] = None,
210 |         project_id: str = None,
211 |         content_type: Optional[str] = None,
212 |         metadata: Optional[Dict[str, str]] = None,
213 |     ) -> str:
214 |         """
215 |         Upload an object to a Cloud Storage bucket
216 | 
217 |         Args:
218 |             bucket_name: Name of the bucket
219 |             source_file_path: Local path to the file to upload
220 |             destination_blob_name: Name to assign to the blob (defaults to file basename)
221 |             project_id: GCP project ID (defaults to configured project)
222 |             content_type: Content type of the object (optional)
223 |             metadata: Custom metadata dictionary (optional)
224 |         """
225 |         try:
226 |             # Get client from client_instances
227 |             client = client_instances.get_clients().storage
228 |             project_id = project_id or client_instances.get_project_id()
229 | 
230 |             # Check if file exists
231 |             if not os.path.exists(source_file_path):
232 |                 return json.dumps(
233 |                     {
234 |                         "status": "error",
235 |                         "message": f"File not found: {source_file_path}",
236 |                     },
237 |                     indent=2,
238 |                 )
239 | 
240 |             # Get bucket
241 |             bucket = client.bucket(bucket_name)
242 | 
243 |             # Use filename if destination_blob_name not provided
244 |             if not destination_blob_name:
245 |                 destination_blob_name = os.path.basename(source_file_path)
246 | 
247 |             # Create blob
248 |             blob = bucket.blob(destination_blob_name)
249 | 
250 |             # Set content type if provided
251 |             if content_type:
252 |                 blob.content_type = content_type
253 | 
254 |             # Set metadata if provided
255 |             if metadata:
256 |                 blob.metadata = metadata
257 | 
258 |             # Get file size for progress reporting
259 |             file_size = os.path.getsize(source_file_path)
260 |             print(
261 |                 f"Uploading {source_file_path} ({file_size} bytes) to gs://{bucket_name}/{destination_blob_name}..."
262 |             )
263 | 
264 |             # Upload file
265 |             blob.upload_from_filename(source_file_path)
266 | 
267 |             return json.dumps(
268 |                 {
269 |                     "status": "success",
270 |                     "bucket": bucket_name,
271 |                     "name": blob.name,
272 |                     "size": blob.size,
273 |                     "content_type": blob.content_type,
274 |                     "md5_hash": blob.md5_hash,
275 |                     "generation": blob.generation,
276 |                     "public_url": blob.public_url,
277 |                     "gsutil_uri": f"gs://{bucket_name}/{destination_blob_name}",
278 |                 },
279 |                 indent=2,
280 |             )
281 |         except Exception as e:
282 |             return json.dumps({"status": "error", "message": str(e)}, indent=2)
283 | 
284 |     @mcp_instance.tool()
285 |     def download_object(
286 |         bucket_name: str,
287 |         source_blob_name: str,
288 |         destination_file_path: str,
289 |         project_id: str = None,
290 |     ) -> str:
291 |         """
292 |         Download an object from a Cloud Storage bucket
293 | 
294 |         Args:
295 |             bucket_name: Name of the bucket
296 |             source_blob_name: Name of the blob to download
297 |             destination_file_path: Local path where the file should be saved
298 |             project_id: GCP project ID (defaults to configured project)
299 |         """
300 |         try:
301 |             # Get client from client_instances
302 |             client = client_instances.get_clients().storage
303 |             project_id = project_id or client_instances.get_project_id()
304 | 
305 |             # Get bucket and blob
306 |             bucket = client.bucket(bucket_name)
307 |             blob = bucket.blob(source_blob_name)
308 | 
309 |             # Check if blob exists
310 |             if not blob.exists():
311 |                 return json.dumps(
312 |                     {
313 |                         "status": "error",
314 |                         "message": f"Object not found: gs://{bucket_name}/{source_blob_name}",
315 |                     },
316 |                     indent=2,
317 |                 )
318 | 
319 |             # Create directory if doesn't exist
320 |             os.makedirs(
321 |                 os.path.dirname(os.path.abspath(destination_file_path)), exist_ok=True
322 |             )
323 | 
324 |             print(
325 |                 f"Downloading gs://{bucket_name}/{source_blob_name} to {destination_file_path}..."
326 |             )
327 | 
328 |             # Download file
329 |             blob.download_to_filename(destination_file_path)
330 | 
331 |             return json.dumps(
332 |                 {
333 |                     "status": "success",
334 |                     "bucket": bucket_name,
335 |                     "blob_name": source_blob_name,
336 |                     "size": blob.size,
337 |                     "content_type": blob.content_type,
338 |                     "downloaded_to": destination_file_path,
339 |                 },
340 |                 indent=2,
341 |             )
342 |         except Exception as e:
343 |             return json.dumps({"status": "error", "message": str(e)}, indent=2)
344 | 
345 |     @mcp_instance.tool()
346 |     def delete_object(bucket_name: str, blob_name: str, project_id: str = None) -> str:
347 |         """
348 |         Delete an object from a Cloud Storage bucket
349 | 
350 |         Args:
351 |             bucket_name: Name of the bucket
352 |             blob_name: Name of the blob to delete
353 |             project_id: GCP project ID (defaults to configured project)
354 |         """
355 |         try:
356 |             # Get client from client_instances
357 |             client = client_instances.get_clients().storage
358 |             project_id = project_id or client_instances.get_project_id()
359 | 
360 |             # Get bucket and blob
361 |             bucket = client.bucket(bucket_name)
362 |             blob = bucket.blob(blob_name)
363 | 
364 |             print(f"Deleting gs://{bucket_name}/{blob_name}...")
365 | 
366 |             # Delete the blob
367 |             blob.delete()
368 | 
369 |             return json.dumps(
370 |                 {
371 |                     "status": "success",
372 |                     "message": f"Successfully deleted gs://{bucket_name}/{blob_name}",
373 |                 },
374 |                 indent=2,
375 |             )
376 |         except Exception as e:
377 |             return json.dumps({"status": "error", "message": str(e)}, indent=2)
378 | 
379 |     # Prompts
380 |     @mcp_instance.prompt()
381 |     def create_bucket_prompt(location: str = "us-central1") -> str:
382 |         """Prompt for creating a new Cloud Storage bucket"""
383 |         return f"""
384 | I need to create a new Cloud Storage bucket in {location}.
385 | 
386 | Please help me with:
387 | 1. Understanding storage classes (STANDARD, NEARLINE, COLDLINE, ARCHIVE)
388 | 2. Best practices for bucket naming
389 | 3. When to enable versioning
390 | 4. Recommendations for bucket security settings
391 | 5. Steps to create the bucket
392 | """
393 | 
394 |     @mcp_instance.prompt()
395 |     def upload_file_prompt() -> str:
396 |         """Prompt for help with uploading files to Cloud Storage"""
397 |         return """
398 | I need to upload files to a Cloud Storage bucket.
399 | 
400 | Please help me understand:
401 | 1. The best way to organize files in Cloud Storage
402 | 2. When to use folders/prefixes
403 | 3. How to set appropriate permissions on uploaded files
404 | 4. How to make files publicly accessible (if needed)
405 | 5. The steps to perform the upload
406 | """
407 | 
```

--------------------------------------------------------------------------------
/src/gcp-mcp-server/services/artifact_registry.py:
--------------------------------------------------------------------------------

```python
  1 | import json
  2 | 
  3 | from services import client_instances
  4 | 
  5 | 
  6 | def register(mcp_instance):
  7 |     """Register all resources and tools with the MCP instance."""
  8 | 
  9 |     @mcp_instance.resource(
 10 |         "gcp://artifactregistry/{project_id}/{location}/repositories"
 11 |     )
 12 |     def list_repositories_resource(project_id: str = None, location: str = None) -> str:
 13 |         """List all Artifact Registry repositories in a specific location"""
 14 |         try:
 15 |             # define the client
 16 |             client = client_instances.get_clients().artifactregistry
 17 |             project_id = project_id or client_instances.get_project_id()
 18 |             location = location or client_instances.get_location()
 19 | 
 20 |             # Use parameters directly from URL path
 21 |             parent = f"projects/{project_id}/locations/{location}"
 22 | 
 23 |             repositories = client.list_repositories(parent=parent)
 24 |             result = []
 25 |             for repo in repositories:
 26 |                 result.append(
 27 |                     {
 28 |                         "name": repo.name.split("/")[-1],
 29 |                         "format": repo.format.name
 30 |                         if hasattr(repo.format, "name")
 31 |                         else str(repo.format),
 32 |                         "description": repo.description,
 33 |                         "create_time": repo.create_time.isoformat()
 34 |                         if repo.create_time
 35 |                         else None,
 36 |                         "update_time": repo.update_time.isoformat()
 37 |                         if repo.update_time
 38 |                         else None,
 39 |                         "kms_key_name": repo.kms_key_name,
 40 |                         "labels": dict(repo.labels) if repo.labels else {},
 41 |                     }
 42 |                 )
 43 |             return json.dumps(result, indent=2)
 44 |         except Exception as e:
 45 |             return json.dumps({"error": str(e)}, indent=2)
 46 | 
 47 |     # Add a tool for creating repositories
 48 |     @mcp_instance.tool()
 49 |     def create_artifact_repository(
 50 |         name: str, format: str, description: str = ""
 51 |     ) -> str:
 52 |         """Create a new Artifact Registry repository"""
 53 |         try:
 54 |             # define the client
 55 |             client = client_instances.get_clients().artifactregistry
 56 |             project_id = client_instances.get_project_id()
 57 |             location = client_instances.get_location()
 58 | 
 59 |             parent = f"projects/{project_id}/locations/{location}"
 60 | 
 61 |             # Create repository request
 62 |             from google.cloud.artifactregistry_v1 import (
 63 |                 CreateRepositoryRequest,
 64 |                 Repository,
 65 |             )
 66 | 
 67 |             # Create the repository format enum value
 68 |             if format.upper() not in ["DOCKER", "MAVEN", "NPM", "PYTHON", "APT", "YUM"]:
 69 |                 return json.dumps(
 70 |                     {
 71 |                         "error": f"Invalid format: {format}. Must be one of: DOCKER, MAVEN, NPM, PYTHON, APT, YUM"
 72 |                     },
 73 |                     indent=2,
 74 |                 )
 75 | 
 76 |             repo = Repository(
 77 |                 name=f"{parent}/repositories/{name}",
 78 |                 format=getattr(Repository.Format, format.upper()),
 79 |                 description=description,
 80 |             )
 81 | 
 82 |             request = CreateRepositoryRequest(
 83 |                 parent=parent, repository_id=name, repository=repo
 84 |             )
 85 | 
 86 |             operation = client.create_repository(request=request)
 87 |             result = operation.result()  # Wait for operation to complete
 88 | 
 89 |             return json.dumps(
 90 |                 {
 91 |                     "name": result.name.split("/")[-1],
 92 |                     "format": result.format.name
 93 |                     if hasattr(result.format, "name")
 94 |                     else str(result.format),
 95 |                     "description": result.description,
 96 |                     "create_time": result.create_time.isoformat()
 97 |                     if result.create_time
 98 |                     else None,
 99 |                     "update_time": result.update_time.isoformat()
100 |                     if result.update_time
101 |                     else None,
102 |                 },
103 |                 indent=2,
104 |             )
105 | 
106 |         except Exception as e:
107 |             return json.dumps({"error": str(e)}, indent=2)
108 | 
109 | 
110 | #     @mcp_instance.tool()
111 | #     def get_artifact_repository(
112 | #         project_id: str = None,
113 | #         location: str = None,
114 | #         repository_id: str = None,
115 | #     ) -> str:
116 | #         """Get details about a specific Artifact Registry repository"""
117 | #         try:
118 | #             # Get the client from the context
119 | #             clients = ctx.lifespan_context["clients"]
120 | #             client = clients.artifactregistry
121 | 
122 | #             # Use context values if parameters not provided
123 | #             project_id = project_id or ctx.lifespan_context["project_id"]
124 | #             location = location or ctx.lifespan_context["location"]
125 | 
126 | #             if not repository_id:
127 | #                 return json.dumps({"error": "Repository ID is required"}, indent=2)
128 | 
129 | #             name = f"projects/{project_id}/locations/{location}/repositories/{repository_id}"
130 | 
131 | #             repo = client.get_repository(name=name)
132 | #             result = {
133 | #                 "name": repo.name.split("/")[-1],
134 | #                 "format": repo.format.name
135 | #                 if hasattr(repo.format, "name")
136 | #                 else str(repo.format),
137 | #                 "description": repo.description,
138 | #                 "create_time": repo.create_time.isoformat()
139 | #                 if repo.create_time
140 | #                 else None,
141 | #                 "update_time": repo.update_time.isoformat()
142 | #                 if repo.update_time
143 | #                 else None,
144 | #                 "kms_key_name": repo.kms_key_name,
145 | #                 "labels": dict(repo.labels) if repo.labels else {},
146 | #             }
147 | #             return json.dumps(result, indent=2)
148 | #         except Exception as e:
149 | #             return json.dumps({"error": str(e)}, indent=2)
150 | 
151 | #     @mcp_instance.tool()
152 | #     def upload_artifact(
153 | #         repo_name: str,
154 | #         artifact_path: str,
155 | #         package: str = "",
156 | #         version: str = "",
157 | #         project_id: str = None,
158 | #         location: str = None,
159 | #     ) -> str:
160 | #         """
161 | #         Upload an artifact to Artifact Registry
162 | 
163 | #         Args:
164 | #             project_id: GCP project ID
165 | #             location: GCP region (e.g., us-central1)
166 | #             repo_name: Name of the repository
167 | #             artifact_path: Local path to the artifact file
168 | #             package: Package name (optional)
169 | #             version: Version string (optional)
170 | #         """
171 | #         import os
172 | 
173 | #         try:
174 | #             # Get the client from the context
175 | #             clients = ctx.lifespan_context["clients"]
176 | #             client = clients.artifactregistry
177 | 
178 | #             # Use context values if parameters not provided
179 | #             project_id = project_id or ctx.lifespan_context["project_id"]
180 | #             location = location or ctx.lifespan_context["location"]
181 | 
182 | #             if not os.path.exists(artifact_path):
183 | #                 return json.dumps(
184 | #                     {"status": "error", "message": f"File not found: {artifact_path}"}
185 | #                 )
186 | 
187 | #             filename = os.path.basename(artifact_path)
188 | #             file_size = os.path.getsize(artifact_path)
189 | 
190 | #             parent = (
191 | #                 f"projects/{project_id}/locations/{location}/repositories/{repo_name}"
192 | #             )
193 | 
194 | #             with open(artifact_path, "rb") as f:
195 | #                 file_contents = f.read()
196 | #                 # Use the standard client for upload
197 | #                 result = client.upload_artifact(
198 | #                     parent=parent, contents=file_contents, artifact_path=filename
199 | #                 )
200 | 
201 | #                 return json.dumps(
202 | #                     {
203 | #                         "status": "success",
204 | #                         "uri": result.uri if hasattr(result, "uri") else None,
205 | #                         "message": f"Successfully uploaded {filename}",
206 | #                     },
207 | #                     indent=2,
208 | #                 )
209 | #         except Exception as e:
210 | #             return json.dumps({"status": "error", "message": str(e)}, indent=2)
211 | 
212 | #     # Prompts
213 | #     @mcp_instance.prompt()
214 | #     def create_repository_prompt(location: str = "us-central1") -> str:
215 | #         """Prompt for creating a new Artifact Registry repository"""
216 | #         return f"""
217 | #     I need to create a new Artifact Registry repository in {location}.
218 | 
219 | #     Please help me with:
220 | #     1. What formats are available (Docker, NPM, Maven, etc.)
221 | #     2. Best practices for naming repositories
222 | #     3. Recommendations for labels I should apply
223 | #     4. Steps to create the repository
224 | #     """
225 | 
226 | #     @mcp_instance.prompt()
227 | #     def upload_artifact_prompt(repo_format: str = "DOCKER") -> str:
228 | #         """Prompt for help with uploading artifacts"""
229 | #         return f"""
230 | #     I need to upload a {repo_format.lower()} artifact to my Artifact Registry repository.
231 | 
232 | #     Please help me understand:
233 | #     1. The recommended way to upload {repo_format.lower()} artifacts
234 | #     2. Any naming conventions I should follow
235 | #     3. How to ensure proper versioning
236 | #     4. The steps to perform the upload
237 | #     """
238 | 
239 | 
240 | # import json
241 | # import os
242 | # from typing import Optional
243 | 
244 | # from google.cloud import artifactregistry_v1
245 | # from mcp.server.fastmcp import Context
246 | 
247 | # # Instead of importing from core.server
248 | 
249 | 
250 | # # Resources
251 | # @mcp.resource("gcp://artifactregistry/{project_id}/{location}/repositories")
252 | # def list_repositories_resource(
253 | #     project_id: str = None, location: str = None, ctx: Context = None
254 | # ) -> str:
255 | #     """List all Artifact Registry repositories in a specific location"""
256 | #     client = ctx.clients.artifactregistry
257 | #     project_id = project_id or ctx.clients.project_id
258 | #     location = location or ctx.clients.location
259 | 
260 | #     parent = f"projects/{project_id}/locations/{location}"
261 | #     repositories = client.list_repositories(parent=parent)
262 | 
263 | #     result = []
264 | #     for repo in repositories:
265 | #         result.append(
266 | #             {
267 | #                 "name": repo.name.split("/")[-1],
268 | #                 "format": repo.format.name,
269 | #                 "description": repo.description,
270 | #                 "create_time": repo.create_time.isoformat()
271 | #                 if repo.create_time
272 | #                 else None,
273 | #                 "update_time": repo.update_time.isoformat()
274 | #                 if repo.update_time
275 | #                 else None,
276 | #                 "kms_key_name": repo.kms_key_name,
277 | #                 "labels": dict(repo.labels) if repo.labels else {},
278 | #             }
279 | #         )
280 | 
281 | #     return json.dumps(result, indent=2)
282 | 
283 | 
284 | # @mcp.resource("gcp://artifactregistry/{project_id}/{location}/repository/{repo_name}")
285 | # def get_repository_resource(
286 | #     repo_name: str, project_id: str = None, location: str = None, ctx: Context = None
287 | # ) -> str:
288 | #     """Get details for a specific Artifact Registry repository"""
289 | #     client = ctx.clients.artifactregistry
290 | #     project_id = project_id or ctx.clients.project_id
291 | #     location = location or ctx.clients.location
292 | 
293 | #     name = f"projects/{project_id}/locations/{location}/repositories/{repo_name}"
294 | 
295 | #     try:
296 | #         repo = client.get_repository(name=name)
297 | #         result = {
298 | #             "name": repo.name.split("/")[-1],
299 | #             "format": repo.format.name,
300 | #             "description": repo.description,
301 | #             "create_time": repo.create_time.isoformat() if repo.create_time else None,
302 | #             "update_time": repo.update_time.isoformat() if repo.update_time else None,
303 | #             "kms_key_name": repo.kms_key_name,
304 | #             "labels": dict(repo.labels) if repo.labels else {},
305 | #         }
306 | #         return json.dumps(result, indent=2)
307 | #     except Exception as e:
308 | #         return json.dumps({"error": str(e)})
309 | 
310 | 
311 | # @mcp.resource(
312 | #     "gcp://artifactregistry/{project_id}/{location}/repository/{repo_name}/packages"
313 | # )
314 | # def list_packages_resource(
315 | #     repo_name: str, project_id: str = None, location: str = None, ctx: Context = None
316 | # ) -> str:
317 | #     """List packages in a specific Artifact Registry repository"""
318 | #     client = ctx.clients.artifactregistry
319 | #     project_id = project_id or ctx.clients.project_id
320 | #     location = location or ctx.clients.location
321 | 
322 | #     parent = f"projects/{project_id}/locations/{location}/repositories/{repo_name}"
323 | #     packages = client.list_packages(parent=parent)
324 | 
325 | #     result = []
326 | #     for package in packages:
327 | #         result.append(
328 | #             {
329 | #                 "name": package.name.split("/")[-1],
330 | #                 "display_name": package.display_name,
331 | #                 "create_time": package.create_time.isoformat()
332 | #                 if package.create_time
333 | #                 else None,
334 | #                 "update_time": package.update_time.isoformat()
335 | #                 if package.update_time
336 | #                 else None,
337 | #             }
338 | #         )
339 | 
340 | #     return json.dumps(result, indent=2)
341 | 
342 | 
343 | # # Tools
344 | # @mcp.tool()
345 | # async def create_repository(
346 | #     repo_name: str,
347 | #     format: str = "DOCKER",
348 | #     description: str = "",
349 | #     labels: Optional[dict] = None,
350 | #     project_id: str = None,
351 | #     location: str = None,
352 | #     ctx: Context = None,
353 | # ) -> str:
354 | #     """
355 | #     Create an Artifact Registry repository
356 | 
357 | #     Args:
358 | #         project_id: GCP project ID
359 | #         location: GCP region (e.g., us-central1)
360 | #         repo_name: Name for the new repository
361 | #         format: Repository format (DOCKER, NPM, PYTHON, MAVEN, APT, YUM, GO)
362 | #         description: Optional description for the repository
363 | #         labels: Optional key-value pairs for repository labels
364 | #     """
365 | #     client = ctx.clients.artifactregistry
366 | #     project_id = project_id or ctx.clients.project_id
367 | #     location = location or ctx.clients.location
368 | 
369 | #     # Validate format
370 | #     try:
371 | #         format_enum = artifactregistry_v1.Repository.Format[format]
372 | #     except KeyError:
373 | #         return json.dumps(
374 | #             {
375 | #                 "status": "error",
376 | #                 "message": f"Invalid format: {format}. Valid formats are: {', '.join(artifactregistry_v1.Repository.Format._member_names_)}",
377 | #             }
378 | #         )
379 | 
380 | #     # Create repository
381 | #     parent = f"projects/{project_id}/locations/{location}"
382 | #     repository = artifactregistry_v1.Repository(
383 | #         format=format_enum,
384 | #         description=description,
385 | #     )
386 | 
387 | #     if labels:
388 | #         repository.labels = labels
389 | 
390 | #     request = artifactregistry_v1.CreateRepositoryRequest(
391 | #         parent=parent, repository_id=repo_name, repository=repository
392 | #     )
393 | 
394 | #     try:
395 | #         ctx.info(f"Creating repository {repo_name} in {location}...")
396 | #         response = client.create_repository(request=request)
397 | 
398 | #         return json.dumps(
399 | #             {
400 | #                 "status": "success",
401 | #                 "name": response.name,
402 | #                 "format": response.format.name,
403 | #                 "description": response.description,
404 | #                 "create_time": response.create_time.isoformat()
405 | #                 if response.create_time
406 | #                 else None,
407 | #             },
408 | #             indent=2,
409 | #         )
410 | #     except Exception as e:
411 | #         return json.dumps({"status": "error", "message": str(e)})
412 | 
413 | 
414 | # @mcp.tool()
415 | # async def list_repositories(
416 | #     project_id: str = None, location: str = None, ctx: Context = None
417 | # ) -> str:
418 | #     """
419 | #     List Artifact Registry repositories in a specific location
420 | 
421 | #     Args:
422 | #         project_id: GCP project ID
423 | #         location: GCP region (e.g., us-central1)
424 | #     """
425 | #     client = ctx.clients.artifactregistry
426 | #     project_id = project_id or ctx.clients.project_id
427 | #     location = location or ctx.clients.location
428 | 
429 | #     parent = f"projects/{project_id}/locations/{location}"
430 | 
431 | #     try:
432 | #         ctx.info(f"Listing repositories in {location}...")
433 | #         repositories = client.list_repositories(parent=parent)
434 | 
435 | #         result = []
436 | #         for repo in repositories:
437 | #             result.append(
438 | #                 {
439 | #                     "name": repo.name.split("/")[-1],
440 | #                     "format": repo.format.name,
441 | #                     "description": repo.description,
442 | #                     "create_time": repo.create_time.isoformat()
443 | #                     if repo.create_time
444 | #                     else None,
445 | #                 }
446 | #             )
447 | 
448 | #         return json.dumps(
449 | #             {"status": "success", "repositories": result, "count": len(result)},
450 | #             indent=2,
451 | #         )
452 | #     except Exception as e:
453 | #         return json.dumps({"status": "error", "message": str(e)})
454 | 
```

--------------------------------------------------------------------------------
/src/gcp-mcp-server/services/cloud_bigquery.py:
--------------------------------------------------------------------------------

```python
  1 | import json
  2 | from typing import Any, Dict, List, Optional
  3 | 
  4 | from google.cloud import bigquery
  5 | from google.cloud.exceptions import NotFound
  6 | from services import client_instances
  7 | 
  8 | 
  9 | def register(mcp_instance):
 10 |     """Register all BigQuery resources and tools with the MCP instance."""
 11 | 
 12 |     # Resources
 13 |     @mcp_instance.resource("gcp://bigquery/{project_id}/datasets")
 14 |     def list_datasets_resource(project_id: str = None) -> str:
 15 |         """List all BigQuery datasets in a project"""
 16 |         try:
 17 |             # Get client from client_instances
 18 |             client = client_instances.get_clients().bigquery
 19 |             project_id = project_id or client_instances.get_project_id()
 20 | 
 21 |             datasets = list(client.list_datasets())
 22 | 
 23 |             result = []
 24 |             for dataset in datasets:
 25 |                 result.append(
 26 |                     {
 27 |                         "id": dataset.dataset_id,
 28 |                         "full_id": dataset.full_dataset_id,
 29 |                         "friendly_name": dataset.friendly_name,
 30 |                         "location": dataset.location,
 31 |                         "labels": dict(dataset.labels) if dataset.labels else {},
 32 |                         "created": dataset.created.isoformat()
 33 |                         if dataset.created
 34 |                         else None,
 35 |                     }
 36 |                 )
 37 | 
 38 |             return json.dumps(result, indent=2)
 39 |         except Exception as e:
 40 |             return json.dumps({"error": str(e)}, indent=2)
 41 | 
 42 |     @mcp_instance.resource("gcp://bigquery/{project_id}/dataset/{dataset_id}")
 43 |     def get_dataset_resource(project_id: str = None, dataset_id: str = None) -> str:
 44 |         """Get details for a specific BigQuery dataset"""
 45 |         try:
 46 |             # Get client from client_instances
 47 |             client = client_instances.get_clients().bigquery
 48 |             project_id = project_id or client_instances.get_project_id()
 49 | 
 50 |             dataset_ref = client.dataset(dataset_id)
 51 |             dataset = client.get_dataset(dataset_ref)
 52 | 
 53 |             result = {
 54 |                 "id": dataset.dataset_id,
 55 |                 "full_id": dataset.full_dataset_id,
 56 |                 "friendly_name": dataset.friendly_name,
 57 |                 "description": dataset.description,
 58 |                 "location": dataset.location,
 59 |                 "labels": dict(dataset.labels) if dataset.labels else {},
 60 |                 "created": dataset.created.isoformat() if dataset.created else None,
 61 |                 "modified": dataset.modified.isoformat() if dataset.modified else None,
 62 |                 "default_table_expiration_ms": dataset.default_table_expiration_ms,
 63 |                 "default_partition_expiration_ms": dataset.default_partition_expiration_ms,
 64 |             }
 65 |             return json.dumps(result, indent=2)
 66 |         except NotFound:
 67 |             return json.dumps({"error": f"Dataset {dataset_id} not found"}, indent=2)
 68 |         except Exception as e:
 69 |             return json.dumps({"error": str(e)}, indent=2)
 70 | 
 71 |     @mcp_instance.resource("gcp://bigquery/{project_id}/dataset/{dataset_id}/tables")
 72 |     def list_tables_resource(project_id: str = None, dataset_id: str = None) -> str:
 73 |         """List all tables in a BigQuery dataset"""
 74 |         try:
 75 |             # Get client from client_instances
 76 |             client = client_instances.get_clients().bigquery
 77 |             project_id = project_id or client_instances.get_project_id()
 78 | 
 79 |             tables = list(client.list_tables(dataset_id))
 80 | 
 81 |             result = []
 82 |             for table in tables:
 83 |                 result.append(
 84 |                     {
 85 |                         "id": table.table_id,
 86 |                         "full_id": f"{table.project}.{table.dataset_id}.{table.table_id}",
 87 |                         "table_type": table.table_type,
 88 |                     }
 89 |                 )
 90 | 
 91 |             return json.dumps(result, indent=2)
 92 |         except NotFound:
 93 |             return json.dumps({"error": f"Dataset {dataset_id} not found"}, indent=2)
 94 |         except Exception as e:
 95 |             return json.dumps({"error": str(e)}, indent=2)
 96 | 
 97 |     @mcp_instance.resource(
 98 |         "gcp://bigquery/{project_id}/dataset/{dataset_id}/table/{table_id}"
 99 |     )
100 |     def get_table_resource(
101 |         project_id: str = None, dataset_id: str = None, table_id: str = None
102 |     ) -> str:
103 |         """Get details for a specific BigQuery table"""
104 |         try:
105 |             # Get client from client_instances
106 |             client = client_instances.get_clients().bigquery
107 |             project_id = project_id or client_instances.get_project_id()
108 | 
109 |             table_ref = client.dataset(dataset_id).table(table_id)
110 |             table = client.get_table(table_ref)
111 | 
112 |             # Extract schema information
113 |             schema_fields = []
114 |             for field in table.schema:
115 |                 schema_fields.append(
116 |                     {
117 |                         "name": field.name,
118 |                         "type": field.field_type,
119 |                         "mode": field.mode,
120 |                         "description": field.description,
121 |                     }
122 |                 )
123 | 
124 |             result = {
125 |                 "id": table.table_id,
126 |                 "full_id": f"{table.project}.{table.dataset_id}.{table.table_id}",
127 |                 "friendly_name": table.friendly_name,
128 |                 "description": table.description,
129 |                 "num_rows": table.num_rows,
130 |                 "num_bytes": table.num_bytes,
131 |                 "table_type": table.table_type,
132 |                 "created": table.created.isoformat() if table.created else None,
133 |                 "modified": table.modified.isoformat() if table.modified else None,
134 |                 "expires": table.expires.isoformat() if table.expires else None,
135 |                 "schema": schema_fields,
136 |                 "labels": dict(table.labels) if table.labels else {},
137 |             }
138 |             return json.dumps(result, indent=2)
139 |         except NotFound:
140 |             return json.dumps(
141 |                 {"error": f"Table {dataset_id}.{table_id} not found"}, indent=2
142 |             )
143 |         except Exception as e:
144 |             return json.dumps({"error": str(e)}, indent=2)
145 | 
146 |     # Tools
147 |     @mcp_instance.tool()
148 |     def run_query(
149 |         query: str,
150 |         project_id: str = None,
151 |         location: str = None,
152 |         max_results: int = 100,
153 |         use_legacy_sql: bool = False,
154 |         timeout_ms: int = 30000,
155 |     ) -> str:
156 |         """
157 |         Run a BigQuery query and return the results
158 | 
159 |         Args:
160 |             query: SQL query to execute
161 |             project_id: GCP project ID (defaults to configured project)
162 |             location: Optional BigQuery location (us, eu, etc.)
163 |             max_results: Maximum number of rows to return
164 |             use_legacy_sql: Whether to use legacy SQL syntax
165 |             timeout_ms: Query timeout in milliseconds
166 |         """
167 |         try:
168 |             # Get client from client_instances
169 |             client = client_instances.get_clients().bigquery
170 |             project_id = project_id or client_instances.get_project_id()
171 |             location = location or client_instances.get_location()
172 | 
173 |             job_config = bigquery.QueryJobConfig(
174 |                 use_legacy_sql=use_legacy_sql,
175 |             )
176 | 
177 |             # Log info (similar to ctx.info)
178 |             print(f"Running query: {query[:100]}...")
179 | 
180 |             query_job = client.query(
181 |                 query,
182 |                 job_config=job_config,
183 |                 location=location,
184 |                 timeout=timeout_ms / 1000.0,
185 |             )
186 | 
187 |             # Wait for the query to complete
188 |             results = query_job.result(max_results=max_results)
189 | 
190 |             # Get the schema
191 |             schema = [field.name for field in results.schema]
192 | 
193 |             # Convert rows to a list of dictionaries
194 |             rows = []
195 |             for row in results:
196 |                 row_dict = {}
197 |                 for key in schema:
198 |                     value = row[key]
199 |                     if hasattr(value, "isoformat"):  # Handle datetime objects
200 |                         row_dict[key] = value.isoformat()
201 |                     else:
202 |                         row_dict[key] = value
203 |                 rows.append(row_dict)
204 | 
205 |             # Create summary statistics
206 |             stats = {
207 |                 "total_rows": query_job.total_rows,
208 |                 "total_bytes_processed": query_job.total_bytes_processed,
209 |                 "total_bytes_billed": query_job.total_bytes_billed,
210 |                 "billing_tier": query_job.billing_tier,
211 |                 "created": query_job.created.isoformat() if query_job.created else None,
212 |                 "started": query_job.started.isoformat() if query_job.started else None,
213 |                 "ended": query_job.ended.isoformat() if query_job.ended else None,
214 |                 "duration_ms": (query_job.ended - query_job.started).total_seconds()
215 |                 * 1000
216 |                 if query_job.started and query_job.ended
217 |                 else None,
218 |             }
219 | 
220 |             return json.dumps(
221 |                 {
222 |                     "status": "success",
223 |                     "schema": schema,
224 |                     "rows": rows,
225 |                     "returned_rows": len(rows),
226 |                     "stats": stats,
227 |                 },
228 |                 indent=2,
229 |             )
230 |         except Exception as e:
231 |             return json.dumps({"status": "error", "message": str(e)}, indent=2)
232 | 
233 |     @mcp_instance.tool()
234 |     def create_dataset(
235 |         dataset_id: str,
236 |         project_id: str = None,
237 |         location: str = None,
238 |         description: str = "",
239 |         friendly_name: str = None,
240 |         labels: Optional[Dict[str, str]] = None,
241 |         default_table_expiration_ms: Optional[int] = None,
242 |     ) -> str:
243 |         """
244 |         Create a new BigQuery dataset
245 | 
246 |         Args:
247 |             dataset_id: ID for the new dataset
248 |             project_id: GCP project ID (defaults to configured project)
249 |             location: Dataset location (US, EU, asia-northeast1, etc.)
250 |             description: Optional dataset description
251 |             friendly_name: Optional user-friendly name
252 |             labels: Optional key-value pairs for dataset labels
253 |             default_table_expiration_ms: Default expiration time for tables in milliseconds
254 |         """
255 |         try:
256 |             # Get client from client_instances
257 |             client = client_instances.get_clients().bigquery
258 |             project_id = project_id or client_instances.get_project_id()
259 |             location = location or client_instances.get_location()
260 | 
261 |             dataset = bigquery.Dataset(f"{project_id}.{dataset_id}")
262 |             dataset.location = location
263 | 
264 |             if description:
265 |                 dataset.description = description
266 |             if friendly_name:
267 |                 dataset.friendly_name = friendly_name
268 |             if labels:
269 |                 dataset.labels = labels
270 |             if default_table_expiration_ms:
271 |                 dataset.default_table_expiration_ms = default_table_expiration_ms
272 | 
273 |             # Log info (similar to ctx.info)
274 |             print(f"Creating dataset {dataset_id} in {location}...")
275 |             dataset = client.create_dataset(dataset)
276 | 
277 |             return json.dumps(
278 |                 {
279 |                     "status": "success",
280 |                     "dataset_id": dataset.dataset_id,
281 |                     "full_dataset_id": dataset.full_dataset_id,
282 |                     "location": dataset.location,
283 |                     "created": dataset.created.isoformat() if dataset.created else None,
284 |                 },
285 |                 indent=2,
286 |             )
287 |         except Exception as e:
288 |             return json.dumps({"status": "error", "message": str(e)}, indent=2)
289 | 
290 |     @mcp_instance.tool()
291 |     def create_table(
292 |         dataset_id: str,
293 |         table_id: str,
294 |         schema_fields: List[Dict[str, Any]],
295 |         project_id: str = None,
296 |         description: str = "",
297 |         friendly_name: str = None,
298 |         expiration_ms: Optional[int] = None,
299 |         labels: Optional[Dict[str, str]] = None,
300 |         clustering_fields: Optional[List[str]] = None,
301 |         time_partitioning_field: Optional[str] = None,
302 |         time_partitioning_type: str = "DAY",
303 |     ) -> str:
304 |         """
305 |         Create a new BigQuery table
306 | 
307 |         Args:
308 |             dataset_id: Dataset ID where the table will be created
309 |             table_id: ID for the new table
310 |             schema_fields: List of field definitions, each with name, type, mode, and description
311 |             project_id: GCP project ID (defaults to configured project)
312 |             description: Optional table description
313 |             friendly_name: Optional user-friendly name
314 |             expiration_ms: Optional table expiration time in milliseconds
315 |             labels: Optional key-value pairs for table labels
316 |             clustering_fields: Optional list of fields to cluster by
317 |             time_partitioning_field: Optional field to use for time-based partitioning
318 |             time_partitioning_type: Partitioning type (DAY, HOUR, MONTH, YEAR)
319 | 
320 |         Example schema_fields:
321 |         [
322 |             {"name": "name", "type": "STRING", "mode": "REQUIRED", "description": "Name field"},
323 |             {"name": "age", "type": "INTEGER", "mode": "NULLABLE", "description": "Age field"}
324 |         ]
325 |         """
326 |         try:
327 |             # Get client from client_instances
328 |             client = client_instances.get_clients().bigquery
329 |             project_id = project_id or client_instances.get_project_id()
330 | 
331 |             # Convert schema_fields to SchemaField objects
332 |             schema = []
333 |             for field in schema_fields:
334 |                 schema.append(
335 |                     bigquery.SchemaField(
336 |                         name=field["name"],
337 |                         field_type=field["type"],
338 |                         mode=field.get("mode", "NULLABLE"),
339 |                         description=field.get("description", ""),
340 |                     )
341 |                 )
342 | 
343 |             # Create table reference
344 |             table_ref = client.dataset(dataset_id).table(table_id)
345 |             table = bigquery.Table(table_ref, schema=schema)
346 | 
347 |             # Set table properties
348 |             if description:
349 |                 table.description = description
350 |             if friendly_name:
351 |                 table.friendly_name = friendly_name
352 |             if expiration_ms:
353 |                 table.expires = expiration_ms
354 |             if labels:
355 |                 table.labels = labels
356 | 
357 |             # Set clustering if specified
358 |             if clustering_fields:
359 |                 table.clustering_fields = clustering_fields
360 | 
361 |             # Set time partitioning if specified
362 |             if time_partitioning_field:
363 |                 if time_partitioning_type == "DAY":
364 |                     table.time_partitioning = bigquery.TimePartitioning(
365 |                         type_=bigquery.TimePartitioningType.DAY,
366 |                         field=time_partitioning_field,
367 |                     )
368 |                 elif time_partitioning_type == "HOUR":
369 |                     table.time_partitioning = bigquery.TimePartitioning(
370 |                         type_=bigquery.TimePartitioningType.HOUR,
371 |                         field=time_partitioning_field,
372 |                     )
373 |                 elif time_partitioning_type == "MONTH":
374 |                     table.time_partitioning = bigquery.TimePartitioning(
375 |                         type_=bigquery.TimePartitioningType.MONTH,
376 |                         field=time_partitioning_field,
377 |                     )
378 |                 elif time_partitioning_type == "YEAR":
379 |                     table.time_partitioning = bigquery.TimePartitioning(
380 |                         type_=bigquery.TimePartitioningType.YEAR,
381 |                         field=time_partitioning_field,
382 |                     )
383 | 
384 |             # Log info (similar to ctx.info)
385 |             print(f"Creating table {dataset_id}.{table_id}...")
386 |             table = client.create_table(table)
387 | 
388 |             return json.dumps(
389 |                 {
390 |                     "status": "success",
391 |                     "table_id": table.table_id,
392 |                     "full_table_id": f"{table.project}.{table.dataset_id}.{table.table_id}",
393 |                     "created": table.created.isoformat() if table.created else None,
394 |                 },
395 |                 indent=2,
396 |             )
397 |         except Exception as e:
398 |             return json.dumps({"status": "error", "message": str(e)}, indent=2)
399 | 
400 |     @mcp_instance.tool()
401 |     def delete_table(dataset_id: str, table_id: str, project_id: str = None) -> str:
402 |         """
403 |         Delete a BigQuery table
404 | 
405 |         Args:
406 |             dataset_id: Dataset ID containing the table
407 |             table_id: ID of the table to delete
408 |             project_id: GCP project ID (defaults to configured project)
409 |         """
410 |         try:
411 |             # Get client from client_instances
412 |             client = client_instances.get_clients().bigquery
413 |             project_id = project_id or client_instances.get_project_id()
414 | 
415 |             table_ref = client.dataset(dataset_id).table(table_id)
416 | 
417 |             # Log info (similar to ctx.info)
418 |             print(f"Deleting table {dataset_id}.{table_id}...")
419 |             client.delete_table(table_ref)
420 | 
421 |             return json.dumps(
422 |                 {
423 |                     "status": "success",
424 |                     "message": f"Table {project_id}.{dataset_id}.{table_id} successfully deleted",
425 |                 },
426 |                 indent=2,
427 |             )
428 |         except NotFound:
429 |             return json.dumps(
430 |                 {
431 |                     "status": "error",
432 |                     "message": f"Table {project_id}.{dataset_id}.{table_id} not found",
433 |                 },
434 |                 indent=2,
435 |             )
436 |         except Exception as e:
437 |             return json.dumps({"status": "error", "message": str(e)}, indent=2)
438 | 
439 |     @mcp_instance.tool()
440 |     def load_table_from_json(
441 |         dataset_id: str,
442 |         table_id: str,
443 |         json_data: List[Dict[str, Any]],
444 |         project_id: str = None,
445 |         schema_fields: Optional[List[Dict[str, Any]]] = None,
446 |         write_disposition: str = "WRITE_APPEND",
447 |     ) -> str:
448 |         """
449 |         Load data into a BigQuery table from JSON data
450 | 
451 |         Args:
452 |             dataset_id: Dataset ID containing the table
453 |             table_id: ID of the table to load data into
454 |             json_data: List of dictionaries representing rows to insert
455 |             project_id: GCP project ID (defaults to configured project)
456 |             schema_fields: Optional schema definition (if not using existing table schema)
457 |             write_disposition: How to handle existing data (WRITE_TRUNCATE, WRITE_APPEND, WRITE_EMPTY)
458 |         """
459 |         try:
460 |             # Get client from client_instances
461 |             client = client_instances.get_clients().bigquery
462 |             project_id = project_id or client_instances.get_project_id()
463 | 
464 |             table_ref = client.dataset(dataset_id).table(table_id)
465 | 
466 |             # Convert write_disposition to the appropriate enum
467 |             if write_disposition == "WRITE_TRUNCATE":
468 |                 disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
469 |             elif write_disposition == "WRITE_APPEND":
470 |                 disposition = bigquery.WriteDisposition.WRITE_APPEND
471 |             elif write_disposition == "WRITE_EMPTY":
472 |                 disposition = bigquery.WriteDisposition.WRITE_EMPTY
473 |             else:
474 |                 return json.dumps(
475 |                     {
476 |                         "status": "error",
477 |                         "message": f"Invalid write_disposition: {write_disposition}. Use WRITE_TRUNCATE, WRITE_APPEND, or WRITE_EMPTY.",
478 |                     },
479 |                     indent=2,
480 |                 )
481 | 
482 |             # Create job config
483 |             job_config = bigquery.LoadJobConfig(
484 |                 write_disposition=disposition,
485 |                 source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
486 |             )
487 | 
488 |             # Set schema if provided
489 |             if schema_fields:
490 |                 schema = []
491 |                 for field in schema_fields:
492 |                     schema.append(
493 |                         bigquery.SchemaField(
494 |                             name=field["name"],
495 |                             field_type=field["type"],
496 |                             mode=field.get("mode", "NULLABLE"),
497 |                             description=field.get("description", ""),
498 |                         )
499 |                     )
500 |                 job_config.schema = schema
501 | 
502 |             # Convert JSON data to newline-delimited JSON (not needed but keeping the log)
503 |             print(f"Loading {len(json_data)} rows into {dataset_id}.{table_id}...")
504 | 
505 |             # Create and run the load job
506 |             load_job = client.load_table_from_json(
507 |                 json_data, table_ref, job_config=job_config
508 |             )
509 | 
510 |             # Wait for the job to complete
511 |             load_job.result()
512 | 
513 |             # Get updated table info
514 |             table = client.get_table(table_ref)
515 | 
516 |             return json.dumps(
517 |                 {
518 |                     "status": "success",
519 |                     "rows_loaded": len(json_data),
520 |                     "total_rows": table.num_rows,
521 |                     "message": f"Successfully loaded data into {project_id}.{dataset_id}.{table_id}",
522 |                 },
523 |                 indent=2,
524 |             )
525 |         except Exception as e:
526 |             return json.dumps({"status": "error", "message": str(e)}, indent=2)
527 | 
528 |     @mcp_instance.tool()
529 |     def export_table_to_csv(
530 |         dataset_id: str,
531 |         table_id: str,
532 |         destination_uri: str,
533 |         project_id: str = None,
534 |         print_header: bool = True,
535 |         field_delimiter: str = ",",
536 |     ) -> str:
537 |         """
538 |         Export a BigQuery table to Cloud Storage as CSV
539 | 
540 |         Args:
541 |             dataset_id: Dataset ID containing the table
542 |             table_id: ID of the table to export
543 |             destination_uri: GCS URI (gs://bucket/path)
544 |             project_id: GCP project ID (defaults to configured project)
545 |             print_header: Whether to include column headers
546 |             field_delimiter: Delimiter character for fields
547 |         """
548 |         try:
549 |             # Get client from client_instances
550 |             client = client_instances.get_clients().bigquery
551 |             project_id = project_id or client_instances.get_project_id()
552 | 
553 |             table_ref = client.dataset(dataset_id).table(table_id)
554 | 
555 |             # Validate destination URI
556 |             if not destination_uri.startswith("gs://"):
557 |                 return json.dumps(
558 |                     {
559 |                         "status": "error",
560 |                         "message": "destination_uri must start with gs://",
561 |                     },
562 |                     indent=2,
563 |                 )
564 | 
565 |             job_config = bigquery.ExtractJobConfig()
566 |             job_config.destination_format = bigquery.DestinationFormat.CSV
567 |             job_config.print_header = print_header
568 |             job_config.field_delimiter = field_delimiter
569 | 
570 |             # Log info (similar to ctx.info)
571 |             print(f"Exporting {dataset_id}.{table_id} to {destination_uri}...")
572 | 
573 |             # Create and run the extract job
574 |             extract_job = client.extract_table(
575 |                 table_ref, destination_uri, job_config=job_config
576 |             )
577 | 
578 |             # Wait for the job to complete
579 |             extract_job.result()
580 | 
581 |             return json.dumps(
582 |                 {
583 |                     "status": "success",
584 |                     "destination": destination_uri,
585 |                     "message": f"Successfully exported {project_id}.{dataset_id}.{table_id} to {destination_uri}",
586 |                 },
587 |                 indent=2,
588 |             )
589 |         except Exception as e:
590 |             return json.dumps({"status": "error", "message": str(e)}, indent=2)
591 | 
592 |     # Prompts
593 |     @mcp_instance.prompt()
594 |     def create_dataset_prompt() -> str:
595 |         """Prompt for creating a new BigQuery dataset"""
596 |         return """
597 | I need to create a new BigQuery dataset.
598 | 
599 | Please help me with:
600 | 1. Choosing an appropriate location for my dataset
601 | 2. Understanding dataset naming conventions
602 | 3. Best practices for dataset configuration (expiration, labels, etc.)
603 | 4. The process to create the dataset
604 | """
605 | 
606 |     @mcp_instance.prompt()
607 |     def query_optimization_prompt() -> str:
608 |         """Prompt for BigQuery query optimization help"""
609 |         return """
610 | I have a BigQuery query that's slow or expensive to run.
611 | 
612 | Please help me optimize it by:
613 | 1. Analyzing key factors that affect BigQuery performance
614 | 2. Identifying common patterns that lead to inefficient queries
615 | 3. Suggesting specific optimization techniques
616 | 4. Helping me understand how to use EXPLAIN plan analysis
617 | """
618 | 
```

--------------------------------------------------------------------------------
/src/gcp-mcp-server/services/cloud_run.py:
--------------------------------------------------------------------------------

```python
  1 | import json
  2 | from typing import Dict, Optional
  3 | 
  4 | from google.cloud import run_v2
  5 | from google.protobuf import field_mask_pb2
  6 | from services import client_instances
  7 | 
  8 | 
  9 | def register(mcp_instance):
 10 |     """Register all Cloud Run resources and tools with the MCP instance."""
 11 | 
 12 |     # Resources
 13 |     @mcp_instance.resource("gcp://cloudrun/{project_id}/{region}/services")
 14 |     def list_services_resource(project_id: str = None, region: str = None) -> str:
 15 |         """List all Cloud Run services in a specific region"""
 16 |         try:
 17 |             # Get client from client_instances
 18 |             client = client_instances.get_clients().run
 19 |             project_id = project_id or client_instances.get_project_id()
 20 |             region = region or client_instances.get_location()
 21 | 
 22 |             parent = f"projects/{project_id}/locations/{region}"
 23 |             services = client.list_services(parent=parent)
 24 | 
 25 |             result = []
 26 |             for service in services:
 27 |                 result.append(
 28 |                     {
 29 |                         "name": service.name.split("/")[-1],
 30 |                         "uid": service.uid,
 31 |                         "generation": service.generation,
 32 |                         "labels": dict(service.labels) if service.labels else {},
 33 |                         "annotations": dict(service.annotations)
 34 |                         if service.annotations
 35 |                         else {},
 36 |                         "create_time": service.create_time.isoformat()
 37 |                         if service.create_time
 38 |                         else None,
 39 |                         "update_time": service.update_time.isoformat()
 40 |                         if service.update_time
 41 |                         else None,
 42 |                         "uri": service.uri,
 43 |                     }
 44 |                 )
 45 | 
 46 |             return json.dumps(result, indent=2)
 47 |         except Exception as e:
 48 |             return json.dumps({"error": str(e)}, indent=2)
 49 | 
 50 |     @mcp_instance.resource("gcp://run/{project_id}/{location}/service/{service_name}")
 51 |     def get_service_resource(
 52 |         service_name: str, project_id: str = None, location: str = None
 53 |     ) -> str:
 54 |         """Get details for a specific Cloud Run service"""
 55 |         try:
 56 |             # Get client from client_instances
 57 |             client = client_instances.get_clients().run
 58 |             project_id = project_id or client_instances.get_project_id()
 59 |             location = location or client_instances.get_location()
 60 | 
 61 |             name = f"projects/{project_id}/locations/{location}/services/{service_name}"
 62 |             service = client.get_service(name=name)
 63 | 
 64 |             # Extract container details
 65 |             containers = []
 66 |             if service.template and service.template.containers:
 67 |                 for container in service.template.containers:
 68 |                     container_info = {
 69 |                         "image": container.image,
 70 |                         "command": list(container.command) if container.command else [],
 71 |                         "args": list(container.args) if container.args else [],
 72 |                         "env": [
 73 |                             {"name": env.name, "value": env.value}
 74 |                             for env in container.env
 75 |                         ]
 76 |                         if container.env
 77 |                         else [],
 78 |                         "resources": {
 79 |                             "limits": dict(container.resources.limits)
 80 |                             if container.resources and container.resources.limits
 81 |                             else {},
 82 |                             "cpu_idle": container.resources.cpu_idle
 83 |                             if container.resources
 84 |                             else None,
 85 |                         }
 86 |                         if container.resources
 87 |                         else {},
 88 |                         "ports": [
 89 |                             {"name": port.name, "container_port": port.container_port}
 90 |                             for port in container.ports
 91 |                         ]
 92 |                         if container.ports
 93 |                         else [],
 94 |                     }
 95 |                     containers.append(container_info)
 96 | 
 97 |             # Extract scaling details
 98 |             scaling = None
 99 |             if service.template and service.template.scaling:
100 |                 scaling = {
101 |                     "min_instance_count": service.template.scaling.min_instance_count,
102 |                     "max_instance_count": service.template.scaling.max_instance_count,
103 |                 }
104 | 
105 |             result = {
106 |                 "name": service.name.split("/")[-1],
107 |                 "uid": service.uid,
108 |                 "generation": service.generation,
109 |                 "labels": dict(service.labels) if service.labels else {},
110 |                 "annotations": dict(service.annotations) if service.annotations else {},
111 |                 "create_time": service.create_time.isoformat()
112 |                 if service.create_time
113 |                 else None,
114 |                 "update_time": service.update_time.isoformat()
115 |                 if service.update_time
116 |                 else None,
117 |                 "creator": service.creator,
118 |                 "last_modifier": service.last_modifier,
119 |                 "client": service.client,
120 |                 "client_version": service.client_version,
121 |                 "ingress": service.ingress.name if service.ingress else None,
122 |                 "launch_stage": service.launch_stage.name
123 |                 if service.launch_stage
124 |                 else None,
125 |                 "traffic": [
126 |                     {
127 |                         "type": traffic.type_.name if traffic.type_ else None,
128 |                         "revision": traffic.revision.split("/")[-1]
129 |                         if traffic.revision
130 |                         else None,
131 |                         "percent": traffic.percent,
132 |                         "tag": traffic.tag,
133 |                     }
134 |                     for traffic in service.traffic
135 |                 ]
136 |                 if service.traffic
137 |                 else [],
138 |                 "uri": service.uri,
139 |                 "template": {
140 |                     "containers": containers,
141 |                     "execution_environment": service.template.execution_environment.name
142 |                     if service.template and service.template.execution_environment
143 |                     else None,
144 |                     "vpc_access": {
145 |                         "connector": service.template.vpc_access.connector,
146 |                         "egress": service.template.vpc_access.egress.name
147 |                         if service.template.vpc_access.egress
148 |                         else None,
149 |                     }
150 |                     if service.template and service.template.vpc_access
151 |                     else None,
152 |                     "scaling": scaling,
153 |                     "timeout": f"{service.template.timeout.seconds}s {service.template.timeout.nanos}ns"
154 |                     if service.template and service.template.timeout
155 |                     else None,
156 |                     "service_account": service.template.service_account
157 |                     if service.template
158 |                     else None,
159 |                 }
160 |                 if service.template
161 |                 else {},
162 |             }
163 | 
164 |             return json.dumps(result, indent=2)
165 |         except Exception as e:
166 |             return json.dumps({"error": str(e)}, indent=2)
167 | 
168 |     @mcp_instance.resource(
169 |         "gcp://run/{project_id}/{location}/service/{service_name}/revisions"
170 |     )
171 |     def list_revisions_resource(
172 |         service_name: str, project_id: str = None, location: str = None
173 |     ) -> str:
174 |         """List revisions for a specific Cloud Run service"""
175 |         try:
176 |             # Get client from client_instances
177 |             client = client_instances.get_clients().run
178 |             project_id = project_id or client_instances.get_project_id()
179 |             location = location or client_instances.get_location()
180 | 
181 |             parent = f"projects/{project_id}/locations/{location}"
182 | 
183 |             # List all revisions
184 |             revisions = client.list_revisions(parent=parent)
185 | 
186 |             # Filter revisions for the specified service
187 |             service_revisions = []
188 |             for revision in revisions:
189 |                 # Check if this revision belongs to the specified service
190 |                 if service_name in revision.name:
191 |                     service_revisions.append(
192 |                         {
193 |                             "name": revision.name.split("/")[-1],
194 |                             "uid": revision.uid,
195 |                             "generation": revision.generation,
196 |                             "service": revision.service.split("/")[-1]
197 |                             if revision.service
198 |                             else None,
199 |                             "create_time": revision.create_time.isoformat()
200 |                             if revision.create_time
201 |                             else None,
202 |                             "update_time": revision.update_time.isoformat()
203 |                             if revision.update_time
204 |                             else None,
205 |                             "scaling": {
206 |                                 "min_instance_count": revision.scaling.min_instance_count
207 |                                 if revision.scaling
208 |                                 else None,
209 |                                 "max_instance_count": revision.scaling.max_instance_count
210 |                                 if revision.scaling
211 |                                 else None,
212 |                             }
213 |                             if revision.scaling
214 |                             else None,
215 |                             "conditions": [
216 |                                 {
217 |                                     "type": condition.type,
218 |                                     "state": condition.state.name
219 |                                     if condition.state
220 |                                     else None,
221 |                                     "message": condition.message,
222 |                                     "last_transition_time": condition.last_transition_time.isoformat()
223 |                                     if condition.last_transition_time
224 |                                     else None,
225 |                                     "severity": condition.severity.name
226 |                                     if condition.severity
227 |                                     else None,
228 |                                 }
229 |                                 for condition in revision.conditions
230 |                             ]
231 |                             if revision.conditions
232 |                             else [],
233 |                         }
234 |                     )
235 | 
236 |             return json.dumps(service_revisions, indent=2)
237 |         except Exception as e:
238 |             return json.dumps({"error": str(e)}, indent=2)
239 | 
240 |     # Tools
241 |     @mcp_instance.tool()
242 |     def create_service(
243 |         service_name: str,
244 |         image: str,
245 |         project_id: str = None,
246 |         location: str = None,
247 |         env_vars: Optional[Dict[str, str]] = None,
248 |         memory_limit: str = "512Mi",
249 |         cpu_limit: str = "1",
250 |         min_instances: int = 0,
251 |         max_instances: int = 100,
252 |         port: int = 8080,
253 |         allow_unauthenticated: bool = True,
254 |         service_account: str = None,
255 |         vpc_connector: str = None,
256 |         timeout_seconds: int = 300,
257 |     ) -> str:
258 |         """
259 |         Create a new Cloud Run service
260 | 
261 |         Args:
262 |             service_name: Name for the new service
263 |             image: Container image to deploy (e.g., gcr.io/project/image:tag)
264 |             project_id: GCP project ID (defaults to configured project)
265 |             location: GCP region (e.g., us-central1) (defaults to configured location)
266 |             env_vars: Environment variables as key-value pairs
267 |             memory_limit: Memory limit (e.g., 512Mi, 1Gi)
268 |             cpu_limit: CPU limit (e.g., 1, 2)
269 |             min_instances: Minimum number of instances
270 |             max_instances: Maximum number of instances
271 |             port: Container port
272 |             allow_unauthenticated: Whether to allow unauthenticated access
273 |             service_account: Service account email
274 |             vpc_connector: VPC connector name
275 |             timeout_seconds: Request timeout in seconds
276 |         """
277 |         try:
278 |             # Get client from client_instances
279 |             client = client_instances.get_clients().run
280 |             project_id = project_id or client_instances.get_project_id()
281 |             location = location or client_instances.get_location()
282 | 
283 |             print(f"Creating Cloud Run service {service_name} in {location}...")
284 | 
285 |             # Create container
286 |             container = run_v2.Container(
287 |                 image=image,
288 |                 resources=run_v2.ResourceRequirements(
289 |                     limits={
290 |                         "memory": memory_limit,
291 |                         "cpu": cpu_limit,
292 |                     }
293 |                 ),
294 |             )
295 | 
296 |             # Add environment variables if provided
297 |             if env_vars:
298 |                 container.env = [
299 |                     run_v2.EnvVar(name=key, value=value)
300 |                     for key, value in env_vars.items()
301 |                 ]
302 | 
303 |             # Add port
304 |             container.ports = [run_v2.ContainerPort(container_port=port)]
305 | 
306 |             # Create template
307 |             template = run_v2.RevisionTemplate(
308 |                 containers=[container],
309 |                 scaling=run_v2.RevisionScaling(
310 |                     min_instance_count=min_instances,
311 |                     max_instance_count=max_instances,
312 |                 ),
313 |                 timeout=run_v2.Duration(seconds=timeout_seconds),
314 |             )
315 | 
316 |             # Add service account if provided
317 |             if service_account:
318 |                 template.service_account = service_account
319 | 
320 |             # Add VPC connector if provided
321 |             if vpc_connector:
322 |                 template.vpc_access = run_v2.VpcAccess(
323 |                     connector=vpc_connector,
324 |                     egress=run_v2.VpcAccess.VpcEgress.ALL_TRAFFIC,
325 |                 )
326 | 
327 |             # Create service
328 |             service = run_v2.Service(
329 |                 template=template,
330 |                 ingress=run_v2.IngressTraffic.INGRESS_TRAFFIC_ALL
331 |                 if allow_unauthenticated
332 |                 else run_v2.IngressTraffic.INGRESS_TRAFFIC_INTERNAL_ONLY,
333 |             )
334 | 
335 |             # Create the service
336 |             parent = f"projects/{project_id}/locations/{location}"
337 |             operation = client.create_service(
338 |                 parent=parent,
339 |                 service_id=service_name,
340 |                 service=service,
341 |             )
342 | 
343 |             # Wait for the operation to complete
344 |             print(f"Waiting for service {service_name} to be created...")
345 |             result = operation.result()
346 | 
347 |             return json.dumps(
348 |                 {
349 |                     "status": "success",
350 |                     "name": result.name.split("/")[-1],
351 |                     "uri": result.uri,
352 |                     "create_time": result.create_time.isoformat()
353 |                     if result.create_time
354 |                     else None,
355 |                     "update_time": result.update_time.isoformat()
356 |                     if result.update_time
357 |                     else None,
358 |                 },
359 |                 indent=2,
360 |             )
361 |         except Exception as e:
362 |             return json.dumps({"status": "error", "message": str(e)}, indent=2)
363 | 
364 |     @mcp_instance.tool()
365 |     def list_services(project_id: str = None, region: str = None) -> str:
366 |         """
367 |         List Cloud Run services in a specific region
368 | 
369 |         Args:
370 |             project_id: GCP project ID (defaults to configured project)
371 |             region: GCP region (e.g., us-central1) (defaults to configured location)
372 |         """
373 |         try:
374 |             # Get client from client_instances
375 |             client = client_instances.get_clients().run
376 |             project_id = project_id or client_instances.get_project_id()
377 |             region = region or client_instances.get_location()
378 | 
379 |             parent = f"projects/{project_id}/locations/{region}"
380 | 
381 |             print(f"Listing Cloud Run services in {region}...")
382 |             services = client.list_services(parent=parent)
383 | 
384 |             result = []
385 |             for service in services:
386 |                 result.append(
387 |                     {
388 |                         "name": service.name.split("/")[-1],
389 |                         "uri": service.uri,
390 |                         "create_time": service.create_time.isoformat()
391 |                         if service.create_time
392 |                         else None,
393 |                         "update_time": service.update_time.isoformat()
394 |                         if service.update_time
395 |                         else None,
396 |                         "labels": dict(service.labels) if service.labels else {},
397 |                     }
398 |                 )
399 | 
400 |             return json.dumps(
401 |                 {"status": "success", "services": result, "count": len(result)},
402 |                 indent=2,
403 |             )
404 |         except Exception as e:
405 |             return json.dumps({"status": "error", "message": str(e)}, indent=2)
406 | 
407 |     @mcp_instance.tool()
408 |     def update_service(
409 |         service_name: str,
410 |         project_id: str = None,
411 |         region: str = None,
412 |         image: Optional[str] = None,
413 |         memory: Optional[str] = None,
414 |         cpu: Optional[str] = None,
415 |         max_instances: Optional[int] = None,
416 |         min_instances: Optional[int] = None,
417 |         env_vars: Optional[Dict[str, str]] = None,
418 |         concurrency: Optional[int] = None,
419 |         timeout_seconds: Optional[int] = None,
420 |         service_account: Optional[str] = None,
421 |         labels: Optional[Dict[str, str]] = None,
422 |     ) -> str:
423 |         """
424 |         Update an existing Cloud Run service
425 | 
426 |         Args:
427 |             service_name: Name of the service to update
428 |             project_id: GCP project ID (defaults to configured project)
429 |             region: GCP region (e.g., us-central1) (defaults to configured location)
430 |             image: New container image URL (optional)
431 |             memory: New memory allocation (optional)
432 |             cpu: New CPU allocation (optional)
433 |             max_instances: New maximum number of instances (optional)
434 |             min_instances: New minimum number of instances (optional)
435 |             env_vars: New environment variables (optional)
436 |             concurrency: New maximum concurrent requests per instance (optional)
437 |             timeout_seconds: New request timeout in seconds (optional)
438 |             service_account: New service account email (optional)
439 |             labels: New key-value labels to apply to the service (optional)
440 |         """
441 |         try:
442 |             # Get client from client_instances
443 |             client = client_instances.get_clients().run
444 |             project_id = project_id or client_instances.get_project_id()
445 |             region = region or client_instances.get_location()
446 | 
447 |             # Get the existing service
448 |             name = f"projects/{project_id}/locations/{region}/services/{service_name}"
449 | 
450 |             print(f"Getting current service configuration for {service_name}...")
451 |             service = client.get_service(name=name)
452 | 
453 |             # Track which fields are being updated
454 |             update_mask_fields = []
455 | 
456 |             # Update the container image if specified
457 |             if image and service.template and service.template.containers:
458 |                 service.template.containers[0].image = image
459 |                 update_mask_fields.append("template.containers[0].image")
460 | 
461 |             # Update resources if specified
462 |             if (memory or cpu) and service.template and service.template.containers:
463 |                 if not service.template.containers[0].resources:
464 |                     service.template.containers[
465 |                         0
466 |                     ].resources = run_v2.ResourceRequirements(limits={})
467 | 
468 |                 if not service.template.containers[0].resources.limits:
469 |                     service.template.containers[0].resources.limits = {}
470 | 
471 |                 if memory:
472 |                     service.template.containers[0].resources.limits["memory"] = memory
473 |                     update_mask_fields.append(
474 |                         "template.containers[0].resources.limits.memory"
475 |                     )
476 | 
477 |                 if cpu:
478 |                     service.template.containers[0].resources.limits["cpu"] = cpu
479 |                     update_mask_fields.append(
480 |                         "template.containers[0].resources.limits.cpu"
481 |                     )
482 | 
483 |             # Update scaling parameters
484 |             if max_instances is not None and service.template:
485 |                 service.template.max_instance_count = max_instances
486 |                 update_mask_fields.append("template.max_instance_count")
487 | 
488 |             if min_instances is not None and service.template:
489 |                 service.template.min_instance_count = min_instances
490 |                 update_mask_fields.append("template.min_instance_count")
491 | 
492 |             # Update concurrency
493 |             if concurrency is not None and service.template:
494 |                 service.template.max_instance_request_concurrency = concurrency
495 |                 update_mask_fields.append("template.max_instance_request_concurrency")
496 | 
497 |             # Update timeout
498 |             if timeout_seconds is not None and service.template:
499 |                 service.template.timeout = {"seconds": timeout_seconds}
500 |                 update_mask_fields.append("template.timeout")
501 | 
502 |             # Update service account
503 |             if service_account is not None and service.template:
504 |                 service.template.service_account = service_account
505 |                 update_mask_fields.append("template.service_account")
506 | 
507 |             # Update environment variables
508 |             if (
509 |                 env_vars is not None
510 |                 and service.template
511 |                 and service.template.containers
512 |             ):
513 |                 # Create new env var list
514 |                 new_env_vars = [
515 |                     run_v2.EnvVar(name=key, value=value)
516 |                     for key, value in env_vars.items()
517 |                 ]
518 |                 service.template.containers[0].env = new_env_vars
519 |                 update_mask_fields.append("template.containers[0].env")
520 | 
521 |             # Update labels
522 |             if labels is not None:
523 |                 service.labels = labels
524 |                 update_mask_fields.append("labels")
525 | 
526 |             # Only proceed if there are fields to update
527 |             if not update_mask_fields:
528 |                 return json.dumps(
529 |                     {"status": "info", "message": "No updates specified"}, indent=2
530 |                 )
531 | 
532 |             # Create update mask
533 |             update_mask = field_mask_pb2.FieldMask(paths=update_mask_fields)
534 | 
535 |             # Create the request
536 |             request = run_v2.UpdateServiceRequest(
537 |                 service=service, update_mask=update_mask
538 |             )
539 | 
540 |             print(f"Updating Cloud Run service {service_name}...")
541 |             operation = client.update_service(request=request)
542 | 
543 |             print("Waiting for service update to complete...")
544 |             response = operation.result()
545 | 
546 |             return json.dumps(
547 |                 {
548 |                     "status": "success",
549 |                     "name": response.name,
550 |                     "uri": response.uri,
551 |                     "updated_fields": update_mask_fields,
552 |                     "conditions": [
553 |                         {
554 |                             "type": condition.type_,
555 |                             "state": condition.state.name
556 |                             if hasattr(condition.state, "name")
557 |                             else str(condition.state),
558 |                             "message": condition.message,
559 |                         }
560 |                         for condition in response.conditions
561 |                     ]
562 |                     if response.conditions
563 |                     else [],
564 |                 },
565 |                 indent=2,
566 |             )
567 |         except Exception as e:
568 |             return json.dumps({"status": "error", "message": str(e)}, indent=2)
569 | 
570 |     @mcp_instance.tool()
571 |     def delete_service(
572 |         service_name: str, project_id: str = None, region: str = None
573 |     ) -> str:
574 |         """
575 |         Delete a Cloud Run service
576 | 
577 |         Args:
578 |             service_name: Name of the service to delete
579 |             project_id: GCP project ID (defaults to configured project)
580 |             region: GCP region (e.g., us-central1) (defaults to configured location)
581 |         """
582 |         try:
583 |             # Get client from client_instances
584 |             client = client_instances.get_clients().run
585 |             project_id = project_id or client_instances.get_project_id()
586 |             region = region or client_instances.get_location()
587 | 
588 |             name = f"projects/{project_id}/locations/{region}/services/{service_name}"
589 | 
590 |             print(f"Deleting Cloud Run service {service_name} in {region}...")
591 |             operation = client.delete_service(name=name)
592 | 
593 |             print("Waiting for service deletion to complete...")
594 |             operation.result()
595 | 
596 |             return json.dumps(
597 |                 {
598 |                     "status": "success",
599 |                     "message": f"Service {service_name} successfully deleted",
600 |                 },
601 |                 indent=2,
602 |             )
603 |         except Exception as e:
604 |             return json.dumps({"status": "error", "message": str(e)}, indent=2)
605 | 
606 |     @mcp_instance.tool()
607 |     def get_service(
608 |         service_name: str, project_id: str = None, region: str = None
609 |     ) -> str:
610 |         """
611 |         Get details of a specific Cloud Run service
612 | 
613 |         Args:
614 |             service_name: Name of the service
615 |             project_id: GCP project ID (defaults to configured project)
616 |             region: GCP region (e.g., us-central1) (defaults to configured location)
617 |         """
618 |         try:
619 |             # Get client from client_instances
620 |             client = client_instances.get_clients().run
621 |             project_id = project_id or client_instances.get_project_id()
622 |             region = region or client_instances.get_location()
623 | 
624 |             name = f"projects/{project_id}/locations/{region}/services/{service_name}"
625 | 
626 |             print(f"Getting details for Cloud Run service {service_name}...")
627 |             service = client.get_service(name=name)
628 | 
629 |             # Parse container details
630 |             containers = []
631 |             if service.template and service.template.containers:
632 |                 for container in service.template.containers:
633 |                     container_info = {
634 |                         "image": container.image,
635 |                         "resources": {
636 |                             "limits": dict(container.resources.limits)
637 |                             if container.resources and container.resources.limits
638 |                             else {}
639 |                         }
640 |                         if container.resources
641 |                         else {},
642 |                         "env_vars": {env.name: env.value for env in container.env}
643 |                         if container.env
644 |                         else {},
645 |                     }
646 |                     containers.append(container_info)
647 | 
648 |             # Parse traffic
649 |             traffic_result = []
650 |             if service.traffic:
651 |                 for traffic in service.traffic:
652 |                     traffic_info = {
653 |                         "type": traffic.type_.name
654 |                         if hasattr(traffic.type_, "name")
655 |                         else str(traffic.type_),
656 |                         "revision": traffic.revision.split("/")[-1]
657 |                         if traffic.revision
658 |                         else None,
659 |                         "percent": traffic.percent,
660 |                         "tag": traffic.tag,
661 |                     }
662 |                     traffic_result.append(traffic_info)
663 | 
664 |             return json.dumps(
665 |                 {
666 |                     "status": "success",
667 |                     "name": service.name.split("/")[-1],
668 |                     "uri": service.uri,
669 |                     "containers": containers,
670 |                     "traffic": traffic_result,
671 |                     "update_time": service.update_time.isoformat()
672 |                     if service.update_time
673 |                     else None,
674 |                 },
675 |                 indent=2,
676 |             )
677 |         except Exception as e:
678 |             return json.dumps({"status": "error", "message": str(e)}, indent=2)
679 | 
680 |     # Prompts
681 |     @mcp_instance.prompt()
682 |     def deploy_service_prompt(
683 |         service_name: str,
684 |         image: str,
685 |         min_instances: str = "0",
686 |         max_instances: str = "100",
687 |         env_vars: str = "{}",
688 |     ) -> str:
689 |         """Prompt to deploy a Cloud Run service with the given configuration"""
690 |         return f"""
691 | I need to deploy a Cloud Run service with the following configuration:
692 | 
693 | - Service name: {service_name}
694 | - Container image: {image}
695 | - Min instances: {min_instances}
696 | - Max instances: {max_instances}
697 | - Environment variables: {env_vars}
698 | 
699 | Please help me set up this service and explain the deployment process and any best practices I should follow.
700 | """
701 | 
702 |     @mcp_instance.prompt()
703 |     def check_service_status_prompt(service_name: str) -> str:
704 |         """Prompt to check the status of a deployed Cloud Run service"""
705 |         return f"""
706 | I need to check the current status of my Cloud Run service named "{service_name}".
707 | 
708 | Please provide me with:
709 | 1. Is the service currently running?
710 | 2. What is the URL to access it?
711 | 3. What revision is currently serving traffic?
712 | 4. Are there any issues with the service?
713 | """
714 | 
715 |     @mcp_instance.prompt()
716 |     def create_service_prompt(region: str = "us-central1") -> str:
717 |         """Prompt for creating a new Cloud Run service"""
718 |         return f"""
719 | I need to create a new Cloud Run service in {region}.
720 | 
721 | Please help me with:
722 | 1. What container image should I use?
723 | 2. How much CPU and memory should I allocate?
724 | 3. Should I set min and max instances for scaling?
725 | 4. Do I need to set any environment variables?
726 | 5. Should I allow unauthenticated access?
727 | 6. What's the best way to deploy my service?
728 | """
729 | 
730 |     @mcp_instance.prompt()
731 |     def update_service_prompt() -> str:
732 |         """Prompt for updating a Cloud Run service"""
733 |         return """
734 | I need to update my Cloud Run service.
735 | 
736 | Please help me understand:
737 | 1. How to update the container image
738 | 2. How to change resource allocations
739 | 3. How to add or modify environment variables
740 | 4. How to update scaling settings
741 | 5. Best practices for zero-downtime updates
742 | """
743 | 
```
Page 1/2FirstPrevNextLast