# Directory Structure
```
├── .gitignore
├── .python-version
├── Dockerfile
├── LICENSE
├── pyproject.toml
├── README.md
├── requirements.txt
├── smithery.yaml
├── src
│ └── gcp-mcp-server
│ ├── __init__.py
│ ├── core
│ │ ├── __init__.py
│ │ ├── auth.py
│ │ ├── context.py
│ │ ├── logging_handler.py
│ │ ├── security.py
│ │ └── server.py
│ ├── main.py
│ ├── prompts
│ │ ├── __init__.py
│ │ └── common.py
│ └── services
│ ├── __init__.py
│ ├── artifact_registry.py
│ ├── client_instances.py
│ ├── cloud_audit_logs.py
│ ├── cloud_bigquery.py
│ ├── cloud_build.py
│ ├── cloud_compute_engine.py
│ ├── cloud_monitoring.py
│ ├── cloud_run.py
│ ├── cloud_storage.py
│ └── README.md
├── utils
│ ├── __init__.py
│ └── helpers.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------
```
3.13
```
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info
# Virtual environments
.venv
```
--------------------------------------------------------------------------------
/src/gcp-mcp-server/services/README.md:
--------------------------------------------------------------------------------
```markdown
# GCP Services MCP Implementation
This repository contains Model Context Protocol (MCP) implementations for various Google Cloud Platform (GCP) services. These modules expose GCP resources, tools, and prompts in a standardized way for AI assistants to interact with GCP infrastructure.
## Overview
The MCP framework provides a consistent way to expose cloud resources to AI assistants. Each service module follows a common pattern:
1. A `register` function that takes an MCP instance as a parameter
2. Resources for retrieving information about GCP resources
3. Tools for performing operations on GCP resources
4. Prompts for guiding users through common tasks
## Implementation Pattern
All service modules follow a consistent implementation pattern:
```python
import json
from services import client_instances
from google.cloud import service_client
def register(mcp_instance):
"""Register all resources and tools with the MCP instance."""
# Resources
@mcp_instance.resource("gcp://service/{project_id}/resources")
def list_resources(project_id: str = None) -> str:
# Implement resource listing
pass
# Tools
@mcp_instance.tool()
def create_resource(resource_name: str, project_id: str = None) -> str:
# Implement resource creation
pass
# Prompts
@mcp_instance.prompt()
def configuration_help() -> str:
# Return helpful prompt text
return """Help text here"""
```
## Key Design Principles
### 1. Client Management
Clients are accessed through a central `client_instances` module:
```python
client = client_instances.get_clients().service_name
```
This approach:
- Centralizes client creation and authentication
- Avoids duplicating client instantiation logic
- Enables consistent credential management
### 2. Parameter Defaults
Required parameters come first, followed by optional parameters with sensible defaults:
```python
def create_resource(
resource_name: str, # Required parameter first
project_id: str = None, # Optional parameters with defaults
location: str = None
) -> str:
# Use defaults from client_instances when not provided
project_id = project_id or client_instances.get_project_id()
location = location or client_instances.get_location()
```
### 3. Error Handling
All functions include consistent error handling:
```python
try:
# Implementation code
return json.dumps(result, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
```
### 4. JSON Responses
All responses are consistently formatted JSON strings with proper indentation:
```python
return json.dumps(
{
"status": "success",
"resources": result,
"count": len(result)
},
indent=2
)
```
### 5. Defensive Coding
All implementations use defensive coding practices to handle potentially missing or null values:
```python
"labels": dict(resource.labels) if resource.labels else {}
```
## Available Services
The following GCP services are implemented:
1. **BigQuery** - Data warehouse operations
2. **Cloud Storage** - Object storage operations
3. **Cloud Run** - Serverless container management
4. **Cloud Build** - CI/CD pipeline management
5. **Artifact Registry** - Container and package registry
6. **Compute Engine** - VM instance management
7. **Cloud Monitoring** - Metrics and alerting
8. **Cloud Audit Logs** - Logging and auditing
## Service-Specific Features
### BigQuery
- Dataset and table management
- Query execution
- Data import/export
### Cloud Storage
- Bucket management
- Object operations (upload, download, delete)
- Bucket configuration
### Cloud Run
- Service deployment
- Service configuration
- Traffic management
### Cloud Build
- Trigger management
- Build execution
- Build monitoring
### Artifact Registry
- Repository management
- Package operations
### Compute Engine
- Instance lifecycle management
- Instance configuration
### Cloud Monitoring
- Metric exploration
- Alert policy management
- Notification channel configuration
### Cloud Audit Logs
- Log querying
- Log sink management
## Usage Example
To register all services with an MCP instance:
```python
from mcp.server.fastmcp import FastMCP
from services import (
bigquery,
storage,
cloudrun,
cloudbuild,
artifactregistry,
compute,
monitoring,
auditlogs
)
# Create MCP instance
mcp = FastMCP("GCP Services")
# Register all services
bigquery.register(mcp)
storage.register(mcp)
cloudrun.register(mcp)
cloudbuild.register(mcp)
artifactregistry.register(mcp)
compute.register(mcp)
monitoring.register(mcp)
auditlogs.register(mcp)
```
## Important Notes
1. **Default Project and Location**:
- Services default to the project and location configured in `client_instances`
- Always allow these to be overridden by parameters
2. **Authentication**:
- Authentication is handled by the `client_instances` module
- No credentials should be hardcoded in service implementations
3. **Error Handling**:
- All operations should include robust error handling
- Error responses should be informative but not expose sensitive information
4. **Response Formatting**:
- All responses should be valid JSON with proper indentation
- Success responses should include a "status": "success" field
- Error responses should include a "status": "error" field with a "message"
## Best Practices for MCP Integration
1. **Resource Naming**:
- Use consistent URI patterns for resources (gcp://service/{project_id}/resource)
- Group related resources logically
2. **Tool Design**:
- Design tools around specific user intents
- Order parameters with required ones first, followed by optional ones
- Provide sensible defaults for optional parameters
3. **Prompt Design**:
- Create prompts for common user scenarios
- Include enough context for the AI to provide helpful responses
- Structure prompts as guiding questions or templates
4. **Documentation**:
- Include descriptive docstrings for all resources, tools, and prompts
- Document all parameters and return values
- Include usage examples where appropriate
## Extending the Services
To add a new GCP service:
1. Create a new module following the pattern above
2. Implement resources for retrieving information
3. Implement tools for operations
4. Create prompts for common user tasks
5. Register the new service in your MCP initialization
## Troubleshooting
If you encounter issues:
1. Check client initialization in `client_instances`
2. Verify required permissions for the service operations
3. Look for additional error details in the returned JSON
4. Add print statements for debugging complex operations
```
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
# This is not a Ready MCP Server
# GCP MCP Server
A comprehensive Model Context Protocol (MCP) server implementation for Google Cloud Platform (GCP) services, enabling AI assistants to interact with and manage GCP resources through a standardized interface.
## Overview
GCP MCP Server provides AI assistants with capabilities to:
- **Query GCP Resources**: Get information about your cloud infrastructure
- **Manage Cloud Resources**: Create, configure, and manage GCP services
- **Receive Assistance**: Get AI-guided help with GCP configurations and best practices
The implementation follows the MCP specification to enable AI systems to interact with GCP services in a secure, controlled manner.
## Supported GCP Services
This implementation includes support for the following GCP services:
- **Artifact Registry**: Container and package management
- **BigQuery**: Data warehousing and analytics
- **Cloud Audit Logs**: Logging and audit trail analysis
- **Cloud Build**: CI/CD pipeline management
- **Cloud Compute Engine**: Virtual machine instances
- **Cloud Monitoring**: Metrics, alerting, and dashboards
- **Cloud Run**: Serverless container deployments
- **Cloud Storage**: Object storage management
## Architecture
The project is structured as follows:
```
gcp-mcp-server/
├── core/ # Core MCP server functionality auth context logging_handler security
├── prompts/ # AI assistant prompts for GCP operations
├── services/ # GCP service implementations
│ ├── README.md # Service implementation details
│ └── ... # Individual service modules
├── main.py # Main server entry point
└── ...
```
Key components:
- **Service Modules**: Each GCP service has its own module with resources, tools, and prompts
- **Client Instances**: Centralized client management for authentication and resource access
- **Core Components**: Base functionality for the MCP server implementation
## Getting Started
### Prerequisites
- Python 3.10+
- GCP project with enabled APIs for the services you want to use
- Authenticated GCP credentials (Application Default Credentials recommended)
### Installation
1. Clone the repository:
```bash
git clone https://github.com/yourusername/gcp-mcp-server.git
cd gcp-mcp-server
```
2. Set up a virtual environment:
```bash
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
```
3. Install dependencies:
```bash
pip install -r requirements.txt
```
4. Configure your GCP credentials:
```bash
# Using gcloud
gcloud auth application-default login
# Or set GOOGLE_APPLICATION_CREDENTIALS
export GOOGLE_APPLICATION_CREDENTIALS="/path/to/service-account-key.json"
```
5. Set up environment variables:
```bash
cp .env.example .env
# Edit .env with your configuration
```
### Running the Server
Start the MCP server:
```bash
python main.py
```
For development and testing:
```bash
# Development mode with auto-reload
python main.py --dev
# Run with specific configuration
python main.py --config config.yaml
```
## Docker Deployment
Build and run with Docker:
```bash
# Build the image
docker build -t gcp-mcp-server .
# Run the container
docker run -p 8080:8080 -v ~/.config/gcloud:/root/.config/gcloud gcp-mcp-server
```
## Configuration
The server can be configured through environment variables or a configuration file:
| Environment Variable | Description | Default |
|----------------------|-------------|---------|
| `GCP_PROJECT_ID` | Default GCP project ID | None (required) |
| `GCP_DEFAULT_LOCATION` | Default region/zone | `us-central1` |
| `MCP_SERVER_PORT` | Server port | `8080` |
| `LOG_LEVEL` | Logging level | `INFO` |
See `.env.example` for a complete list of configuration options.
## Development
### Adding a New GCP Service
1. Create a new file in the `services/` directory
2. Implement the service following the pattern in existing services
3. Register the service in `main.py`
See the [services README](services/README.md) for detailed implementation guidance.
## Security Considerations
- The server uses Application Default Credentials for authentication
- Authorization is determined by the permissions of the authenticated identity
- No credentials are hardcoded in the service implementations
- Consider running with a service account with appropriate permissions
## Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
1. Fork the repository
2. Create your feature branch (`git checkout -b feature/amazing-feature`)
3. Commit your changes (`git commit -m 'Add some amazing feature'`)
4. Push to the branch (`git push origin feature/amazing-feature`)
5. Open a Pull Request
## License
This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.
## Acknowledgments
- Google Cloud Platform team for their comprehensive APIs
- Model Context Protocol for providing a standardized way for AI to interact with services
### Using the Server
To use this server:
1. Place your GCP service account key file as `service-account.json` in the same directory
2. Install the MCP package: `pip install "mcp[cli]"`
3. Install the required GCP package: `pip install google-cloud-run`
4. Run: `mcp dev gcp_cloudrun_server.py`
Or install it in Claude Desktop:
```
mcp install gcp_cloudrun_server.py --name "GCP Cloud Run Manager"
```
## MCP Server Configuration
The following configuration can be added to your configuration file for GCP Cloud Tools:
```json
"mcpServers": {
"GCP Cloud Tools": {
"command": "uv",
"args": [
"run",
"--with",
"google-cloud-artifact-registry>=1.10.0",
"--with",
"google-cloud-bigquery>=3.27.0",
"--with",
"google-cloud-build>=3.0.0",
"--with",
"google-cloud-compute>=1.0.0",
"--with",
"google-cloud-logging>=3.5.0",
"--with",
"google-cloud-monitoring>=2.0.0",
"--with",
"google-cloud-run>=0.9.0",
"--with",
"google-cloud-storage>=2.10.0",
"--with",
"mcp[cli]",
"--with",
"python-dotenv>=1.0.0",
"mcp",
"run",
"C:\\Users\\enes_\\Desktop\\mcp-repo-final\\gcp-mcp\\src\\gcp-mcp-server\\main.py"
],
"env": {
"GOOGLE_APPLICATION_CREDENTIALS": "C:/Users/enes_/Desktop/mcp-repo-final/gcp-mcp/service-account.json",
"GCP_PROJECT_ID": "gcp-mcp-cloud-project",
"GCP_LOCATION": "us-east1"
}
}
}
```
### Configuration Details
This configuration sets up an MCP server for Google Cloud Platform tools with the following:
- **Command**: Uses `uv` package manager to run the server
- **Dependencies**: Includes various Google Cloud libraries (Artifact Registry, BigQuery, Cloud Build, etc.)
- **Environment Variables**:
- `GOOGLE_APPLICATION_CREDENTIALS`: Path to your GCP service account credentials
- `GCP_PROJECT_ID`: Your Google Cloud project ID
- `GCP_LOCATION`: GCP region (us-east1)
### Usage
Add this configuration to your MCP configuration file to enable GCP Cloud Tools functionality.
```
--------------------------------------------------------------------------------
/src/gcp-mcp-server/prompts/common.py:
--------------------------------------------------------------------------------
```python
```
--------------------------------------------------------------------------------
/src/gcp-mcp-server/core/__init__.py:
--------------------------------------------------------------------------------
```python
"""Core package for MCP server components."""
```
--------------------------------------------------------------------------------
/src/gcp-mcp-server/__init__.py:
--------------------------------------------------------------------------------
```python
"""GCP MCP server package."""
__version__ = "0.1.0"
```
--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------
```
mcp>=1.0.0
google-cloud-bigquery
google-cloud-storage
google-cloud-run
google-cloud-artifact-registry
google-cloud-logging
python-dotenv
google-cloud-monitoring
google-cloud-compute
google-cloud-build
```
--------------------------------------------------------------------------------
/utils/helpers.py:
--------------------------------------------------------------------------------
```python
"""Common utility functions."""
def format_resource_name(project_id: str, resource_name: str) -> str:
"""Format a resource name with project ID."""
return f"projects/{project_id}/resources/{resource_name}"
```
--------------------------------------------------------------------------------
/src/gcp-mcp-server/core/security.py:
--------------------------------------------------------------------------------
```python
import re
class DataSanitizer:
PATTERNS = [
r"(?i)(token|key|secret|password)",
r"\b\d{4}-\d{4}-\d{4}-\d{4}\b", # CC-like numbers
]
@classmethod
def sanitize(cls, text: str) -> str:
for pattern in cls.PATTERNS:
text = re.sub(pattern, "[REDACTED]", text)
return text
```
--------------------------------------------------------------------------------
/src/gcp-mcp-server/services/__init__.py:
--------------------------------------------------------------------------------
```python
"""
GCP MCP Server services package.
This package contains service modules that register tools and resources with the MCP server.
"""
# The following allows these modules to be imported directly from the services package
from . import (
artifact_registry,
client_instances,
cloud_audit_logs,
cloud_bigquery,
cloud_build,
cloud_compute_engine,
cloud_monitoring,
cloud_run,
cloud_storage,
)
```
--------------------------------------------------------------------------------
/utils/__init__.py:
--------------------------------------------------------------------------------
```python
"""Utility functions for MCP server."""
import json
from typing import Any
def format_json_response(data: Any) -> str:
"""Format data as a JSON string with consistent styling."""
return json.dumps({"status": "success", "data": data}, indent=2)
def format_error_response(message: str) -> str:
"""Format error message as a JSON string."""
return json.dumps({"status": "error", "message": message}, indent=2)
```
--------------------------------------------------------------------------------
/src/gcp-mcp-server/core/logging_handler.py:
--------------------------------------------------------------------------------
```python
import logging
class MCPLogger:
"""Simple logger for MCP server"""
def __init__(self, name):
self.logger = logging.getLogger(f"mcp.{name}")
def info(self, message):
self.logger.info(message)
def warning(self, message):
self.logger.warning(message)
def error(self, message):
self.logger.error(message)
def critical(self, message):
self.logger.critical(message)
def debug(self, message):
self.logger.debug(message)
def audit_log(self, action, resource, details=None):
"""Simple audit logging"""
self.logger.info(f"AUDIT: {action} on {resource}")
```
--------------------------------------------------------------------------------
/src/gcp-mcp-server/prompts/__init__.py:
--------------------------------------------------------------------------------
```python
"""Common prompts for GCP operations."""
from ..core import mcp
@mcp.prompt()
def gcp_service_help(service_name: str) -> str:
"""Get help for using a GCP service."""
return f"""
I need help with using {service_name} in Google Cloud Platform.
Please help me understand:
1. Common operations and best practices
2. Required parameters and configuration
3. Security considerations
4. Recommended patterns for {service_name}
"""
@mcp.prompt()
def error_analysis(error_message: str) -> str:
"""Analyze a GCP error message."""
return f"""
I received this error from GCP:
{error_message}
Please help me:
1. Understand what caused this error
2. Find potential solutions
3. Prevent similar errors in the future
"""
```
--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------
```toml
[project]
name = "gcp-mcp-server"
version = "0.1.0"
description = "A Model Context Protocol server that provides access to Google Cloud Platform services, enabling LLMs to manage and interact with GCP resources."
readme = "README.md"
requires-python = ">=3.13"
dependencies = [
"mcp[cli]>=1.0.0",
"google-cloud-bigquery",
"google-cloud-storage",
"google-cloud-run",
"google-cloud-artifact-registry",
"google-cloud-logging",
"python-dotenv",
"google-cloud-monitoring",
"google-cloud-compute",
"google-cloud-build",
]
[[project.authors]]
name = "Enes Bol"
email = "[email protected]"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["src/gcp_mcp"]
[project.scripts]
gcp-mcp-server = "gcp_mcp.main:main"
```
--------------------------------------------------------------------------------
/src/gcp-mcp-server/services/client_instances.py:
--------------------------------------------------------------------------------
```python
import logging
from core.context import GCPClients
# Set up logging
logger = logging.getLogger(__name__)
# Global client instance
_gcp_clients = None
_project_id = None
_location = None
def initialize_clients(credentials=None):
"""Initialize GCP clients with credentials."""
global _gcp_clients, _project_id, _location
try:
# Initialize GCP clients
_gcp_clients = GCPClients(credentials)
_project_id = _gcp_clients.project_id
_location = _gcp_clients.location
logger.info(f"GCP clients initialized with project: {_project_id}")
return True
except Exception as e:
logger.error(f"Failed to initialize GCP clients: {str(e)}")
return False
def get_clients():
"""Get the initialized GCP clients."""
if _gcp_clients is None:
logger.warning("GCP clients not initialized. Attempting to initialize...")
initialize_clients()
return _gcp_clients
def get_project_id():
"""Get the GCP project ID."""
return _project_id
def get_location():
"""Get the GCP location."""
return _location
```
--------------------------------------------------------------------------------
/smithery.yaml:
--------------------------------------------------------------------------------
```yaml
# Smithery.ai configuration
startCommand:
type: stdio
configSchema:
type: object
properties:
project_id:
type: string
description: "Google Cloud Project ID"
region:
type: string
description: "Default GCP region"
default: "us-central1"
timeout:
type: integer
description: "Default timeout for operations in seconds"
default: 300
service_account_json:
type: string
description: "GCP Service Account key JSON (as string)"
service_account_path:
type: string
description: "Path to GCP Service Account key file"
required:
- project_id
commandFunction: |-
(config) => ({
"command": "python",
"args": [
"main.py"
],
"env": {
"GCP_PROJECT_ID": config.project_id,
"GCP_REGION": config.region || "us-central1",
"GCP_TIMEOUT": String(config.timeout || 300),
"GCP_SERVICE_ACCOUNT_JSON": config.service_account_json || "",
"GCP_SERVICE_ACCOUNT_KEY_PATH": config.service_account_path || ""
}
})
```
--------------------------------------------------------------------------------
/Dockerfile:
--------------------------------------------------------------------------------
```dockerfile
# Generated by https://smithery.ai. See: https://smithery.ai/docs/config#dockerfile
# Use a Python image with uv pre-installed
FROM ghcr.io/astral-sh/uv:python3.13-bookworm-slim AS uv
# Set working directory
WORKDIR /app
# Enable bytecode compilation
ENV UV_COMPILE_BYTECODE=1
# Copy pyproject.toml and lock file for dependencies
COPY pyproject.toml uv.lock ./
# Install the project's dependencies
RUN --mount=type=cache,target=/root/.cache/uv \
uv sync --frozen --no-install-project --no-dev --no-editable
# Add the rest of the project source code and install it
ADD src /app/src
# Sync and install the project
RUN --mount=type=cache,target=/root/.cache/uv \
uv sync --frozen --no-dev --no-editable
FROM python:3.13-slim-bookworm
# Set working directory
WORKDIR /app
# Copy virtual environment from the builder
COPY --from=uv /root/.local /root/.local
COPY --from=uv --chown=app:app /app/.venv /app/.venv
# Place executables in the environment at the front of the path
ENV PATH="/app/.venv/bin:$PATH"
# Define the entry point
ENTRYPOINT ["gcp-mcp-server"]
# Example command
# CMD ["--project", "your-gcp-project-id", "--location", "your-gcp-location"]
```
--------------------------------------------------------------------------------
/src/gcp-mcp-server/core/server.py:
--------------------------------------------------------------------------------
```python
# import logging
# import os
# from contextlib import asynccontextmanager
# from typing import Any, AsyncIterator, Dict
# from auth import get_credentials
# from mcp.server.fastmcp import FastMCP
# from ..services import bigquery, compute, iam, storage
# logging.basicConfig(level=logging.INFO)
# logger = logging.getLogger(__name__)
# @asynccontextmanager
# async def gcp_lifespan(server) -> AsyncIterator[Dict[str, Any]]:
# """Set up GCP context with credentials."""
# logger.info("Initializing GCP MCP server...")
# try:
# # Get GCP credentials
# credentials = get_credentials()
# project_id = os.environ.get("GCP_PROJECT_ID")
# if not project_id:
# logger.warning(
# "GCP_PROJECT_ID not set in environment. Some features may not work correctly."
# )
# logger.info(f"Server initialized with project: {project_id or 'Not set'}")
# # Yield context to be used by handlers
# yield {"credentials": credentials, "project_id": project_id}
# except Exception as e:
# logger.error(f"Failed to initialize GCP context: {str(e)}")
# raise
# finally:
# logger.info("Shutting down GCP MCP server...")
# # Create main server FIRST
# mcp = FastMCP(
# "GCP Manager",
# description="Manage Google Cloud Platform Resources",
# lifespan=gcp_lifespan,
# )
# # Register all services
# compute.register(mcp)
# storage.register(mcp)
# bigquery.register(mcp)
# iam.register(mcp)
# # THEN define resources and tools
# @mcp.resource("gcp://project")
# def get_project_info():
# """Get information about the current GCP project"""
# project_id = os.environ.get("GCP_PROJECT_ID")
# return f"Project ID: {project_id}"
# @mcp.resource("gcp://storage/buckets")
# def list_buckets():
# """List GCP storage buckets"""
# from google.cloud import storage
# client = storage.Client()
# buckets = list(client.list_buckets(max_results=10))
# return "\n".join([f"- {bucket.name}" for bucket in buckets])
# @mcp.resource("test://hello")
# def hello_resource():
# """A simple test resource"""
# return "Hello World"
# @mcp.tool()
# def list_gcp_instances(region: str = "us-central1") -> str:
# """List GCP compute instances in a region"""
# # Use your credentials to list instances
# return f"Instances in {region}: [instance list would go here]"
# @mcp.tool()
# def test_gcp_auth() -> str:
# """Test GCP authentication"""
# try:
# from google.cloud import storage
# client = storage.Client()
# buckets = list(client.list_buckets(max_results=5))
# return f"Authentication successful. Found {len(buckets)} buckets."
# except Exception as e:
# return f"Authentication failed: {str(e)}"
# if __name__ == "__main__":
# mcp.run()
```
--------------------------------------------------------------------------------
/src/gcp-mcp-server/core/auth.py:
--------------------------------------------------------------------------------
```python
import json
import os
import google.auth
from google.auth.exceptions import DefaultCredentialsError
from google.auth.transport.requests import Request
from google.oauth2 import service_account
from .logging_handler import MCPLogger
# Define required scopes for GCP APIs
REQUIRED_SCOPES = [
"https://www.googleapis.com/auth/cloud-platform",
]
logger = MCPLogger("auth")
def get_credentials():
"""
Get Google Cloud credentials from environment.
Attempts to load credentials in the following order:
1. From GOOGLE_APPLICATION_CREDENTIALS environment variable
2. From GCP_SERVICE_ACCOUNT_JSON environment variable containing JSON
3. Default application credentials
"""
try:
# Check for credentials file path
creds_file = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")
if creds_file and os.path.exists(creds_file):
logger.audit_log(
action="credential_load",
resource="service_account",
details={"method": "file", "file": creds_file},
)
logger.info(f"Loading credentials from file: {creds_file}")
credentials = service_account.Credentials.from_service_account_file(
creds_file, scopes=REQUIRED_SCOPES
)
return validate_credentials(credentials)
# Check for service account JSON in environment
sa_json = os.environ.get("GCP_SERVICE_ACCOUNT_JSON")
if sa_json:
try:
service_account_info = json.loads(sa_json)
logger.info(
"Loading credentials from GCP_SERVICE_ACCOUNT_JSON environment variable"
)
credentials = service_account.Credentials.from_service_account_info(
service_account_info, scopes=REQUIRED_SCOPES
)
logger.audit_log(
action="credential_load",
resource="service_account",
details={"method": "environment_json"},
)
return validate_credentials(credentials)
except json.JSONDecodeError as e:
logger.error(f"Failed to parse GCP_SERVICE_ACCOUNT_JSON: {str(e)}")
# Fall back to default credentials
try:
logger.audit_log(
action="credential_load",
resource="application_default",
details={"method": "default"},
)
logger.info("Loading default application credentials")
credentials, project_id = google.auth.default(scopes=REQUIRED_SCOPES)
if project_id and not os.environ.get("GCP_PROJECT_ID"):
# Set project ID from default credentials if not already set
os.environ["GCP_PROJECT_ID"] = project_id
logger.info(f"Using project ID from default credentials: {project_id}")
return validate_credentials(credentials)
except DefaultCredentialsError as e:
logger.error(f"Failed to load GCP credentials: {str(e)}")
raise AuthenticationError("Failed to obtain valid credentials")
except Exception as e:
logger.critical(f"Authentication failure: {str(e)}")
raise AuthenticationError(f"Failed to obtain valid credentials: {str(e)}")
def validate_credentials(credentials):
"""Validate credential permissions and expiration"""
# Some credentials don't have a valid attribute or may not need refresh
if hasattr(credentials, "valid") and not credentials.valid:
credentials.refresh(Request())
# Check expiration
if hasattr(credentials, "expiry"):
if hasattr(credentials, "expired") and credentials.expired:
try:
credentials.refresh(Request())
except Exception as e:
raise AuthenticationError(
f"Failed to refresh expired credentials: {str(e)}"
)
return credentials
class AuthenticationError(Exception):
"""Custom exception for authentication failures"""
pass
```
--------------------------------------------------------------------------------
/src/gcp-mcp-server/main.py:
--------------------------------------------------------------------------------
```python
import os
# At the top of main.py, add these imports and logging statements
import sys
# Get the directory containing main.py
current_dir = os.path.dirname(os.path.abspath(__file__))
# Add it to Python's module search path
sys.path.append(current_dir)
# Also add the parent directory if needed
parent_dir = os.path.dirname(current_dir)
sys.path.append(parent_dir)
import inspect
import logging
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator, Dict
# Import the GCP Clients and Context
from mcp.server.fastmcp import FastMCP
from services import client_instances
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# After adding paths, print them to see what's happening
print(f"Current working directory: {os.getcwd()}")
print(f"Updated Python path: {sys.path}")
@asynccontextmanager
async def gcp_lifespan(server) -> AsyncIterator[Dict[str, Any]]:
"""Set up GCP context with credentials."""
logger.info("Initializing GCP MCP server...")
try:
# Initialize GCP credentials
credentials = None
# Check for service account key file path
sa_path = os.environ.get("GCP_SERVICE_ACCOUNT_KEY_PATH")
if sa_path and os.path.exists(sa_path):
logger.info(f"Using service account key from: {sa_path}")
# Initialize GCP clients using the client_instances module
client_instances.initialize_clients(credentials)
clients = client_instances.get_clients()
project_id = client_instances.get_project_id()
location = client_instances.get_location()
logger.info(f"Server initialized with project: {project_id}")
# Yield context with clients and credentials
yield {
"clients": clients,
"project_id": project_id,
"credentials": credentials,
"location": location,
}
except Exception as e:
logger.error(f"Failed to initialize GCP context: {str(e)}")
raise
finally:
logger.info("Shutting down GCP MCP server...")
# Create the global MCP instance
mcp = FastMCP(
"GCP Manager",
description="Manage Google Cloud Platform Resources",
lifespan=gcp_lifespan,
dependencies=[
"mcp>=1.0.0",
"google-cloud-bigquery",
"google-cloud-storage",
"google-cloud-run",
"google-cloud-artifact-registry",
"google-cloud-logging",
"python-dotenv",
"google-cloud-monitoring",
"google-cloud-compute",
"google-cloud-build",
],
)
# Basic resources and tools
@mcp.resource("test://hello")
def hello_resource():
"""A simple test resource"""
return "Hello World"
@mcp.tool()
def test_gcp_auth() -> str:
"""Test GCP authentication"""
try:
# Get clients from client_instances module instead of context
clients = client_instances.get_clients()
# Test if we can list storage buckets
if hasattr(clients, "storage"):
try:
buckets = list(clients.storage.list_buckets(max_results=5))
return f"Authentication successful. Found {len(buckets)} buckets. {buckets}"
except Exception as e:
return f"Storage authentication failed: {str(e)}"
except Exception as e:
return f"Authentication failed: {str(e)}"
# # Function to register services
# def register_services():
# """Register all service modules with the MCP instance."""
# services_dir = os.path.join(os.path.dirname(__file__), "services")
# logger.info(f"Loading services from {services_dir}")
# if not os.path.exists(services_dir):
# logger.warning(f"Services directory not found: {services_dir}")
# return
# # Get all Python files in the services directory
# for filename in os.listdir(services_dir):
# if (
# filename.endswith(".py")
# and filename != "__init__.py"
# and filename != "client_instances.py"
# ):
# module_name = filename[:-3] # Remove .py extension
# try:
# # Load the module using importlib
# module_path = os.path.join(services_dir, filename)
# spec = importlib.util.spec_from_file_location(
# f"services.{module_name}", module_path
# )
# module = importlib.util.module_from_spec(spec)
# # Inject mcp instance into the module
# module.mcp = mcp
# # Execute the module
# spec.loader.exec_module(module)
# logger.info(f"Loaded service module: {module_name}")
# # If the module has a register function, call it
# if hasattr(module, "register"):
# # Pass the mcp instance and the server's request_context to register
# module.register(mcp)
# logger.info(f"Registered service: {module_name}")
# except Exception as e:
# logger.error(f"Error loading service {module_name}: {e}")
def register_services():
"""Register all service modules with the MCP instance."""
logger.info("Explicitly registering service modules")
# Print diagnostic information
logger.info(f"Python sys.path: {sys.path}")
logger.info(f"Current working directory: {os.getcwd()}")
logger.info(f"Script location: {os.path.abspath(__file__)}")
logger.info(f"Parent directory: {os.path.dirname(os.path.abspath(__file__))}")
try:
# Try importing a service module to check if imports work
logger.info("Attempting to import artifact_registry")
import services.artifact_registry as artifact_registry
logger.info(f"Module location: {inspect.getfile(artifact_registry)}")
from services import (
artifact_registry,
cloud_audit_logs,
cloud_bigquery,
cloud_build,
cloud_compute_engine,
cloud_monitoring,
cloud_run,
cloud_storage,
)
# Register each service module
artifact_registry.register(mcp)
cloud_audit_logs.register(mcp)
cloud_bigquery.register(mcp)
cloud_build.register(mcp)
cloud_compute_engine.register(mcp)
cloud_monitoring.register(mcp)
cloud_run.register(mcp)
cloud_storage.register(mcp)
logger.info("All service modules registered successfully")
except Exception as e:
logger.error(f"Error registering service modules: {e}")
# Add detailed error logging
import traceback
logger.error(traceback.format_exc())
if __name__ == "__main__":
logger.info("Starting GCP MCP server")
# Register services
register_services()
# Run the server
mcp.run()
```
--------------------------------------------------------------------------------
/src/gcp-mcp-server/core/context.py:
--------------------------------------------------------------------------------
```python
# context.py
import json
import os
from typing import Any, Optional, Type
import google.auth
# Import other services as needed
from google.cloud import (
artifactregistry_v1,
bigquery,
compute_v1,
monitoring_v3,
storage,
)
from google.oauth2 import service_account
class GCPClients:
"""Client manager for GCP services"""
def __init__(self, credentials=None):
self.credentials = self._get_credentials(credentials)
self.project_id = self._get_project_id()
self.location = self._get_location()
self._clients = {}
# Initialize clients
self._storage_client = None
self._bigquery_client = None
self._run_client = None
self._logging_client = None
self._monitoring_client = None
self._compute_client = None
self._sql_client = None
self._cloudbuild_client = None
self._artifactregistry_client = None
def _get_credentials(self, credentials=None):
"""Get credentials from various sources"""
# If credentials are directly provided
if credentials:
return credentials
# Check for service account JSON in environment variable
sa_json = os.environ.get("GCP_SERVICE_ACCOUNT_JSON")
if sa_json:
try:
sa_info = json.loads(sa_json)
return service_account.Credentials.from_service_account_info(sa_info)
except Exception as e:
print(f"Error loading service account JSON: {e}")
# Check for service account key file path
sa_path = os.environ.get("GCP_SERVICE_ACCOUNT_KEY_PATH")
if sa_path:
try:
return service_account.Credentials.from_service_account_file(sa_path)
except Exception as e:
print(f"Error loading service account key file: {e}")
# Fall back to application default credentials
try:
credentials, project = google.auth.default()
return credentials
except Exception as e:
print(f"Error getting default credentials: {e}")
raise RuntimeError("No valid GCP credentials found")
def _get_project_id(self) -> str:
"""Get project ID from environment or credentials"""
project_id = os.environ.get("GCP_PROJECT_ID")
if project_id:
return project_id
if hasattr(self.credentials, "project_id"):
return self.credentials.project_id
try:
_, project_id = google.auth.default()
return project_id
except Exception:
raise RuntimeError("Unable to determine GCP project ID")
def _get_location(self) -> str:
"""Get default location/region from environment"""
return os.environ.get("GCP_LOCATION", "us-central1")
# def _init_client(
# self, client_class: Type, current_client: Optional[Any], **kwargs
# ) -> Any:
# """Helper method to initialize clients with error handling"""
# if not current_client:
# try:
# return client_class(
# project=self.project_id, # <-- This adds project automatically
# credentials=self.credentials, # <-- This adds credentials automatically
# **kwargs, # <-- This adds any extra params (like database)
# )
# except Exception as e:
# raise RuntimeError(
# f"Failed to initialize {client_class.__name__}: {str(e)}"
# )
# return current_client
def _init_client(
self, client_class: Type, current_client: Optional[Any], **kwargs
) -> Any:
"""Helper method to initialize clients with error handling"""
if not current_client:
try:
# Check if this client accepts project parameter
if client_class in [
storage.Client
]: # Add other clients that accept project
kwargs["project"] = self.project_id
return client_class(credentials=self.credentials, **kwargs)
except Exception as e:
raise RuntimeError(
f"Failed to initialize {client_class.__name__}: {str(e)}"
)
return current_client
@property
def storage(self) -> storage.Client:
self._storage_client = self._init_client(storage.Client, self._storage_client)
return self._storage_client
@property
def bigquery(self) -> bigquery.Client:
self._bigquery_client = self._init_client(
bigquery.Client, self._bigquery_client
)
return self._bigquery_client
@property
def artifactregistry(self) -> artifactregistry_v1.ArtifactRegistryClient:
"""Get the Artifact Registry client."""
if not self._artifactregistry_client:
try:
# ArtifactRegistryClient doesn't accept project parameter
self._artifactregistry_client = (
artifactregistry_v1.ArtifactRegistryClient(
credentials=self.credentials
)
)
except Exception as e:
raise RuntimeError(
f"Failed to initialize ArtifactRegistryClient: {str(e)}"
)
return self._artifactregistry_client
# Uncomment and implement other client properties as needed
# @property
# def storage(self) -> storage.Client:
# self._storage_client = self._init_client(storage.Client, self._storage_client)
# return self._storage_client
def close_all(self):
"""Close all open clients"""
for client in self._clients.values():
if hasattr(client, "transport") and hasattr(client.transport, "close"):
client.transport.close()
elif hasattr(client, "close"):
client.close()
self._clients.clear()
# Update Context class to handle credentials properly
class Context:
def __init__(self, request_context):
self.request_context = request_context
# Get credentials from lifespan context
credentials = request_context.lifespan_context.get("credentials")
# Initialize GCPClients with those credentials
self.clients = GCPClients(credentials)
def close(self):
"""Clean up when request ends"""
if hasattr(self, "clients"):
self.clients.close_all()
# @property
# def run(self) -> run_v2.CloudRunClient:
# self._run_client = self._init_client(run_v2.CloudRunClient, self._run_client)
# return self._run_client
# @property
# def logging(self) -> logging_v2.LoggingServiceV2Client:
# self._logging_client = self._init_client(
# logging_v2.LoggingServiceV2Client, self._logging_client
# )
# return self._logging_client
@property
def monitoring(self) -> monitoring_v3.MetricServiceClient:
self._monitoring_client = self._init_client(
monitoring_v3.MetricServiceClient, self._monitoring_client
)
return self._monitoring_client
@property
def compute(self) -> compute_v1.InstancesClient:
self._compute_client = self._init_client(
compute_v1.InstancesClient, self._compute_client
)
return self._compute_client
# @property
# def sql(self) -> sql_v1.InstancesClient:
# self._sql_client = self._init_client(sql_v1.InstancesClient, self._sql_client)
# return self._sql_client
# @property
# def cloudbuild(self) -> cloudbuild_v1.CloudBuildClient:
# self._cloudbuild_client = self._init_client(
# cloudbuild_v1.CloudBuildClient, self._cloudbuild_client
# )
# return self._cloudbuild_client
```
--------------------------------------------------------------------------------
/src/gcp-mcp-server/services/cloud_build.py:
--------------------------------------------------------------------------------
```python
import json
from typing import Dict, Optional
from services import client_instances
def register(mcp_instance):
"""Register all Cloud Build resources and tools with the MCP instance."""
# Resources
@mcp_instance.resource("gcp://cloudbuild/{project_id}/builds")
def list_builds_resource(project_id: str = None) -> str:
"""List all Cloud Build executions for a project"""
try:
# Get client from client_instances
client = client_instances.get_clients().cloudbuild
project_id = project_id or client_instances.get_project_id()
builds = client.list_builds(project_id=project_id)
result = []
for build in builds:
result.append(
{
"id": build.id,
"status": build.status.name if build.status else None,
"source": build.source.repo_source.commit_sha
if build.source and build.source.repo_source
else None,
"create_time": build.create_time.isoformat()
if build.create_time
else None,
"logs_url": build.logs_url,
"substitutions": dict(build.substitutions)
if build.substitutions
else {},
}
)
return json.dumps(result, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp_instance.resource("gcp://cloudbuild/{project_id}/triggers")
def list_triggers_resource(project_id: str = None) -> str:
"""List all Cloud Build triggers for a project"""
try:
# Get client from client_instances
client = client_instances.get_clients().cloudbuild
project_id = project_id or client_instances.get_project_id()
triggers = client.list_triggers(project_id=project_id)
result = []
for trigger in triggers:
result.append(
{
"id": trigger.id,
"name": trigger.name,
"description": trigger.description,
"trigger_template": {
"repo_name": trigger.trigger_template.repo_name,
"branch_name": trigger.trigger_template.branch_name,
}
if trigger.trigger_template
else None,
}
)
return json.dumps(result, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
# Tools
@mcp_instance.tool()
def trigger_build(
build_config: Dict,
project_id: str = None,
) -> str:
"""
Trigger a new Cloud Build execution
Args:
build_config: Dictionary containing the build configuration
project_id: GCP project ID (defaults to configured project)
"""
try:
# Get client from client_instances
client = client_instances.get_clients().cloudbuild
project_id = project_id or client_instances.get_project_id()
# Convert config dict to Build object
build = cloudbuild_v1.Build.from_json(json.dumps(build_config))
print(f"Triggering build in project {project_id}...")
operation = client.create_build(project_id=project_id, build=build)
response = operation.result()
return json.dumps(
{
"status": "success",
"build_id": response.id,
"status": response.status.name if response.status else None,
"logs_url": response.logs_url,
"build_name": response.name,
},
indent=2,
)
except Exception as e:
return json.dumps({"status": "error", "message": str(e)}, indent=2)
@mcp_instance.tool()
def list_builds(
project_id: str = None,
filter: Optional[str] = None,
page_size: Optional[int] = 100,
) -> str:
"""
List Cloud Build executions with optional filtering
Args:
project_id: GCP project ID (defaults to configured project)
filter: Filter expression for builds
page_size: Maximum number of results to return
"""
try:
# Get client from client_instances
client = client_instances.get_clients().cloudbuild
project_id = project_id or client_instances.get_project_id()
request = cloudbuild_v1.ListBuildsRequest(
project_id=project_id, filter=filter, page_size=page_size
)
builds = []
for build in client.list_builds(request=request):
builds.append(
{
"id": build.id,
"status": build.status.name if build.status else None,
"create_time": build.create_time.isoformat()
if build.create_time
else None,
"source": build.source.repo_source.commit_sha
if build.source and build.source.repo_source
else None,
"logs_url": build.logs_url,
}
)
return json.dumps(
{"status": "success", "builds": builds, "count": len(builds)}, indent=2
)
except Exception as e:
return json.dumps({"status": "error", "message": str(e)}, indent=2)
@mcp_instance.tool()
def create_build_trigger(
trigger_config: Dict,
project_id: str = None,
) -> str:
"""
Create a new Cloud Build trigger
Args:
trigger_config: Dictionary containing the trigger configuration
project_id: GCP project ID (defaults to configured project)
"""
try:
# Get client from client_instances
client = client_instances.get_clients().cloudbuild
project_id = project_id or client_instances.get_project_id()
# Convert config dict to Trigger object
trigger = cloudbuild_v1.BuildTrigger.from_json(json.dumps(trigger_config))
print(f"Creating trigger in project {project_id}...")
response = client.create_trigger(project_id=project_id, trigger=trigger)
return json.dumps(
{
"status": "success",
"trigger_id": response.id,
"name": response.name,
"description": response.description,
"create_time": response.create_time.isoformat()
if response.create_time
else None,
},
indent=2,
)
except Exception as e:
return json.dumps({"status": "error", "message": str(e)}, indent=2)
# Prompts
@mcp_instance.prompt()
def build_config_prompt(service_type: str = "docker") -> str:
"""Prompt for creating a Cloud Build configuration"""
return f"""
I need to create a Cloud Build configuration for {service_type} deployments. Please help with:
1. Recommended build steps for {service_type}
2. Proper use of substitutions
3. Caching strategies
4. Security best practices
5. Integration with other GCP services
"""
@mcp_instance.prompt()
def trigger_setup_prompt(repo_type: str = "github") -> str:
"""Prompt for configuring build triggers"""
return f"""
I want to set up a {repo_type} trigger for Cloud Build. Please explain:
1. Required permissions and connections
2. Event types (push, PR, etc.)
3. File pattern matching
4. Branch/tag filtering
5. Approval workflows
"""
```
--------------------------------------------------------------------------------
/src/gcp-mcp-server/services/cloud_compute_engine.py:
--------------------------------------------------------------------------------
```python
import json
from typing import Optional
from services import client_instances
def register(mcp_instance):
"""Register all Compute Engine resources and tools with the MCP instance."""
# Resources
@mcp_instance.resource("gcp://compute/{project_id}/{zone}/instances")
def list_instances_resource(project_id: str = None, zone: str = None) -> str:
"""List all Compute Engine instances in a zone"""
try:
# Get client from client_instances
client = client_instances.get_clients().compute
project_id = project_id or client_instances.get_project_id()
zone = zone or client_instances.get_location()
instance_list = client.list(project=project_id, zone=zone)
result = []
for instance in instance_list:
result.append(
{
"name": instance.name,
"status": instance.status,
"machine_type": instance.machine_type.split("/")[-1]
if instance.machine_type
else None,
"internal_ip": instance.network_interfaces[0].network_i_p
if instance.network_interfaces
and len(instance.network_interfaces) > 0
else None,
"creation_timestamp": instance.creation_timestamp,
}
)
return json.dumps(result, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
# Tools
@mcp_instance.tool()
def create_instance(
instance_name: str,
machine_type: str,
project_id: str = None,
zone: str = None,
image: str = "projects/debian-cloud/global/images/family/debian-11",
network: str = "global/networks/default",
disk_size_gb: int = 10,
) -> str:
"""
Create a new Compute Engine instance
Args:
instance_name: Name for the new instance
machine_type: Machine type (e.g., n2-standard-2)
project_id: GCP project ID (defaults to configured project)
zone: GCP zone (defaults to configured location)
image: Source image for boot disk
network: Network to connect to
disk_size_gb: Size of boot disk in GB
"""
try:
# Get client from client_instances
client = client_instances.get_clients().compute
project_id = project_id or client_instances.get_project_id()
zone = zone or client_instances.get_location()
# Build instance configuration
instance_config = {
"name": instance_name,
"machine_type": f"zones/{zone}/machineTypes/{machine_type}",
"disks": [
{
"boot": True,
"auto_delete": True,
"initialize_params": {
"source_image": image,
"disk_size_gb": disk_size_gb,
},
}
],
"network_interfaces": [
{
"network": network,
"access_configs": [
{"type": "ONE_TO_ONE_NAT", "name": "External NAT"}
],
}
],
}
print(f"Creating instance {instance_name} in {zone}...")
operation = client.insert(
project=project_id, zone=zone, instance_resource=instance_config
)
# Wait for operation to complete
operation.result()
return json.dumps(
{
"status": "success",
"instance_name": instance_name,
"zone": zone,
"operation_type": "insert",
"operation_status": "DONE",
},
indent=2,
)
except Exception as e:
return json.dumps({"status": "error", "message": str(e)}, indent=2)
@mcp_instance.tool()
def list_instances(
project_id: str = None,
zone: str = None,
filter: Optional[str] = None,
) -> str:
"""
List Compute Engine instances in a zone
Args:
project_id: GCP project ID (defaults to configured project)
zone: GCP zone (defaults to configured location)
filter: Optional filter expression (e.g., "status=RUNNING")
"""
try:
# Get client from client_instances
client = client_instances.get_clients().compute
project_id = project_id or client_instances.get_project_id()
zone = zone or client_instances.get_location()
instance_list = client.list(project=project_id, zone=zone, filter=filter)
instances = []
for instance in instance_list:
instances.append(
{
"name": instance.name,
"status": instance.status,
"machine_type": instance.machine_type.split("/")[-1]
if instance.machine_type
else None,
"internal_ip": instance.network_interfaces[0].network_i_p
if instance.network_interfaces
and len(instance.network_interfaces) > 0
else None,
"creation_timestamp": instance.creation_timestamp,
}
)
return json.dumps(
{"status": "success", "instances": instances, "count": len(instances)},
indent=2,
)
except Exception as e:
return json.dumps({"status": "error", "message": str(e)}, indent=2)
@mcp_instance.tool()
def start_instance(
instance_name: str,
project_id: str = None,
zone: str = None,
) -> str:
"""
Start a stopped Compute Engine instance
Args:
instance_name: Name of the instance to start
project_id: GCP project ID (defaults to configured project)
zone: GCP zone (defaults to configured location)
"""
try:
# Get client from client_instances
client = client_instances.get_clients().compute
project_id = project_id or client_instances.get_project_id()
zone = zone or client_instances.get_location()
print(f"Starting instance {instance_name}...")
operation = client.start(
project=project_id, zone=zone, instance=instance_name
)
operation.result()
return json.dumps(
{
"status": "success",
"instance_name": instance_name,
"operation_status": "DONE",
},
indent=2,
)
except Exception as e:
return json.dumps({"status": "error", "message": str(e)}, indent=2)
@mcp_instance.tool()
def stop_instance(
instance_name: str,
project_id: str = None,
zone: str = None,
) -> str:
"""
Stop a running Compute Engine instance
Args:
instance_name: Name of the instance to stop
project_id: GCP project ID (defaults to configured project)
zone: GCP zone (defaults to configured location)
"""
try:
# Get client from client_instances
client = client_instances.get_clients().compute
project_id = project_id or client_instances.get_project_id()
zone = zone or client_instances.get_location()
print(f"Stopping instance {instance_name}...")
operation = client.stop(
project=project_id, zone=zone, instance=instance_name
)
operation.result()
return json.dumps(
{
"status": "success",
"instance_name": instance_name,
"operation_status": "DONE",
},
indent=2,
)
except Exception as e:
return json.dumps({"status": "error", "message": str(e)}, indent=2)
@mcp_instance.tool()
def delete_instance(
instance_name: str,
project_id: str = None,
zone: str = None,
) -> str:
"""
Delete a Compute Engine instance
Args:
instance_name: Name of the instance to delete
project_id: GCP project ID (defaults to configured project)
zone: GCP zone (defaults to configured location)
"""
try:
# Get client from client_instances
client = client_instances.get_clients().compute
project_id = project_id or client_instances.get_project_id()
zone = zone or client_instances.get_location()
print(f"Deleting instance {instance_name}...")
operation = client.delete(
project=project_id, zone=zone, instance=instance_name
)
operation.result()
return json.dumps(
{
"status": "success",
"instance_name": instance_name,
"operation_status": "DONE",
},
indent=2,
)
except Exception as e:
return json.dumps({"status": "error", "message": str(e)}, indent=2)
# Prompts
@mcp_instance.prompt()
def instance_config_prompt(workload_type: str = "web-server") -> str:
"""Prompt for creating Compute Engine configurations"""
return f"""
I need to configure a Compute Engine instance for {workload_type}. Please help with:
1. Recommended machine types for {workload_type}
2. Disk type and size recommendations
3. Network configuration best practices
4. Security considerations (service accounts, firewall rules)
5. Cost optimization strategies
"""
@mcp_instance.prompt()
def troubleshooting_prompt(issue: str = "instance not responding") -> str:
"""Prompt for troubleshooting Compute Engine issues"""
return f"""
I'm experiencing {issue} with my Compute Engine instance. Please guide me through:
1. Common causes for this issue
2. Diagnostic steps using Cloud Console and CLI
3. Log analysis techniques
4. Recovery procedures
5. Prevention strategies
"""
```
--------------------------------------------------------------------------------
/src/gcp-mcp-server/services/cloud_audit_logs.py:
--------------------------------------------------------------------------------
```python
import json
from google.cloud import logging_v2
from services import client_instances
def register(mcp_instance):
"""Register all Cloud Audit Logs resources and tools with the MCP instance."""
# Resources
@mcp_instance.resource("gcp://audit-logs/{project_id}")
def list_audit_logs_resource(project_id: str = None) -> str:
"""List recent audit logs from a specific project"""
try:
# Get client from client_instances
client = client_instances.get_clients().logging
project_id = project_id or client_instances.get_project_id()
# Filter for audit logs only
filter_str = 'logName:"cloudaudit.googleapis.com"'
# List log entries
entries = client.list_log_entries(
request={
"resource_names": [f"projects/{project_id}"],
"filter": filter_str,
"page_size": 10, # Limiting to 10 for responsiveness
}
)
result = []
for entry in entries:
log_data = {
"timestamp": entry.timestamp.isoformat()
if entry.timestamp
else None,
"severity": entry.severity.name if entry.severity else None,
"log_name": entry.log_name,
"resource": {
"type": entry.resource.type,
"labels": dict(entry.resource.labels)
if entry.resource.labels
else {},
}
if entry.resource
else {},
}
# Handle payload based on type
if hasattr(entry, "json_payload") and entry.json_payload:
log_data["payload"] = dict(entry.json_payload)
elif hasattr(entry, "proto_payload") and entry.proto_payload:
log_data["payload"] = "Proto payload (details omitted)"
elif hasattr(entry, "text_payload") and entry.text_payload:
log_data["payload"] = entry.text_payload
result.append(log_data)
return json.dumps(result, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp_instance.resource("gcp://audit-logs/{project_id}/sinks")
def list_log_sinks_resource(project_id: str = None) -> str:
"""List log sinks configured for a project"""
try:
# Get client from client_instances
client = client_instances.get_clients().logging
project_id = project_id or client_instances.get_project_id()
parent = f"projects/{project_id}"
sinks = client.list_sinks(parent=parent)
result = []
for sink in sinks:
result.append(
{
"name": sink.name,
"destination": sink.destination,
"filter": sink.filter,
"description": sink.description,
"disabled": sink.disabled,
}
)
return json.dumps(result, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
# Tools
@mcp_instance.tool()
def list_audit_logs(
project_id: str = None,
filter_str: str = 'logName:"cloudaudit.googleapis.com"',
page_size: int = 20,
) -> str:
"""
List Google Cloud Audit logs from a project
Args:
project_id: GCP project ID (defaults to configured project)
filter_str: Log filter expression (defaults to audit logs)
page_size: Maximum number of entries to return
"""
try:
# Get client from client_instances
client = client_instances.get_clients().logging
project_id = project_id or client_instances.get_project_id()
print(f"Retrieving audit logs from project {project_id}...")
# List log entries
entries = client.list_log_entries(
request={
"resource_names": [f"projects/{project_id}"],
"filter": filter_str,
"page_size": page_size,
}
)
result = []
for entry in entries:
log_data = {
"timestamp": entry.timestamp.isoformat()
if entry.timestamp
else None,
"severity": entry.severity.name if entry.severity else None,
"log_name": entry.log_name,
"resource": {
"type": entry.resource.type,
"labels": dict(entry.resource.labels)
if entry.resource.labels
else {},
}
if entry.resource
else {},
}
# Handle payload based on type
if hasattr(entry, "json_payload") and entry.json_payload:
log_data["payload"] = dict(entry.json_payload)
elif hasattr(entry, "proto_payload") and entry.proto_payload:
log_data["payload"] = "Proto payload (details omitted)"
elif hasattr(entry, "text_payload") and entry.text_payload:
log_data["payload"] = entry.text_payload
result.append(log_data)
return json.dumps(
{
"status": "success",
"entries": result,
"count": len(result),
},
indent=2,
)
except Exception as e:
return json.dumps({"status": "error", "message": str(e)}, indent=2)
@mcp_instance.tool()
def filter_admin_activity_logs(
project_id: str = None,
service_name: str = "",
resource_type: str = "",
time_range: str = "1h",
page_size: int = 20,
) -> str:
"""
Filter Admin Activity audit logs for specific services or resource types
Args:
project_id: GCP project ID (defaults to configured project)
service_name: Optional service name to filter by (e.g., compute.googleapis.com)
resource_type: Optional resource type to filter by (e.g., gce_instance)
time_range: Time range for logs (e.g., 1h, 24h, 7d)
page_size: Maximum number of entries to return
"""
try:
# Get client from client_instances
client = client_instances.get_clients().logging
project_id = project_id or client_instances.get_project_id()
# Build filter string
filter_parts = ['logName:"cloudaudit.googleapis.com%2Factivity"']
if service_name:
filter_parts.append(f'protoPayload.serviceName="{service_name}"')
if resource_type:
filter_parts.append(f'resource.type="{resource_type}"')
if time_range:
filter_parts.append(f'timestamp >= "-{time_range}"')
filter_str = " AND ".join(filter_parts)
print(f"Filtering Admin Activity logs with: {filter_str}")
# List log entries
entries = client.list_log_entries(
request={
"resource_names": [f"projects/{project_id}"],
"filter": filter_str,
"page_size": page_size,
}
)
result = []
for entry in entries:
# Extract relevant fields for admin activity logs
log_data = {
"timestamp": entry.timestamp.isoformat()
if entry.timestamp
else None,
"method_name": None,
"resource_name": None,
"service_name": None,
"user": None,
}
# Extract data from proto payload
if hasattr(entry, "proto_payload") and entry.proto_payload:
payload = entry.proto_payload
if hasattr(payload, "method_name"):
log_data["method_name"] = payload.method_name
if hasattr(payload, "resource_name"):
log_data["resource_name"] = payload.resource_name
if hasattr(payload, "service_name"):
log_data["service_name"] = payload.service_name
# Extract authentication info
if hasattr(payload, "authentication_info"):
auth_info = payload.authentication_info
if hasattr(auth_info, "principal_email"):
log_data["user"] = auth_info.principal_email
result.append(log_data)
return json.dumps(
{
"status": "success",
"entries": result,
"count": len(result),
"filter": filter_str,
},
indent=2,
)
except Exception as e:
return json.dumps({"status": "error", "message": str(e)}, indent=2)
@mcp_instance.tool()
def create_log_sink(
sink_name: str,
destination: str,
project_id: str = None,
filter_str: str = 'logName:"cloudaudit.googleapis.com"',
description: str = "",
) -> str:
"""
Create a log sink to export audit logs to a destination
Args:
sink_name: Name for the new log sink
destination: Destination for logs (e.g., storage.googleapis.com/my-bucket)
project_id: GCP project ID (defaults to configured project)
filter_str: Log filter expression (defaults to audit logs)
description: Optional description for the sink
"""
try:
# Get client from client_instances
client = client_instances.get_clients().logging
project_id = project_id or client_instances.get_project_id()
parent = f"projects/{project_id}"
# Create sink configuration
sink = logging_v2.LogSink(
name=sink_name,
destination=destination,
filter=filter_str,
description=description,
)
print(f"Creating log sink '{sink_name}' to export to {destination}...")
# Create the sink
response = client.create_sink(
request={
"parent": parent,
"sink": sink,
}
)
# Important: Recommend setting up IAM permissions
writer_identity = response.writer_identity
return json.dumps(
{
"status": "success",
"name": response.name,
"destination": response.destination,
"filter": response.filter,
"writer_identity": writer_identity,
"next_steps": f"Important: Grant {writer_identity} the appropriate permissions on the destination",
},
indent=2,
)
except Exception as e:
return json.dumps({"status": "error", "message": str(e)}, indent=2)
# Prompts
@mcp_instance.prompt()
def audit_log_investigation() -> str:
"""Prompt for investigating security incidents through audit logs"""
return """
I need to investigate a potential security incident in my Google Cloud project.
Please help me:
1. Determine what types of audit logs I should check (Admin Activity, Data Access, System Event)
2. Create an effective filter query to find relevant logs
3. Identify key fields to examine for signs of unusual activity
4. Suggest common indicators of potential security issues in audit logs
"""
@mcp_instance.prompt()
def log_export_setup() -> str:
"""Prompt for setting up log exports for compliance"""
return """
I need to set up log exports for compliance requirements.
Please help me:
1. Understand the different destinations available for log exports
2. Design an effective filter to capture all required audit events
3. Implement a log sink with appropriate permissions
4. Verify my setup is correctly capturing and exporting logs
"""
```
--------------------------------------------------------------------------------
/src/gcp-mcp-server/services/cloud_storage.py:
--------------------------------------------------------------------------------
```python
import json
import os
from typing import Dict, Optional
from services import client_instances
def register(mcp_instance):
"""Register all Cloud Storage resources and tools with the MCP instance."""
# Resources
@mcp_instance.resource("gcp://storage/{project_id}/buckets")
def list_buckets_resource(project_id: str = None) -> str:
"""List all Cloud Storage buckets in a project"""
try:
# Get client from client_instances
client = client_instances.get_clients().storage
project_id = project_id or client_instances.get_project_id()
buckets = client.list_buckets()
result = []
for bucket in buckets:
result.append(
{
"name": bucket.name,
"location": bucket.location,
"storage_class": bucket.storage_class,
"time_created": bucket.time_created.isoformat()
if bucket.time_created
else None,
"versioning_enabled": bucket.versioning_enabled,
"labels": dict(bucket.labels) if bucket.labels else {},
}
)
return json.dumps(result, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp_instance.resource("gcp://storage/{project_id}/bucket/{bucket_name}")
def get_bucket_resource(project_id: str = None, bucket_name: str = None) -> str:
"""Get details for a specific Cloud Storage bucket"""
try:
# Get client from client_instances
client = client_instances.get_clients().storage
project_id = project_id or client_instances.get_project_id()
bucket = client.get_bucket(bucket_name)
result = {
"name": bucket.name,
"location": bucket.location,
"storage_class": bucket.storage_class,
"time_created": bucket.time_created.isoformat()
if bucket.time_created
else None,
"versioning_enabled": bucket.versioning_enabled,
"requester_pays": bucket.requester_pays,
"lifecycle_rules": bucket.lifecycle_rules,
"cors": bucket.cors,
"labels": dict(bucket.labels) if bucket.labels else {},
}
return json.dumps(result, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp_instance.resource("gcp://storage/{project_id}/bucket/{bucket_name}/objects")
def list_objects_resource(project_id: str = None, bucket_name: str = None) -> str:
"""List objects in a specific Cloud Storage bucket"""
prefix = "" # Move it inside the function with a default value
try:
# Get client from client_instances
client = client_instances.get_clients().storage
project_id = project_id or client_instances.get_project_id()
bucket = client.get_bucket(bucket_name)
blobs = bucket.list_blobs(prefix=prefix)
result = []
for blob in blobs:
result.append(
{
"name": blob.name,
"size": blob.size,
"updated": blob.updated.isoformat() if blob.updated else None,
"content_type": blob.content_type,
"md5_hash": blob.md5_hash,
"generation": blob.generation,
"metadata": blob.metadata,
}
)
return json.dumps(result, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
# Tools
@mcp_instance.tool()
def create_bucket(
bucket_name: str,
project_id: str = None,
location: str = "us-central1",
storage_class: str = "STANDARD",
labels: Optional[Dict[str, str]] = None,
versioning_enabled: bool = False,
) -> str:
"""
Create a Cloud Storage bucket
Args:
bucket_name: Name for the new bucket
project_id: GCP project ID (defaults to configured project)
location: GCP region (e.g., us-central1)
storage_class: Storage class (STANDARD, NEARLINE, COLDLINE, ARCHIVE)
labels: Optional key-value pairs for bucket labels
versioning_enabled: Whether to enable object versioning
"""
try:
# Get client from client_instances
client = client_instances.get_clients().storage
project_id = project_id or client_instances.get_project_id()
# Validate storage class
valid_storage_classes = ["STANDARD", "NEARLINE", "COLDLINE", "ARCHIVE"]
if storage_class not in valid_storage_classes:
return json.dumps(
{
"status": "error",
"message": f"Invalid storage class: {storage_class}. Valid classes are: {', '.join(valid_storage_classes)}",
},
indent=2,
)
# Log info (similar to ctx.info)
print(f"Creating bucket {bucket_name} in {location}...")
bucket = client.bucket(bucket_name)
bucket.create(location=location, storage_class=storage_class, labels=labels)
# Set versioning if enabled
if versioning_enabled:
bucket.versioning_enabled = True
bucket.patch()
return json.dumps(
{
"status": "success",
"name": bucket.name,
"location": bucket.location,
"storage_class": bucket.storage_class,
"time_created": bucket.time_created.isoformat()
if bucket.time_created
else None,
"versioning_enabled": bucket.versioning_enabled,
"url": f"gs://{bucket_name}/",
},
indent=2,
)
except Exception as e:
return json.dumps({"status": "error", "message": str(e)}, indent=2)
@mcp_instance.tool()
def list_buckets(project_id: str = None, prefix: str = "") -> str:
"""
List Cloud Storage buckets in a project
Args:
project_id: GCP project ID (defaults to configured project)
prefix: Optional prefix to filter bucket names
"""
try:
# Get client from client_instances
client = client_instances.get_clients().storage
project_id = project_id or client_instances.get_project_id()
# Log info (similar to ctx.info)
print(f"Listing buckets in project {project_id}...")
# List buckets with optional prefix filter
if prefix:
buckets = [
b for b in client.list_buckets() if b.name.startswith(prefix)
]
else:
buckets = list(client.list_buckets())
result = []
for bucket in buckets:
result.append(
{
"name": bucket.name,
"location": bucket.location,
"storage_class": bucket.storage_class,
"time_created": bucket.time_created.isoformat()
if bucket.time_created
else None,
}
)
return json.dumps(
{"status": "success", "buckets": result, "count": len(result)}, indent=2
)
except Exception as e:
return json.dumps({"status": "error", "message": str(e)}, indent=2)
@mcp_instance.tool()
def upload_object(
bucket_name: str,
source_file_path: str,
destination_blob_name: Optional[str] = None,
project_id: str = None,
content_type: Optional[str] = None,
metadata: Optional[Dict[str, str]] = None,
) -> str:
"""
Upload an object to a Cloud Storage bucket
Args:
bucket_name: Name of the bucket
source_file_path: Local path to the file to upload
destination_blob_name: Name to assign to the blob (defaults to file basename)
project_id: GCP project ID (defaults to configured project)
content_type: Content type of the object (optional)
metadata: Custom metadata dictionary (optional)
"""
try:
# Get client from client_instances
client = client_instances.get_clients().storage
project_id = project_id or client_instances.get_project_id()
# Check if file exists
if not os.path.exists(source_file_path):
return json.dumps(
{
"status": "error",
"message": f"File not found: {source_file_path}",
},
indent=2,
)
# Get bucket
bucket = client.bucket(bucket_name)
# Use filename if destination_blob_name not provided
if not destination_blob_name:
destination_blob_name = os.path.basename(source_file_path)
# Create blob
blob = bucket.blob(destination_blob_name)
# Set content type if provided
if content_type:
blob.content_type = content_type
# Set metadata if provided
if metadata:
blob.metadata = metadata
# Get file size for progress reporting
file_size = os.path.getsize(source_file_path)
print(
f"Uploading {source_file_path} ({file_size} bytes) to gs://{bucket_name}/{destination_blob_name}..."
)
# Upload file
blob.upload_from_filename(source_file_path)
return json.dumps(
{
"status": "success",
"bucket": bucket_name,
"name": blob.name,
"size": blob.size,
"content_type": blob.content_type,
"md5_hash": blob.md5_hash,
"generation": blob.generation,
"public_url": blob.public_url,
"gsutil_uri": f"gs://{bucket_name}/{destination_blob_name}",
},
indent=2,
)
except Exception as e:
return json.dumps({"status": "error", "message": str(e)}, indent=2)
@mcp_instance.tool()
def download_object(
bucket_name: str,
source_blob_name: str,
destination_file_path: str,
project_id: str = None,
) -> str:
"""
Download an object from a Cloud Storage bucket
Args:
bucket_name: Name of the bucket
source_blob_name: Name of the blob to download
destination_file_path: Local path where the file should be saved
project_id: GCP project ID (defaults to configured project)
"""
try:
# Get client from client_instances
client = client_instances.get_clients().storage
project_id = project_id or client_instances.get_project_id()
# Get bucket and blob
bucket = client.bucket(bucket_name)
blob = bucket.blob(source_blob_name)
# Check if blob exists
if not blob.exists():
return json.dumps(
{
"status": "error",
"message": f"Object not found: gs://{bucket_name}/{source_blob_name}",
},
indent=2,
)
# Create directory if doesn't exist
os.makedirs(
os.path.dirname(os.path.abspath(destination_file_path)), exist_ok=True
)
print(
f"Downloading gs://{bucket_name}/{source_blob_name} to {destination_file_path}..."
)
# Download file
blob.download_to_filename(destination_file_path)
return json.dumps(
{
"status": "success",
"bucket": bucket_name,
"blob_name": source_blob_name,
"size": blob.size,
"content_type": blob.content_type,
"downloaded_to": destination_file_path,
},
indent=2,
)
except Exception as e:
return json.dumps({"status": "error", "message": str(e)}, indent=2)
@mcp_instance.tool()
def delete_object(bucket_name: str, blob_name: str, project_id: str = None) -> str:
"""
Delete an object from a Cloud Storage bucket
Args:
bucket_name: Name of the bucket
blob_name: Name of the blob to delete
project_id: GCP project ID (defaults to configured project)
"""
try:
# Get client from client_instances
client = client_instances.get_clients().storage
project_id = project_id or client_instances.get_project_id()
# Get bucket and blob
bucket = client.bucket(bucket_name)
blob = bucket.blob(blob_name)
print(f"Deleting gs://{bucket_name}/{blob_name}...")
# Delete the blob
blob.delete()
return json.dumps(
{
"status": "success",
"message": f"Successfully deleted gs://{bucket_name}/{blob_name}",
},
indent=2,
)
except Exception as e:
return json.dumps({"status": "error", "message": str(e)}, indent=2)
# Prompts
@mcp_instance.prompt()
def create_bucket_prompt(location: str = "us-central1") -> str:
"""Prompt for creating a new Cloud Storage bucket"""
return f"""
I need to create a new Cloud Storage bucket in {location}.
Please help me with:
1. Understanding storage classes (STANDARD, NEARLINE, COLDLINE, ARCHIVE)
2. Best practices for bucket naming
3. When to enable versioning
4. Recommendations for bucket security settings
5. Steps to create the bucket
"""
@mcp_instance.prompt()
def upload_file_prompt() -> str:
"""Prompt for help with uploading files to Cloud Storage"""
return """
I need to upload files to a Cloud Storage bucket.
Please help me understand:
1. The best way to organize files in Cloud Storage
2. When to use folders/prefixes
3. How to set appropriate permissions on uploaded files
4. How to make files publicly accessible (if needed)
5. The steps to perform the upload
"""
```
--------------------------------------------------------------------------------
/src/gcp-mcp-server/services/artifact_registry.py:
--------------------------------------------------------------------------------
```python
import json
from services import client_instances
def register(mcp_instance):
"""Register all resources and tools with the MCP instance."""
@mcp_instance.resource(
"gcp://artifactregistry/{project_id}/{location}/repositories"
)
def list_repositories_resource(project_id: str = None, location: str = None) -> str:
"""List all Artifact Registry repositories in a specific location"""
try:
# define the client
client = client_instances.get_clients().artifactregistry
project_id = project_id or client_instances.get_project_id()
location = location or client_instances.get_location()
# Use parameters directly from URL path
parent = f"projects/{project_id}/locations/{location}"
repositories = client.list_repositories(parent=parent)
result = []
for repo in repositories:
result.append(
{
"name": repo.name.split("/")[-1],
"format": repo.format.name
if hasattr(repo.format, "name")
else str(repo.format),
"description": repo.description,
"create_time": repo.create_time.isoformat()
if repo.create_time
else None,
"update_time": repo.update_time.isoformat()
if repo.update_time
else None,
"kms_key_name": repo.kms_key_name,
"labels": dict(repo.labels) if repo.labels else {},
}
)
return json.dumps(result, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
# Add a tool for creating repositories
@mcp_instance.tool()
def create_artifact_repository(
name: str, format: str, description: str = ""
) -> str:
"""Create a new Artifact Registry repository"""
try:
# define the client
client = client_instances.get_clients().artifactregistry
project_id = client_instances.get_project_id()
location = client_instances.get_location()
parent = f"projects/{project_id}/locations/{location}"
# Create repository request
from google.cloud.artifactregistry_v1 import (
CreateRepositoryRequest,
Repository,
)
# Create the repository format enum value
if format.upper() not in ["DOCKER", "MAVEN", "NPM", "PYTHON", "APT", "YUM"]:
return json.dumps(
{
"error": f"Invalid format: {format}. Must be one of: DOCKER, MAVEN, NPM, PYTHON, APT, YUM"
},
indent=2,
)
repo = Repository(
name=f"{parent}/repositories/{name}",
format=getattr(Repository.Format, format.upper()),
description=description,
)
request = CreateRepositoryRequest(
parent=parent, repository_id=name, repository=repo
)
operation = client.create_repository(request=request)
result = operation.result() # Wait for operation to complete
return json.dumps(
{
"name": result.name.split("/")[-1],
"format": result.format.name
if hasattr(result.format, "name")
else str(result.format),
"description": result.description,
"create_time": result.create_time.isoformat()
if result.create_time
else None,
"update_time": result.update_time.isoformat()
if result.update_time
else None,
},
indent=2,
)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
# @mcp_instance.tool()
# def get_artifact_repository(
# project_id: str = None,
# location: str = None,
# repository_id: str = None,
# ) -> str:
# """Get details about a specific Artifact Registry repository"""
# try:
# # Get the client from the context
# clients = ctx.lifespan_context["clients"]
# client = clients.artifactregistry
# # Use context values if parameters not provided
# project_id = project_id or ctx.lifespan_context["project_id"]
# location = location or ctx.lifespan_context["location"]
# if not repository_id:
# return json.dumps({"error": "Repository ID is required"}, indent=2)
# name = f"projects/{project_id}/locations/{location}/repositories/{repository_id}"
# repo = client.get_repository(name=name)
# result = {
# "name": repo.name.split("/")[-1],
# "format": repo.format.name
# if hasattr(repo.format, "name")
# else str(repo.format),
# "description": repo.description,
# "create_time": repo.create_time.isoformat()
# if repo.create_time
# else None,
# "update_time": repo.update_time.isoformat()
# if repo.update_time
# else None,
# "kms_key_name": repo.kms_key_name,
# "labels": dict(repo.labels) if repo.labels else {},
# }
# return json.dumps(result, indent=2)
# except Exception as e:
# return json.dumps({"error": str(e)}, indent=2)
# @mcp_instance.tool()
# def upload_artifact(
# repo_name: str,
# artifact_path: str,
# package: str = "",
# version: str = "",
# project_id: str = None,
# location: str = None,
# ) -> str:
# """
# Upload an artifact to Artifact Registry
# Args:
# project_id: GCP project ID
# location: GCP region (e.g., us-central1)
# repo_name: Name of the repository
# artifact_path: Local path to the artifact file
# package: Package name (optional)
# version: Version string (optional)
# """
# import os
# try:
# # Get the client from the context
# clients = ctx.lifespan_context["clients"]
# client = clients.artifactregistry
# # Use context values if parameters not provided
# project_id = project_id or ctx.lifespan_context["project_id"]
# location = location or ctx.lifespan_context["location"]
# if not os.path.exists(artifact_path):
# return json.dumps(
# {"status": "error", "message": f"File not found: {artifact_path}"}
# )
# filename = os.path.basename(artifact_path)
# file_size = os.path.getsize(artifact_path)
# parent = (
# f"projects/{project_id}/locations/{location}/repositories/{repo_name}"
# )
# with open(artifact_path, "rb") as f:
# file_contents = f.read()
# # Use the standard client for upload
# result = client.upload_artifact(
# parent=parent, contents=file_contents, artifact_path=filename
# )
# return json.dumps(
# {
# "status": "success",
# "uri": result.uri if hasattr(result, "uri") else None,
# "message": f"Successfully uploaded {filename}",
# },
# indent=2,
# )
# except Exception as e:
# return json.dumps({"status": "error", "message": str(e)}, indent=2)
# # Prompts
# @mcp_instance.prompt()
# def create_repository_prompt(location: str = "us-central1") -> str:
# """Prompt for creating a new Artifact Registry repository"""
# return f"""
# I need to create a new Artifact Registry repository in {location}.
# Please help me with:
# 1. What formats are available (Docker, NPM, Maven, etc.)
# 2. Best practices for naming repositories
# 3. Recommendations for labels I should apply
# 4. Steps to create the repository
# """
# @mcp_instance.prompt()
# def upload_artifact_prompt(repo_format: str = "DOCKER") -> str:
# """Prompt for help with uploading artifacts"""
# return f"""
# I need to upload a {repo_format.lower()} artifact to my Artifact Registry repository.
# Please help me understand:
# 1. The recommended way to upload {repo_format.lower()} artifacts
# 2. Any naming conventions I should follow
# 3. How to ensure proper versioning
# 4. The steps to perform the upload
# """
# import json
# import os
# from typing import Optional
# from google.cloud import artifactregistry_v1
# from mcp.server.fastmcp import Context
# # Instead of importing from core.server
# # Resources
# @mcp.resource("gcp://artifactregistry/{project_id}/{location}/repositories")
# def list_repositories_resource(
# project_id: str = None, location: str = None, ctx: Context = None
# ) -> str:
# """List all Artifact Registry repositories in a specific location"""
# client = ctx.clients.artifactregistry
# project_id = project_id or ctx.clients.project_id
# location = location or ctx.clients.location
# parent = f"projects/{project_id}/locations/{location}"
# repositories = client.list_repositories(parent=parent)
# result = []
# for repo in repositories:
# result.append(
# {
# "name": repo.name.split("/")[-1],
# "format": repo.format.name,
# "description": repo.description,
# "create_time": repo.create_time.isoformat()
# if repo.create_time
# else None,
# "update_time": repo.update_time.isoformat()
# if repo.update_time
# else None,
# "kms_key_name": repo.kms_key_name,
# "labels": dict(repo.labels) if repo.labels else {},
# }
# )
# return json.dumps(result, indent=2)
# @mcp.resource("gcp://artifactregistry/{project_id}/{location}/repository/{repo_name}")
# def get_repository_resource(
# repo_name: str, project_id: str = None, location: str = None, ctx: Context = None
# ) -> str:
# """Get details for a specific Artifact Registry repository"""
# client = ctx.clients.artifactregistry
# project_id = project_id or ctx.clients.project_id
# location = location or ctx.clients.location
# name = f"projects/{project_id}/locations/{location}/repositories/{repo_name}"
# try:
# repo = client.get_repository(name=name)
# result = {
# "name": repo.name.split("/")[-1],
# "format": repo.format.name,
# "description": repo.description,
# "create_time": repo.create_time.isoformat() if repo.create_time else None,
# "update_time": repo.update_time.isoformat() if repo.update_time else None,
# "kms_key_name": repo.kms_key_name,
# "labels": dict(repo.labels) if repo.labels else {},
# }
# return json.dumps(result, indent=2)
# except Exception as e:
# return json.dumps({"error": str(e)})
# @mcp.resource(
# "gcp://artifactregistry/{project_id}/{location}/repository/{repo_name}/packages"
# )
# def list_packages_resource(
# repo_name: str, project_id: str = None, location: str = None, ctx: Context = None
# ) -> str:
# """List packages in a specific Artifact Registry repository"""
# client = ctx.clients.artifactregistry
# project_id = project_id or ctx.clients.project_id
# location = location or ctx.clients.location
# parent = f"projects/{project_id}/locations/{location}/repositories/{repo_name}"
# packages = client.list_packages(parent=parent)
# result = []
# for package in packages:
# result.append(
# {
# "name": package.name.split("/")[-1],
# "display_name": package.display_name,
# "create_time": package.create_time.isoformat()
# if package.create_time
# else None,
# "update_time": package.update_time.isoformat()
# if package.update_time
# else None,
# }
# )
# return json.dumps(result, indent=2)
# # Tools
# @mcp.tool()
# async def create_repository(
# repo_name: str,
# format: str = "DOCKER",
# description: str = "",
# labels: Optional[dict] = None,
# project_id: str = None,
# location: str = None,
# ctx: Context = None,
# ) -> str:
# """
# Create an Artifact Registry repository
# Args:
# project_id: GCP project ID
# location: GCP region (e.g., us-central1)
# repo_name: Name for the new repository
# format: Repository format (DOCKER, NPM, PYTHON, MAVEN, APT, YUM, GO)
# description: Optional description for the repository
# labels: Optional key-value pairs for repository labels
# """
# client = ctx.clients.artifactregistry
# project_id = project_id or ctx.clients.project_id
# location = location or ctx.clients.location
# # Validate format
# try:
# format_enum = artifactregistry_v1.Repository.Format[format]
# except KeyError:
# return json.dumps(
# {
# "status": "error",
# "message": f"Invalid format: {format}. Valid formats are: {', '.join(artifactregistry_v1.Repository.Format._member_names_)}",
# }
# )
# # Create repository
# parent = f"projects/{project_id}/locations/{location}"
# repository = artifactregistry_v1.Repository(
# format=format_enum,
# description=description,
# )
# if labels:
# repository.labels = labels
# request = artifactregistry_v1.CreateRepositoryRequest(
# parent=parent, repository_id=repo_name, repository=repository
# )
# try:
# ctx.info(f"Creating repository {repo_name} in {location}...")
# response = client.create_repository(request=request)
# return json.dumps(
# {
# "status": "success",
# "name": response.name,
# "format": response.format.name,
# "description": response.description,
# "create_time": response.create_time.isoformat()
# if response.create_time
# else None,
# },
# indent=2,
# )
# except Exception as e:
# return json.dumps({"status": "error", "message": str(e)})
# @mcp.tool()
# async def list_repositories(
# project_id: str = None, location: str = None, ctx: Context = None
# ) -> str:
# """
# List Artifact Registry repositories in a specific location
# Args:
# project_id: GCP project ID
# location: GCP region (e.g., us-central1)
# """
# client = ctx.clients.artifactregistry
# project_id = project_id or ctx.clients.project_id
# location = location or ctx.clients.location
# parent = f"projects/{project_id}/locations/{location}"
# try:
# ctx.info(f"Listing repositories in {location}...")
# repositories = client.list_repositories(parent=parent)
# result = []
# for repo in repositories:
# result.append(
# {
# "name": repo.name.split("/")[-1],
# "format": repo.format.name,
# "description": repo.description,
# "create_time": repo.create_time.isoformat()
# if repo.create_time
# else None,
# }
# )
# return json.dumps(
# {"status": "success", "repositories": result, "count": len(result)},
# indent=2,
# )
# except Exception as e:
# return json.dumps({"status": "error", "message": str(e)})
```
--------------------------------------------------------------------------------
/src/gcp-mcp-server/services/cloud_bigquery.py:
--------------------------------------------------------------------------------
```python
import json
from typing import Any, Dict, List, Optional
from google.cloud import bigquery
from google.cloud.exceptions import NotFound
from services import client_instances
def register(mcp_instance):
"""Register all BigQuery resources and tools with the MCP instance."""
# Resources
@mcp_instance.resource("gcp://bigquery/{project_id}/datasets")
def list_datasets_resource(project_id: str = None) -> str:
"""List all BigQuery datasets in a project"""
try:
# Get client from client_instances
client = client_instances.get_clients().bigquery
project_id = project_id or client_instances.get_project_id()
datasets = list(client.list_datasets())
result = []
for dataset in datasets:
result.append(
{
"id": dataset.dataset_id,
"full_id": dataset.full_dataset_id,
"friendly_name": dataset.friendly_name,
"location": dataset.location,
"labels": dict(dataset.labels) if dataset.labels else {},
"created": dataset.created.isoformat()
if dataset.created
else None,
}
)
return json.dumps(result, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp_instance.resource("gcp://bigquery/{project_id}/dataset/{dataset_id}")
def get_dataset_resource(project_id: str = None, dataset_id: str = None) -> str:
"""Get details for a specific BigQuery dataset"""
try:
# Get client from client_instances
client = client_instances.get_clients().bigquery
project_id = project_id or client_instances.get_project_id()
dataset_ref = client.dataset(dataset_id)
dataset = client.get_dataset(dataset_ref)
result = {
"id": dataset.dataset_id,
"full_id": dataset.full_dataset_id,
"friendly_name": dataset.friendly_name,
"description": dataset.description,
"location": dataset.location,
"labels": dict(dataset.labels) if dataset.labels else {},
"created": dataset.created.isoformat() if dataset.created else None,
"modified": dataset.modified.isoformat() if dataset.modified else None,
"default_table_expiration_ms": dataset.default_table_expiration_ms,
"default_partition_expiration_ms": dataset.default_partition_expiration_ms,
}
return json.dumps(result, indent=2)
except NotFound:
return json.dumps({"error": f"Dataset {dataset_id} not found"}, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp_instance.resource("gcp://bigquery/{project_id}/dataset/{dataset_id}/tables")
def list_tables_resource(project_id: str = None, dataset_id: str = None) -> str:
"""List all tables in a BigQuery dataset"""
try:
# Get client from client_instances
client = client_instances.get_clients().bigquery
project_id = project_id or client_instances.get_project_id()
tables = list(client.list_tables(dataset_id))
result = []
for table in tables:
result.append(
{
"id": table.table_id,
"full_id": f"{table.project}.{table.dataset_id}.{table.table_id}",
"table_type": table.table_type,
}
)
return json.dumps(result, indent=2)
except NotFound:
return json.dumps({"error": f"Dataset {dataset_id} not found"}, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp_instance.resource(
"gcp://bigquery/{project_id}/dataset/{dataset_id}/table/{table_id}"
)
def get_table_resource(
project_id: str = None, dataset_id: str = None, table_id: str = None
) -> str:
"""Get details for a specific BigQuery table"""
try:
# Get client from client_instances
client = client_instances.get_clients().bigquery
project_id = project_id or client_instances.get_project_id()
table_ref = client.dataset(dataset_id).table(table_id)
table = client.get_table(table_ref)
# Extract schema information
schema_fields = []
for field in table.schema:
schema_fields.append(
{
"name": field.name,
"type": field.field_type,
"mode": field.mode,
"description": field.description,
}
)
result = {
"id": table.table_id,
"full_id": f"{table.project}.{table.dataset_id}.{table.table_id}",
"friendly_name": table.friendly_name,
"description": table.description,
"num_rows": table.num_rows,
"num_bytes": table.num_bytes,
"table_type": table.table_type,
"created": table.created.isoformat() if table.created else None,
"modified": table.modified.isoformat() if table.modified else None,
"expires": table.expires.isoformat() if table.expires else None,
"schema": schema_fields,
"labels": dict(table.labels) if table.labels else {},
}
return json.dumps(result, indent=2)
except NotFound:
return json.dumps(
{"error": f"Table {dataset_id}.{table_id} not found"}, indent=2
)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
# Tools
@mcp_instance.tool()
def run_query(
query: str,
project_id: str = None,
location: str = None,
max_results: int = 100,
use_legacy_sql: bool = False,
timeout_ms: int = 30000,
) -> str:
"""
Run a BigQuery query and return the results
Args:
query: SQL query to execute
project_id: GCP project ID (defaults to configured project)
location: Optional BigQuery location (us, eu, etc.)
max_results: Maximum number of rows to return
use_legacy_sql: Whether to use legacy SQL syntax
timeout_ms: Query timeout in milliseconds
"""
try:
# Get client from client_instances
client = client_instances.get_clients().bigquery
project_id = project_id or client_instances.get_project_id()
location = location or client_instances.get_location()
job_config = bigquery.QueryJobConfig(
use_legacy_sql=use_legacy_sql,
)
# Log info (similar to ctx.info)
print(f"Running query: {query[:100]}...")
query_job = client.query(
query,
job_config=job_config,
location=location,
timeout=timeout_ms / 1000.0,
)
# Wait for the query to complete
results = query_job.result(max_results=max_results)
# Get the schema
schema = [field.name for field in results.schema]
# Convert rows to a list of dictionaries
rows = []
for row in results:
row_dict = {}
for key in schema:
value = row[key]
if hasattr(value, "isoformat"): # Handle datetime objects
row_dict[key] = value.isoformat()
else:
row_dict[key] = value
rows.append(row_dict)
# Create summary statistics
stats = {
"total_rows": query_job.total_rows,
"total_bytes_processed": query_job.total_bytes_processed,
"total_bytes_billed": query_job.total_bytes_billed,
"billing_tier": query_job.billing_tier,
"created": query_job.created.isoformat() if query_job.created else None,
"started": query_job.started.isoformat() if query_job.started else None,
"ended": query_job.ended.isoformat() if query_job.ended else None,
"duration_ms": (query_job.ended - query_job.started).total_seconds()
* 1000
if query_job.started and query_job.ended
else None,
}
return json.dumps(
{
"status": "success",
"schema": schema,
"rows": rows,
"returned_rows": len(rows),
"stats": stats,
},
indent=2,
)
except Exception as e:
return json.dumps({"status": "error", "message": str(e)}, indent=2)
@mcp_instance.tool()
def create_dataset(
dataset_id: str,
project_id: str = None,
location: str = None,
description: str = "",
friendly_name: str = None,
labels: Optional[Dict[str, str]] = None,
default_table_expiration_ms: Optional[int] = None,
) -> str:
"""
Create a new BigQuery dataset
Args:
dataset_id: ID for the new dataset
project_id: GCP project ID (defaults to configured project)
location: Dataset location (US, EU, asia-northeast1, etc.)
description: Optional dataset description
friendly_name: Optional user-friendly name
labels: Optional key-value pairs for dataset labels
default_table_expiration_ms: Default expiration time for tables in milliseconds
"""
try:
# Get client from client_instances
client = client_instances.get_clients().bigquery
project_id = project_id or client_instances.get_project_id()
location = location or client_instances.get_location()
dataset = bigquery.Dataset(f"{project_id}.{dataset_id}")
dataset.location = location
if description:
dataset.description = description
if friendly_name:
dataset.friendly_name = friendly_name
if labels:
dataset.labels = labels
if default_table_expiration_ms:
dataset.default_table_expiration_ms = default_table_expiration_ms
# Log info (similar to ctx.info)
print(f"Creating dataset {dataset_id} in {location}...")
dataset = client.create_dataset(dataset)
return json.dumps(
{
"status": "success",
"dataset_id": dataset.dataset_id,
"full_dataset_id": dataset.full_dataset_id,
"location": dataset.location,
"created": dataset.created.isoformat() if dataset.created else None,
},
indent=2,
)
except Exception as e:
return json.dumps({"status": "error", "message": str(e)}, indent=2)
@mcp_instance.tool()
def create_table(
dataset_id: str,
table_id: str,
schema_fields: List[Dict[str, Any]],
project_id: str = None,
description: str = "",
friendly_name: str = None,
expiration_ms: Optional[int] = None,
labels: Optional[Dict[str, str]] = None,
clustering_fields: Optional[List[str]] = None,
time_partitioning_field: Optional[str] = None,
time_partitioning_type: str = "DAY",
) -> str:
"""
Create a new BigQuery table
Args:
dataset_id: Dataset ID where the table will be created
table_id: ID for the new table
schema_fields: List of field definitions, each with name, type, mode, and description
project_id: GCP project ID (defaults to configured project)
description: Optional table description
friendly_name: Optional user-friendly name
expiration_ms: Optional table expiration time in milliseconds
labels: Optional key-value pairs for table labels
clustering_fields: Optional list of fields to cluster by
time_partitioning_field: Optional field to use for time-based partitioning
time_partitioning_type: Partitioning type (DAY, HOUR, MONTH, YEAR)
Example schema_fields:
[
{"name": "name", "type": "STRING", "mode": "REQUIRED", "description": "Name field"},
{"name": "age", "type": "INTEGER", "mode": "NULLABLE", "description": "Age field"}
]
"""
try:
# Get client from client_instances
client = client_instances.get_clients().bigquery
project_id = project_id or client_instances.get_project_id()
# Convert schema_fields to SchemaField objects
schema = []
for field in schema_fields:
schema.append(
bigquery.SchemaField(
name=field["name"],
field_type=field["type"],
mode=field.get("mode", "NULLABLE"),
description=field.get("description", ""),
)
)
# Create table reference
table_ref = client.dataset(dataset_id).table(table_id)
table = bigquery.Table(table_ref, schema=schema)
# Set table properties
if description:
table.description = description
if friendly_name:
table.friendly_name = friendly_name
if expiration_ms:
table.expires = expiration_ms
if labels:
table.labels = labels
# Set clustering if specified
if clustering_fields:
table.clustering_fields = clustering_fields
# Set time partitioning if specified
if time_partitioning_field:
if time_partitioning_type == "DAY":
table.time_partitioning = bigquery.TimePartitioning(
type_=bigquery.TimePartitioningType.DAY,
field=time_partitioning_field,
)
elif time_partitioning_type == "HOUR":
table.time_partitioning = bigquery.TimePartitioning(
type_=bigquery.TimePartitioningType.HOUR,
field=time_partitioning_field,
)
elif time_partitioning_type == "MONTH":
table.time_partitioning = bigquery.TimePartitioning(
type_=bigquery.TimePartitioningType.MONTH,
field=time_partitioning_field,
)
elif time_partitioning_type == "YEAR":
table.time_partitioning = bigquery.TimePartitioning(
type_=bigquery.TimePartitioningType.YEAR,
field=time_partitioning_field,
)
# Log info (similar to ctx.info)
print(f"Creating table {dataset_id}.{table_id}...")
table = client.create_table(table)
return json.dumps(
{
"status": "success",
"table_id": table.table_id,
"full_table_id": f"{table.project}.{table.dataset_id}.{table.table_id}",
"created": table.created.isoformat() if table.created else None,
},
indent=2,
)
except Exception as e:
return json.dumps({"status": "error", "message": str(e)}, indent=2)
@mcp_instance.tool()
def delete_table(dataset_id: str, table_id: str, project_id: str = None) -> str:
"""
Delete a BigQuery table
Args:
dataset_id: Dataset ID containing the table
table_id: ID of the table to delete
project_id: GCP project ID (defaults to configured project)
"""
try:
# Get client from client_instances
client = client_instances.get_clients().bigquery
project_id = project_id or client_instances.get_project_id()
table_ref = client.dataset(dataset_id).table(table_id)
# Log info (similar to ctx.info)
print(f"Deleting table {dataset_id}.{table_id}...")
client.delete_table(table_ref)
return json.dumps(
{
"status": "success",
"message": f"Table {project_id}.{dataset_id}.{table_id} successfully deleted",
},
indent=2,
)
except NotFound:
return json.dumps(
{
"status": "error",
"message": f"Table {project_id}.{dataset_id}.{table_id} not found",
},
indent=2,
)
except Exception as e:
return json.dumps({"status": "error", "message": str(e)}, indent=2)
@mcp_instance.tool()
def load_table_from_json(
dataset_id: str,
table_id: str,
json_data: List[Dict[str, Any]],
project_id: str = None,
schema_fields: Optional[List[Dict[str, Any]]] = None,
write_disposition: str = "WRITE_APPEND",
) -> str:
"""
Load data into a BigQuery table from JSON data
Args:
dataset_id: Dataset ID containing the table
table_id: ID of the table to load data into
json_data: List of dictionaries representing rows to insert
project_id: GCP project ID (defaults to configured project)
schema_fields: Optional schema definition (if not using existing table schema)
write_disposition: How to handle existing data (WRITE_TRUNCATE, WRITE_APPEND, WRITE_EMPTY)
"""
try:
# Get client from client_instances
client = client_instances.get_clients().bigquery
project_id = project_id or client_instances.get_project_id()
table_ref = client.dataset(dataset_id).table(table_id)
# Convert write_disposition to the appropriate enum
if write_disposition == "WRITE_TRUNCATE":
disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
elif write_disposition == "WRITE_APPEND":
disposition = bigquery.WriteDisposition.WRITE_APPEND
elif write_disposition == "WRITE_EMPTY":
disposition = bigquery.WriteDisposition.WRITE_EMPTY
else:
return json.dumps(
{
"status": "error",
"message": f"Invalid write_disposition: {write_disposition}. Use WRITE_TRUNCATE, WRITE_APPEND, or WRITE_EMPTY.",
},
indent=2,
)
# Create job config
job_config = bigquery.LoadJobConfig(
write_disposition=disposition,
source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
)
# Set schema if provided
if schema_fields:
schema = []
for field in schema_fields:
schema.append(
bigquery.SchemaField(
name=field["name"],
field_type=field["type"],
mode=field.get("mode", "NULLABLE"),
description=field.get("description", ""),
)
)
job_config.schema = schema
# Convert JSON data to newline-delimited JSON (not needed but keeping the log)
print(f"Loading {len(json_data)} rows into {dataset_id}.{table_id}...")
# Create and run the load job
load_job = client.load_table_from_json(
json_data, table_ref, job_config=job_config
)
# Wait for the job to complete
load_job.result()
# Get updated table info
table = client.get_table(table_ref)
return json.dumps(
{
"status": "success",
"rows_loaded": len(json_data),
"total_rows": table.num_rows,
"message": f"Successfully loaded data into {project_id}.{dataset_id}.{table_id}",
},
indent=2,
)
except Exception as e:
return json.dumps({"status": "error", "message": str(e)}, indent=2)
@mcp_instance.tool()
def export_table_to_csv(
dataset_id: str,
table_id: str,
destination_uri: str,
project_id: str = None,
print_header: bool = True,
field_delimiter: str = ",",
) -> str:
"""
Export a BigQuery table to Cloud Storage as CSV
Args:
dataset_id: Dataset ID containing the table
table_id: ID of the table to export
destination_uri: GCS URI (gs://bucket/path)
project_id: GCP project ID (defaults to configured project)
print_header: Whether to include column headers
field_delimiter: Delimiter character for fields
"""
try:
# Get client from client_instances
client = client_instances.get_clients().bigquery
project_id = project_id or client_instances.get_project_id()
table_ref = client.dataset(dataset_id).table(table_id)
# Validate destination URI
if not destination_uri.startswith("gs://"):
return json.dumps(
{
"status": "error",
"message": "destination_uri must start with gs://",
},
indent=2,
)
job_config = bigquery.ExtractJobConfig()
job_config.destination_format = bigquery.DestinationFormat.CSV
job_config.print_header = print_header
job_config.field_delimiter = field_delimiter
# Log info (similar to ctx.info)
print(f"Exporting {dataset_id}.{table_id} to {destination_uri}...")
# Create and run the extract job
extract_job = client.extract_table(
table_ref, destination_uri, job_config=job_config
)
# Wait for the job to complete
extract_job.result()
return json.dumps(
{
"status": "success",
"destination": destination_uri,
"message": f"Successfully exported {project_id}.{dataset_id}.{table_id} to {destination_uri}",
},
indent=2,
)
except Exception as e:
return json.dumps({"status": "error", "message": str(e)}, indent=2)
# Prompts
@mcp_instance.prompt()
def create_dataset_prompt() -> str:
"""Prompt for creating a new BigQuery dataset"""
return """
I need to create a new BigQuery dataset.
Please help me with:
1. Choosing an appropriate location for my dataset
2. Understanding dataset naming conventions
3. Best practices for dataset configuration (expiration, labels, etc.)
4. The process to create the dataset
"""
@mcp_instance.prompt()
def query_optimization_prompt() -> str:
"""Prompt for BigQuery query optimization help"""
return """
I have a BigQuery query that's slow or expensive to run.
Please help me optimize it by:
1. Analyzing key factors that affect BigQuery performance
2. Identifying common patterns that lead to inefficient queries
3. Suggesting specific optimization techniques
4. Helping me understand how to use EXPLAIN plan analysis
"""
```
--------------------------------------------------------------------------------
/src/gcp-mcp-server/services/cloud_run.py:
--------------------------------------------------------------------------------
```python
import json
from typing import Dict, Optional
from google.cloud import run_v2
from google.protobuf import field_mask_pb2
from services import client_instances
def register(mcp_instance):
"""Register all Cloud Run resources and tools with the MCP instance."""
# Resources
@mcp_instance.resource("gcp://cloudrun/{project_id}/{region}/services")
def list_services_resource(project_id: str = None, region: str = None) -> str:
"""List all Cloud Run services in a specific region"""
try:
# Get client from client_instances
client = client_instances.get_clients().run
project_id = project_id or client_instances.get_project_id()
region = region or client_instances.get_location()
parent = f"projects/{project_id}/locations/{region}"
services = client.list_services(parent=parent)
result = []
for service in services:
result.append(
{
"name": service.name.split("/")[-1],
"uid": service.uid,
"generation": service.generation,
"labels": dict(service.labels) if service.labels else {},
"annotations": dict(service.annotations)
if service.annotations
else {},
"create_time": service.create_time.isoformat()
if service.create_time
else None,
"update_time": service.update_time.isoformat()
if service.update_time
else None,
"uri": service.uri,
}
)
return json.dumps(result, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp_instance.resource("gcp://run/{project_id}/{location}/service/{service_name}")
def get_service_resource(
service_name: str, project_id: str = None, location: str = None
) -> str:
"""Get details for a specific Cloud Run service"""
try:
# Get client from client_instances
client = client_instances.get_clients().run
project_id = project_id or client_instances.get_project_id()
location = location or client_instances.get_location()
name = f"projects/{project_id}/locations/{location}/services/{service_name}"
service = client.get_service(name=name)
# Extract container details
containers = []
if service.template and service.template.containers:
for container in service.template.containers:
container_info = {
"image": container.image,
"command": list(container.command) if container.command else [],
"args": list(container.args) if container.args else [],
"env": [
{"name": env.name, "value": env.value}
for env in container.env
]
if container.env
else [],
"resources": {
"limits": dict(container.resources.limits)
if container.resources and container.resources.limits
else {},
"cpu_idle": container.resources.cpu_idle
if container.resources
else None,
}
if container.resources
else {},
"ports": [
{"name": port.name, "container_port": port.container_port}
for port in container.ports
]
if container.ports
else [],
}
containers.append(container_info)
# Extract scaling details
scaling = None
if service.template and service.template.scaling:
scaling = {
"min_instance_count": service.template.scaling.min_instance_count,
"max_instance_count": service.template.scaling.max_instance_count,
}
result = {
"name": service.name.split("/")[-1],
"uid": service.uid,
"generation": service.generation,
"labels": dict(service.labels) if service.labels else {},
"annotations": dict(service.annotations) if service.annotations else {},
"create_time": service.create_time.isoformat()
if service.create_time
else None,
"update_time": service.update_time.isoformat()
if service.update_time
else None,
"creator": service.creator,
"last_modifier": service.last_modifier,
"client": service.client,
"client_version": service.client_version,
"ingress": service.ingress.name if service.ingress else None,
"launch_stage": service.launch_stage.name
if service.launch_stage
else None,
"traffic": [
{
"type": traffic.type_.name if traffic.type_ else None,
"revision": traffic.revision.split("/")[-1]
if traffic.revision
else None,
"percent": traffic.percent,
"tag": traffic.tag,
}
for traffic in service.traffic
]
if service.traffic
else [],
"uri": service.uri,
"template": {
"containers": containers,
"execution_environment": service.template.execution_environment.name
if service.template and service.template.execution_environment
else None,
"vpc_access": {
"connector": service.template.vpc_access.connector,
"egress": service.template.vpc_access.egress.name
if service.template.vpc_access.egress
else None,
}
if service.template and service.template.vpc_access
else None,
"scaling": scaling,
"timeout": f"{service.template.timeout.seconds}s {service.template.timeout.nanos}ns"
if service.template and service.template.timeout
else None,
"service_account": service.template.service_account
if service.template
else None,
}
if service.template
else {},
}
return json.dumps(result, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp_instance.resource(
"gcp://run/{project_id}/{location}/service/{service_name}/revisions"
)
def list_revisions_resource(
service_name: str, project_id: str = None, location: str = None
) -> str:
"""List revisions for a specific Cloud Run service"""
try:
# Get client from client_instances
client = client_instances.get_clients().run
project_id = project_id or client_instances.get_project_id()
location = location or client_instances.get_location()
parent = f"projects/{project_id}/locations/{location}"
# List all revisions
revisions = client.list_revisions(parent=parent)
# Filter revisions for the specified service
service_revisions = []
for revision in revisions:
# Check if this revision belongs to the specified service
if service_name in revision.name:
service_revisions.append(
{
"name": revision.name.split("/")[-1],
"uid": revision.uid,
"generation": revision.generation,
"service": revision.service.split("/")[-1]
if revision.service
else None,
"create_time": revision.create_time.isoformat()
if revision.create_time
else None,
"update_time": revision.update_time.isoformat()
if revision.update_time
else None,
"scaling": {
"min_instance_count": revision.scaling.min_instance_count
if revision.scaling
else None,
"max_instance_count": revision.scaling.max_instance_count
if revision.scaling
else None,
}
if revision.scaling
else None,
"conditions": [
{
"type": condition.type,
"state": condition.state.name
if condition.state
else None,
"message": condition.message,
"last_transition_time": condition.last_transition_time.isoformat()
if condition.last_transition_time
else None,
"severity": condition.severity.name
if condition.severity
else None,
}
for condition in revision.conditions
]
if revision.conditions
else [],
}
)
return json.dumps(service_revisions, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
# Tools
@mcp_instance.tool()
def create_service(
service_name: str,
image: str,
project_id: str = None,
location: str = None,
env_vars: Optional[Dict[str, str]] = None,
memory_limit: str = "512Mi",
cpu_limit: str = "1",
min_instances: int = 0,
max_instances: int = 100,
port: int = 8080,
allow_unauthenticated: bool = True,
service_account: str = None,
vpc_connector: str = None,
timeout_seconds: int = 300,
) -> str:
"""
Create a new Cloud Run service
Args:
service_name: Name for the new service
image: Container image to deploy (e.g., gcr.io/project/image:tag)
project_id: GCP project ID (defaults to configured project)
location: GCP region (e.g., us-central1) (defaults to configured location)
env_vars: Environment variables as key-value pairs
memory_limit: Memory limit (e.g., 512Mi, 1Gi)
cpu_limit: CPU limit (e.g., 1, 2)
min_instances: Minimum number of instances
max_instances: Maximum number of instances
port: Container port
allow_unauthenticated: Whether to allow unauthenticated access
service_account: Service account email
vpc_connector: VPC connector name
timeout_seconds: Request timeout in seconds
"""
try:
# Get client from client_instances
client = client_instances.get_clients().run
project_id = project_id or client_instances.get_project_id()
location = location or client_instances.get_location()
print(f"Creating Cloud Run service {service_name} in {location}...")
# Create container
container = run_v2.Container(
image=image,
resources=run_v2.ResourceRequirements(
limits={
"memory": memory_limit,
"cpu": cpu_limit,
}
),
)
# Add environment variables if provided
if env_vars:
container.env = [
run_v2.EnvVar(name=key, value=value)
for key, value in env_vars.items()
]
# Add port
container.ports = [run_v2.ContainerPort(container_port=port)]
# Create template
template = run_v2.RevisionTemplate(
containers=[container],
scaling=run_v2.RevisionScaling(
min_instance_count=min_instances,
max_instance_count=max_instances,
),
timeout=run_v2.Duration(seconds=timeout_seconds),
)
# Add service account if provided
if service_account:
template.service_account = service_account
# Add VPC connector if provided
if vpc_connector:
template.vpc_access = run_v2.VpcAccess(
connector=vpc_connector,
egress=run_v2.VpcAccess.VpcEgress.ALL_TRAFFIC,
)
# Create service
service = run_v2.Service(
template=template,
ingress=run_v2.IngressTraffic.INGRESS_TRAFFIC_ALL
if allow_unauthenticated
else run_v2.IngressTraffic.INGRESS_TRAFFIC_INTERNAL_ONLY,
)
# Create the service
parent = f"projects/{project_id}/locations/{location}"
operation = client.create_service(
parent=parent,
service_id=service_name,
service=service,
)
# Wait for the operation to complete
print(f"Waiting for service {service_name} to be created...")
result = operation.result()
return json.dumps(
{
"status": "success",
"name": result.name.split("/")[-1],
"uri": result.uri,
"create_time": result.create_time.isoformat()
if result.create_time
else None,
"update_time": result.update_time.isoformat()
if result.update_time
else None,
},
indent=2,
)
except Exception as e:
return json.dumps({"status": "error", "message": str(e)}, indent=2)
@mcp_instance.tool()
def list_services(project_id: str = None, region: str = None) -> str:
"""
List Cloud Run services in a specific region
Args:
project_id: GCP project ID (defaults to configured project)
region: GCP region (e.g., us-central1) (defaults to configured location)
"""
try:
# Get client from client_instances
client = client_instances.get_clients().run
project_id = project_id or client_instances.get_project_id()
region = region or client_instances.get_location()
parent = f"projects/{project_id}/locations/{region}"
print(f"Listing Cloud Run services in {region}...")
services = client.list_services(parent=parent)
result = []
for service in services:
result.append(
{
"name": service.name.split("/")[-1],
"uri": service.uri,
"create_time": service.create_time.isoformat()
if service.create_time
else None,
"update_time": service.update_time.isoformat()
if service.update_time
else None,
"labels": dict(service.labels) if service.labels else {},
}
)
return json.dumps(
{"status": "success", "services": result, "count": len(result)},
indent=2,
)
except Exception as e:
return json.dumps({"status": "error", "message": str(e)}, indent=2)
@mcp_instance.tool()
def update_service(
service_name: str,
project_id: str = None,
region: str = None,
image: Optional[str] = None,
memory: Optional[str] = None,
cpu: Optional[str] = None,
max_instances: Optional[int] = None,
min_instances: Optional[int] = None,
env_vars: Optional[Dict[str, str]] = None,
concurrency: Optional[int] = None,
timeout_seconds: Optional[int] = None,
service_account: Optional[str] = None,
labels: Optional[Dict[str, str]] = None,
) -> str:
"""
Update an existing Cloud Run service
Args:
service_name: Name of the service to update
project_id: GCP project ID (defaults to configured project)
region: GCP region (e.g., us-central1) (defaults to configured location)
image: New container image URL (optional)
memory: New memory allocation (optional)
cpu: New CPU allocation (optional)
max_instances: New maximum number of instances (optional)
min_instances: New minimum number of instances (optional)
env_vars: New environment variables (optional)
concurrency: New maximum concurrent requests per instance (optional)
timeout_seconds: New request timeout in seconds (optional)
service_account: New service account email (optional)
labels: New key-value labels to apply to the service (optional)
"""
try:
# Get client from client_instances
client = client_instances.get_clients().run
project_id = project_id or client_instances.get_project_id()
region = region or client_instances.get_location()
# Get the existing service
name = f"projects/{project_id}/locations/{region}/services/{service_name}"
print(f"Getting current service configuration for {service_name}...")
service = client.get_service(name=name)
# Track which fields are being updated
update_mask_fields = []
# Update the container image if specified
if image and service.template and service.template.containers:
service.template.containers[0].image = image
update_mask_fields.append("template.containers[0].image")
# Update resources if specified
if (memory or cpu) and service.template and service.template.containers:
if not service.template.containers[0].resources:
service.template.containers[
0
].resources = run_v2.ResourceRequirements(limits={})
if not service.template.containers[0].resources.limits:
service.template.containers[0].resources.limits = {}
if memory:
service.template.containers[0].resources.limits["memory"] = memory
update_mask_fields.append(
"template.containers[0].resources.limits.memory"
)
if cpu:
service.template.containers[0].resources.limits["cpu"] = cpu
update_mask_fields.append(
"template.containers[0].resources.limits.cpu"
)
# Update scaling parameters
if max_instances is not None and service.template:
service.template.max_instance_count = max_instances
update_mask_fields.append("template.max_instance_count")
if min_instances is not None and service.template:
service.template.min_instance_count = min_instances
update_mask_fields.append("template.min_instance_count")
# Update concurrency
if concurrency is not None and service.template:
service.template.max_instance_request_concurrency = concurrency
update_mask_fields.append("template.max_instance_request_concurrency")
# Update timeout
if timeout_seconds is not None and service.template:
service.template.timeout = {"seconds": timeout_seconds}
update_mask_fields.append("template.timeout")
# Update service account
if service_account is not None and service.template:
service.template.service_account = service_account
update_mask_fields.append("template.service_account")
# Update environment variables
if (
env_vars is not None
and service.template
and service.template.containers
):
# Create new env var list
new_env_vars = [
run_v2.EnvVar(name=key, value=value)
for key, value in env_vars.items()
]
service.template.containers[0].env = new_env_vars
update_mask_fields.append("template.containers[0].env")
# Update labels
if labels is not None:
service.labels = labels
update_mask_fields.append("labels")
# Only proceed if there are fields to update
if not update_mask_fields:
return json.dumps(
{"status": "info", "message": "No updates specified"}, indent=2
)
# Create update mask
update_mask = field_mask_pb2.FieldMask(paths=update_mask_fields)
# Create the request
request = run_v2.UpdateServiceRequest(
service=service, update_mask=update_mask
)
print(f"Updating Cloud Run service {service_name}...")
operation = client.update_service(request=request)
print("Waiting for service update to complete...")
response = operation.result()
return json.dumps(
{
"status": "success",
"name": response.name,
"uri": response.uri,
"updated_fields": update_mask_fields,
"conditions": [
{
"type": condition.type_,
"state": condition.state.name
if hasattr(condition.state, "name")
else str(condition.state),
"message": condition.message,
}
for condition in response.conditions
]
if response.conditions
else [],
},
indent=2,
)
except Exception as e:
return json.dumps({"status": "error", "message": str(e)}, indent=2)
@mcp_instance.tool()
def delete_service(
service_name: str, project_id: str = None, region: str = None
) -> str:
"""
Delete a Cloud Run service
Args:
service_name: Name of the service to delete
project_id: GCP project ID (defaults to configured project)
region: GCP region (e.g., us-central1) (defaults to configured location)
"""
try:
# Get client from client_instances
client = client_instances.get_clients().run
project_id = project_id or client_instances.get_project_id()
region = region or client_instances.get_location()
name = f"projects/{project_id}/locations/{region}/services/{service_name}"
print(f"Deleting Cloud Run service {service_name} in {region}...")
operation = client.delete_service(name=name)
print("Waiting for service deletion to complete...")
operation.result()
return json.dumps(
{
"status": "success",
"message": f"Service {service_name} successfully deleted",
},
indent=2,
)
except Exception as e:
return json.dumps({"status": "error", "message": str(e)}, indent=2)
@mcp_instance.tool()
def get_service(
service_name: str, project_id: str = None, region: str = None
) -> str:
"""
Get details of a specific Cloud Run service
Args:
service_name: Name of the service
project_id: GCP project ID (defaults to configured project)
region: GCP region (e.g., us-central1) (defaults to configured location)
"""
try:
# Get client from client_instances
client = client_instances.get_clients().run
project_id = project_id or client_instances.get_project_id()
region = region or client_instances.get_location()
name = f"projects/{project_id}/locations/{region}/services/{service_name}"
print(f"Getting details for Cloud Run service {service_name}...")
service = client.get_service(name=name)
# Parse container details
containers = []
if service.template and service.template.containers:
for container in service.template.containers:
container_info = {
"image": container.image,
"resources": {
"limits": dict(container.resources.limits)
if container.resources and container.resources.limits
else {}
}
if container.resources
else {},
"env_vars": {env.name: env.value for env in container.env}
if container.env
else {},
}
containers.append(container_info)
# Parse traffic
traffic_result = []
if service.traffic:
for traffic in service.traffic:
traffic_info = {
"type": traffic.type_.name
if hasattr(traffic.type_, "name")
else str(traffic.type_),
"revision": traffic.revision.split("/")[-1]
if traffic.revision
else None,
"percent": traffic.percent,
"tag": traffic.tag,
}
traffic_result.append(traffic_info)
return json.dumps(
{
"status": "success",
"name": service.name.split("/")[-1],
"uri": service.uri,
"containers": containers,
"traffic": traffic_result,
"update_time": service.update_time.isoformat()
if service.update_time
else None,
},
indent=2,
)
except Exception as e:
return json.dumps({"status": "error", "message": str(e)}, indent=2)
# Prompts
@mcp_instance.prompt()
def deploy_service_prompt(
service_name: str,
image: str,
min_instances: str = "0",
max_instances: str = "100",
env_vars: str = "{}",
) -> str:
"""Prompt to deploy a Cloud Run service with the given configuration"""
return f"""
I need to deploy a Cloud Run service with the following configuration:
- Service name: {service_name}
- Container image: {image}
- Min instances: {min_instances}
- Max instances: {max_instances}
- Environment variables: {env_vars}
Please help me set up this service and explain the deployment process and any best practices I should follow.
"""
@mcp_instance.prompt()
def check_service_status_prompt(service_name: str) -> str:
"""Prompt to check the status of a deployed Cloud Run service"""
return f"""
I need to check the current status of my Cloud Run service named "{service_name}".
Please provide me with:
1. Is the service currently running?
2. What is the URL to access it?
3. What revision is currently serving traffic?
4. Are there any issues with the service?
"""
@mcp_instance.prompt()
def create_service_prompt(region: str = "us-central1") -> str:
"""Prompt for creating a new Cloud Run service"""
return f"""
I need to create a new Cloud Run service in {region}.
Please help me with:
1. What container image should I use?
2. How much CPU and memory should I allocate?
3. Should I set min and max instances for scaling?
4. Do I need to set any environment variables?
5. Should I allow unauthenticated access?
6. What's the best way to deploy my service?
"""
@mcp_instance.prompt()
def update_service_prompt() -> str:
"""Prompt for updating a Cloud Run service"""
return """
I need to update my Cloud Run service.
Please help me understand:
1. How to update the container image
2. How to change resource allocations
3. How to add or modify environment variables
4. How to update scaling settings
5. Best practices for zero-downtime updates
"""
```
--------------------------------------------------------------------------------
/src/gcp-mcp-server/services/cloud_monitoring.py:
--------------------------------------------------------------------------------
```python
import datetime
import json
from typing import Dict, List, Optional
from google.cloud import monitoring_v3
from google.protobuf.duration_pb2 import Duration
from google.protobuf.timestamp_pb2 import Timestamp
from services import client_instances
def register(mcp_instance):
"""Register all Cloud Monitoring resources and tools with the MCP instance."""
# Resources
@mcp_instance.resource("gcp://monitoring/{project_id}/metrics")
def list_metrics_resource(project_id: str = None) -> str:
"""List all available metrics for a GCP project"""
try:
# Get client from client_instances
client = client_instances.get_clients().monitoring
project_id = project_id or client_instances.get_project_id()
name = f"projects/{project_id}"
metrics = client.list_metric_descriptors(name=name)
result = []
for metric in metrics:
result.append(
{
"name": metric.name,
"type": metric.type,
"display_name": metric.display_name,
"description": metric.description,
"kind": monitoring_v3.MetricDescriptor.MetricKind.Name(
metric.metric_kind
),
"value_type": monitoring_v3.MetricDescriptor.ValueType.Name(
metric.value_type
),
"unit": metric.unit,
}
)
return json.dumps(result, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp_instance.resource("gcp://monitoring/{project_id}/alerts")
def list_alerts_resource(project_id: str = None) -> str:
"""List all alert policies for a GCP project"""
try:
# Get client from client_instances
client = client_instances.get_clients().monitoring
project_id = project_id or client_instances.get_project_id()
name = f"projects/{project_id}"
policies = client.list_alert_policies(name=name)
result = []
for policy in policies:
result.append(
{
"name": policy.name,
"display_name": policy.display_name,
"enabled": policy.enabled,
"conditions_count": len(policy.conditions),
"creation_time": policy.creation_record.mutate_time.ToDatetime().isoformat()
if policy.creation_record and policy.creation_record.mutate_time
else None,
"notification_channels": [
chan.split("/")[-1] for chan in policy.notification_channels
]
if policy.notification_channels
else [],
}
)
return json.dumps(result, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp_instance.resource("gcp://monitoring/{project_id}/alert/{alert_id}")
def get_alert_resource(project_id: str = None, alert_id: str = None) -> str:
"""Get details for a specific alert policy"""
try:
# Get client from client_instances
client = client_instances.get_clients().monitoring
project_id = project_id or client_instances.get_project_id()
name = f"projects/{project_id}/alertPolicies/{alert_id}"
policy = client.get_alert_policy(name=name)
conditions = []
for condition in policy.conditions:
condition_data = {
"name": condition.name,
"display_name": condition.display_name,
"type": condition.condition_type_name
if hasattr(condition, "condition_type_name")
else None,
}
# Add condition-specific details
if condition.HasField("condition_threshold"):
threshold = condition.condition_threshold
condition_data["threshold"] = {
"filter": threshold.filter,
"comparison": monitoring_v3.ComparisonType.Name(
threshold.comparison
)
if threshold.comparison
else None,
"threshold_value": threshold.threshold_value,
"duration": f"{threshold.duration.seconds}s"
if threshold.duration
else None,
"aggregation": {
"alignment_period": f"{threshold.aggregations[0].alignment_period.seconds}s"
if threshold.aggregations
and threshold.aggregations[0].alignment_period
else None,
"per_series_aligner": monitoring_v3.Aggregation.Aligner.Name(
threshold.aggregations[0].per_series_aligner
)
if threshold.aggregations
and threshold.aggregations[0].per_series_aligner
else None,
"cross_series_reducer": monitoring_v3.Aggregation.Reducer.Name(
threshold.aggregations[0].cross_series_reducer
)
if threshold.aggregations
and threshold.aggregations[0].cross_series_reducer
else None,
}
if threshold.aggregations and len(threshold.aggregations) > 0
else None,
}
conditions.append(condition_data)
result = {
"name": policy.name,
"display_name": policy.display_name,
"enabled": policy.enabled,
"conditions": conditions,
"combiner": monitoring_v3.AlertPolicy.ConditionCombinerType.Name(
policy.combiner
)
if policy.combiner
else None,
"notification_channels": policy.notification_channels,
"creation_time": policy.creation_record.mutate_time.ToDatetime().isoformat()
if policy.creation_record and policy.creation_record.mutate_time
else None,
"documentation": {
"content": policy.documentation.content,
"mime_type": policy.documentation.mime_type,
}
if policy.HasField("documentation")
else None,
}
return json.dumps(result, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
@mcp_instance.resource("gcp://monitoring/{project_id}/notification_channels")
def list_notification_channels_resource(project_id: str = None) -> str:
"""List notification channels for a GCP project"""
try:
# Get client from client_instances
client = client_instances.get_clients().monitoring
project_id = project_id or client_instances.get_project_id()
name = f"projects/{project_id}"
channels = client.list_notification_channels(name=name)
result = []
for channel in channels:
result.append(
{
"name": channel.name,
"type": channel.type,
"display_name": channel.display_name,
"description": channel.description,
"verification_status": monitoring_v3.NotificationChannel.VerificationStatus.Name(
channel.verification_status
)
if channel.verification_status
else None,
"enabled": channel.enabled,
"labels": dict(channel.labels) if channel.labels else {},
}
)
return json.dumps(result, indent=2)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2)
# Tools
@mcp_instance.tool()
def list_metrics(project_id: str = None, filter_str: str = "") -> str:
"""
List metrics in a GCP project with optional filtering
Args:
project_id: GCP project ID (defaults to configured project)
filter_str: Optional filter string to narrow results (e.g., "metric.type = starts_with(\"compute.googleapis.com\")")
"""
try:
# Get client from client_instances
client = client_instances.get_clients().monitoring
project_id = project_id or client_instances.get_project_id()
parent = f"projects/{project_id}"
request = monitoring_v3.ListMetricDescriptorsRequest(
name=parent, filter=filter_str
)
print(f"Listing metrics for project {project_id}...")
metrics = client.list_metric_descriptors(request=request)
result = []
for metric in metrics:
result.append(
{
"name": metric.name,
"type": metric.type,
"display_name": metric.display_name,
"description": metric.description,
"kind": monitoring_v3.MetricDescriptor.MetricKind.Name(
metric.metric_kind
),
"value_type": monitoring_v3.MetricDescriptor.ValueType.Name(
metric.value_type
),
"unit": metric.unit,
}
)
return json.dumps(
{"status": "success", "metrics": result, "count": len(result)}, indent=2
)
except Exception as e:
return json.dumps({"status": "error", "message": str(e)}, indent=2)
@mcp_instance.tool()
def fetch_metric_timeseries(
metric_type: str,
project_id: str = None,
filter_additions: str = "",
hours: int = 1,
alignment_period_seconds: int = 60,
) -> str:
"""
Fetch time series data for a specific metric
Args:
metric_type: The metric type (e.g., "compute.googleapis.com/instance/cpu/utilization")
project_id: GCP project ID (defaults to configured project)
filter_additions: Additional filter criteria (e.g., "resource.labels.instance_id = \"my-instance\"")
hours: Number of hours of data to fetch (default: 1)
alignment_period_seconds: Data point alignment period in seconds (default: 60)
"""
try:
# Get client from client_instances
client = client_instances.get_clients().monitoring
project_id = project_id or client_instances.get_project_id()
# Build the filter
filter_str = f'metric.type = "{metric_type}"'
if filter_additions:
filter_str += f" AND {filter_additions}"
# Calculate time interval
now = datetime.datetime.utcnow()
seconds = int(now.timestamp())
end_time = Timestamp(seconds=seconds)
start_time = Timestamp(seconds=seconds - hours * 3600)
# Create interval
interval = monitoring_v3.TimeInterval(
end_time=end_time, start_time=start_time
)
# Create aggregation
aggregation = monitoring_v3.Aggregation(
alignment_period=Duration(seconds=alignment_period_seconds),
per_series_aligner=monitoring_v3.Aggregation.Aligner.ALIGN_MEAN,
)
# Build request
request = monitoring_v3.ListTimeSeriesRequest(
name=f"projects/{project_id}",
filter=filter_str,
interval=interval,
view=monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
aggregation=aggregation,
)
print(f"Fetching time series data for {metric_type}...")
time_series = client.list_time_series(request=request)
result = []
for series in time_series:
data_points = []
for point in series.points:
point_time = point.interval.end_time.ToDatetime().isoformat()
# Handle different value types
if point.value.HasField("double_value"):
value = point.value.double_value
elif point.value.HasField("int64_value"):
value = point.value.int64_value
elif point.value.HasField("bool_value"):
value = point.value.bool_value
elif point.value.HasField("string_value"):
value = point.value.string_value
elif point.value.HasField("distribution_value"):
value = (
"distribution" # Simplified, as distributions are complex
)
else:
value = None
data_points.append({"time": point_time, "value": value})
series_data = {
"metric": dict(series.metric.labels)
if series.metric and series.metric.labels
else {},
"resource": {
"type": series.resource.type,
"labels": dict(series.resource.labels)
if series.resource and series.resource.labels
else {},
}
if series.resource
else {},
"points": data_points,
}
result.append(series_data)
return json.dumps(
{
"status": "success",
"metric_type": metric_type,
"time_range": {
"start": start_time.ToDatetime().isoformat(),
"end": end_time.ToDatetime().isoformat(),
},
"time_series": result,
"count": len(result),
},
indent=2,
)
except Exception as e:
return json.dumps({"status": "error", "message": str(e)}, indent=2)
@mcp_instance.tool()
def list_alert_policies(project_id: str = None, filter_str: str = "") -> str:
"""
List alert policies in a GCP project with optional filtering
Args:
project_id: GCP project ID (defaults to configured project)
filter_str: Optional filter string (e.g., "display_name = \"High CPU Alert\"")
"""
try:
# Get client from client_instances
client = client_instances.get_clients().monitoring
project_id = project_id or client_instances.get_project_id()
parent = f"projects/{project_id}"
request = monitoring_v3.ListAlertPoliciesRequest(
name=parent, filter=filter_str
)
print(f"Listing alert policies for project {project_id}...")
policies = client.list_alert_policies(request=request)
result = []
for policy in policies:
policy_data = {
"name": policy.name,
"display_name": policy.display_name,
"enabled": policy.enabled,
"conditions_count": len(policy.conditions),
"combiner": monitoring_v3.AlertPolicy.ConditionCombinerType.Name(
policy.combiner
)
if policy.combiner
else None,
"notification_channels": [
chan.split("/")[-1] for chan in policy.notification_channels
]
if policy.notification_channels
else [],
"creation_time": policy.creation_record.mutate_time.ToDatetime().isoformat()
if policy.creation_record and policy.creation_record.mutate_time
else None,
}
result.append(policy_data)
return json.dumps(
{"status": "success", "alert_policies": result, "count": len(result)},
indent=2,
)
except Exception as e:
return json.dumps({"status": "error", "message": str(e)}, indent=2)
@mcp_instance.tool()
def create_metric_threshold_alert(
display_name: str,
metric_type: str,
filter_str: str,
threshold_value: float,
project_id: str = None,
comparison: str = "COMPARISON_GT",
duration_seconds: int = 300,
alignment_period_seconds: int = 60,
aligner: str = "ALIGN_MEAN",
reducer: str = "REDUCE_MEAN",
notification_channels: Optional[List[str]] = None,
documentation: str = "",
enabled: bool = True,
) -> str:
"""
Create a metric threshold alert policy
Args:
display_name: Human-readable name for the alert
metric_type: The metric to alert on (e.g., "compute.googleapis.com/instance/cpu/utilization")
filter_str: Filter string to define which resources to monitor
threshold_value: The threshold value to trigger the alert
project_id: GCP project ID (defaults to configured project)
comparison: Comparison type (COMPARISON_GT, COMPARISON_GE, COMPARISON_LT, COMPARISON_LE, COMPARISON_EQ, COMPARISON_NE)
duration_seconds: Duration in seconds the condition must be met to trigger
alignment_period_seconds: Period in seconds for data point alignment
aligner: Per-series aligner (ALIGN_MEAN, ALIGN_MAX, ALIGN_MIN, etc.)
reducer: Cross-series reducer (REDUCE_MEAN, REDUCE_MAX, REDUCE_MIN, etc.)
notification_channels: List of notification channel IDs
documentation: Documentation for the alert (markdown supported)
enabled: Whether the alert should be enabled
"""
try:
# Get client from client_instances
client = client_instances.get_clients().monitoring
project_id = project_id or client_instances.get_project_id()
# Validate and convert enums
try:
comparison_enum = getattr(monitoring_v3.ComparisonType, comparison)
aligner_enum = getattr(monitoring_v3.Aggregation.Aligner, aligner)
reducer_enum = getattr(monitoring_v3.Aggregation.Reducer, reducer)
except AttributeError as e:
return json.dumps(
{
"status": "error",
"message": f"Invalid enum value: {str(e)}. Please check documentation for valid values.",
},
indent=2,
)
# Prepare notification channels with full paths
full_notification_channels = []
if notification_channels:
for channel in notification_channels:
if not channel.startswith(
f"projects/{project_id}/notificationChannels/"
):
channel = (
f"projects/{project_id}/notificationChannels/{channel}"
)
full_notification_channels.append(channel)
# Create aggregation
aggregation = monitoring_v3.Aggregation(
alignment_period=Duration(seconds=alignment_period_seconds),
per_series_aligner=aligner_enum,
cross_series_reducer=reducer_enum,
group_by_fields=[],
)
# Create condition threshold
condition_threshold = monitoring_v3.AlertPolicy.Condition.MetricThreshold(
filter=f'metric.type = "{metric_type}" AND {filter_str}',
comparison=comparison_enum,
threshold_value=threshold_value,
duration=Duration(seconds=duration_seconds),
aggregations=[aggregation],
)
# Create condition
condition = monitoring_v3.AlertPolicy.Condition(
display_name=f"Threshold condition for {display_name}",
condition_threshold=condition_threshold,
)
# Create alert policy
alert_policy = monitoring_v3.AlertPolicy(
display_name=display_name,
conditions=[condition],
combiner=monitoring_v3.AlertPolicy.ConditionCombinerType.OR,
notification_channels=full_notification_channels,
enabled=monitoring_v3.BoolValue(value=enabled),
)
# Add documentation if provided
if documentation:
alert_policy.documentation = monitoring_v3.AlertPolicy.Documentation(
content=documentation, mime_type="text/markdown"
)
# Create the request
request = monitoring_v3.CreateAlertPolicyRequest(
name=f"projects/{project_id}", alert_policy=alert_policy
)
print(f"Creating alert policy: {display_name}...")
response = client.create_alert_policy(request=request)
return json.dumps(
{
"status": "success",
"name": response.name,
"display_name": response.display_name,
"enabled": response.enabled,
"conditions_count": len(response.conditions),
"notification_channels": response.notification_channels,
},
indent=2,
)
except Exception as e:
return json.dumps({"status": "error", "message": str(e)}, indent=2)
@mcp_instance.tool()
def update_alert_policy(
alert_id: str,
project_id: str = None,
display_name: Optional[str] = None,
notification_channels: Optional[List[str]] = None,
enabled: Optional[bool] = None,
documentation: Optional[str] = None,
) -> str:
"""
Update an existing alert policy
Args:
alert_id: ID of the alert to update
project_id: GCP project ID (defaults to configured project)
display_name: New name for the alert
notification_channels: List of notification channel IDs
enabled: Whether the alert should be enabled
documentation: Documentation for the alert (markdown supported)
"""
try:
# Get client from client_instances
client = client_instances.get_clients().monitoring
project_id = project_id or client_instances.get_project_id()
# Get the existing policy
name = f"projects/{project_id}/alertPolicies/{alert_id}"
policy = client.get_alert_policy(name=name)
# Update fields if provided
if display_name:
policy.display_name = display_name
if notification_channels is not None:
full_notification_channels = []
for channel in notification_channels:
if not channel.startswith(
f"projects/{project_id}/notificationChannels/"
):
channel = (
f"projects/{project_id}/notificationChannels/{channel}"
)
full_notification_channels.append(channel)
policy.notification_channels = full_notification_channels
if enabled is not None:
policy.enabled = monitoring_v3.BoolValue(value=enabled)
if documentation is not None:
policy.documentation = monitoring_v3.AlertPolicy.Documentation(
content=documentation, mime_type="text/markdown"
)
# Create update mask
update_mask = []
if display_name:
update_mask.append("display_name")
if notification_channels is not None:
update_mask.append("notification_channels")
if enabled is not None:
update_mask.append("enabled")
if documentation is not None:
update_mask.append("documentation")
# Update the policy
request = monitoring_v3.UpdateAlertPolicyRequest(
alert_policy=policy, update_mask={"paths": update_mask}
)
print(f"Updating alert policy: {policy.name}...")
response = client.update_alert_policy(request=request)
return json.dumps(
{
"status": "success",
"name": response.name,
"display_name": response.display_name,
"enabled": response.enabled,
"conditions_count": len(response.conditions),
"notification_channels": [
chan.split("/")[-1] for chan in response.notification_channels
]
if response.notification_channels
else [],
},
indent=2,
)
except Exception as e:
return json.dumps({"status": "error", "message": str(e)}, indent=2)
@mcp_instance.tool()
def delete_alert_policy(alert_id: str, project_id: str = None) -> str:
"""
Delete an alert policy
Args:
alert_id: ID of the alert to delete
project_id: GCP project ID (defaults to configured project)
"""
try:
# Get client from client_instances
client = client_instances.get_clients().monitoring
project_id = project_id or client_instances.get_project_id()
name = f"projects/{project_id}/alertPolicies/{alert_id}"
print(f"Deleting alert policy: {alert_id}...")
client.delete_alert_policy(name=name)
return json.dumps(
{
"status": "success",
"message": f"Alert policy {alert_id} successfully deleted",
},
indent=2,
)
except Exception as e:
return json.dumps({"status": "error", "message": str(e)}, indent=2)
@mcp_instance.tool()
def create_notification_channel(
display_name: str,
channel_type: str,
labels: Dict[str, str],
project_id: str = None,
description: str = "",
enabled: bool = True,
) -> str:
"""
Create a notification channel
Args:
display_name: Human-readable name for the channel
channel_type: Type of channel (email, sms, slack, pagerduty, etc.)
labels: Channel-specific configuration (e.g., {"email_address": "[email protected]"})
project_id: GCP project ID (defaults to configured project)
description: Optional description for the channel
enabled: Whether the channel should be enabled
"""
try:
# Get client from client_instances
client = client_instances.get_clients().monitoring
project_id = project_id or client_instances.get_project_id()
# Create notification channel
notification_channel = monitoring_v3.NotificationChannel(
type=channel_type,
display_name=display_name,
description=description,
labels=labels,
enabled=monitoring_v3.BoolValue(value=enabled),
)
# Create the request
request = monitoring_v3.CreateNotificationChannelRequest(
name=f"projects/{project_id}", notification_channel=notification_channel
)
print(f"Creating notification channel: {display_name} ({channel_type})...")
response = client.create_notification_channel(request=request)
return json.dumps(
{
"status": "success",
"name": response.name,
"type": response.type,
"display_name": response.display_name,
"description": response.description,
"verification_status": monitoring_v3.NotificationChannel.VerificationStatus.Name(
response.verification_status
)
if response.verification_status
else None,
"enabled": response.enabled,
"labels": dict(response.labels) if response.labels else {},
},
indent=2,
)
except Exception as e:
return json.dumps({"status": "error", "message": str(e)}, indent=2)
# Prompts
@mcp_instance.prompt()
def create_alert_prompt() -> str:
"""Prompt for creating a new alert policy"""
return """
I need to create a new alert policy in Cloud Monitoring.
Please help me with:
1. Selecting the appropriate metric type for my alert
2. Setting up sensible thresholds and durations
3. Understanding the different comparison types
4. Best practices for alert documentation
5. Setting up notification channels
I'd like to create an alert that triggers when:
"""
@mcp_instance.prompt()
def monitor_resources_prompt() -> str:
"""Prompt for guidance on monitoring GCP resources"""
return """
I need to set up monitoring for my GCP resources. Please help me understand:
1. What are the most important metrics I should be monitoring for:
- Compute Engine instances
- Cloud SQL databases
- Cloud Storage buckets
- App Engine applications
- Kubernetes Engine clusters
2. What are recommended thresholds for alerts on these resources?
3. How should I organize my monitoring to keep it manageable?
4. What visualization options do I have in Cloud Monitoring?
"""
```