# Directory Structure

```
├── .gitignore
├── .python-version
├── Dockerfile
├── LICENSE
├── pyproject.toml
├── README.md
├── requirements.txt
├── smithery.yaml
├── src
│   └── gcp-mcp-server
│       ├── __init__.py
│       ├── core
│       │   ├── __init__.py
│       │   ├── auth.py
│       │   ├── context.py
│       │   ├── logging_handler.py
│       │   ├── security.py
│       │   └── server.py
│       ├── main.py
│       ├── prompts
│       │   ├── __init__.py
│       │   └── common.py
│       └── services
│           ├── __init__.py
│           ├── artifact_registry.py
│           ├── client_instances.py
│           ├── cloud_audit_logs.py
│           ├── cloud_bigquery.py
│           ├── cloud_build.py
│           ├── cloud_compute_engine.py
│           ├── cloud_monitoring.py
│           ├── cloud_run.py
│           ├── cloud_storage.py
│           └── README.md
├── utils
│   ├── __init__.py
│   └── helpers.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------

```
3.13
```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info

# Virtual environments
.venv
```

--------------------------------------------------------------------------------
/src/gcp-mcp-server/services/README.md:
--------------------------------------------------------------------------------

```markdown
# GCP Services MCP Implementation

This directory contains Model Context Protocol (MCP) implementations for various Google Cloud Platform (GCP) services. These modules expose GCP resources, tools, and prompts in a standardized way for AI assistants to interact with GCP infrastructure.

## Overview

The MCP framework provides a consistent way to expose cloud resources to AI assistants. Each service module follows a common pattern:

1. A `register` function that takes an MCP instance as a parameter
2. Resources for retrieving information about GCP resources
3. Tools for performing operations on GCP resources
4. Prompts for guiding users through common tasks

## Implementation Pattern

All service modules follow a consistent implementation pattern:

```python
import json
from services import client_instances
from google.cloud import service_client  # placeholder for the relevant client library

def register(mcp_instance):
    """Register all resources and tools with the MCP instance."""
    
    # Resources
    @mcp_instance.resource("gcp://service/{project_id}/resources")
    def list_resources(project_id: str = None) -> str:
        # Implement resource listing
        pass
        
    # Tools
    @mcp_instance.tool()
    def create_resource(resource_name: str, project_id: str = None) -> str:
        # Implement resource creation
        pass
        
    # Prompts
    @mcp_instance.prompt()
    def configuration_help() -> str:
        # Return helpful prompt text
        return """Help text here"""
```

## Key Design Principles

### 1. Client Management

Clients are accessed through a central `client_instances` module:

```python
client = client_instances.get_clients().service_name
```

This approach:
- Centralizes client creation and authentication
- Avoids duplicating client instantiation logic
- Enables consistent credential management

### 2. Parameter Defaults

Required parameters come first, followed by optional parameters with sensible defaults:

```python
def create_resource(
    resource_name: str,     # Required parameter first
    project_id: str = None, # Optional parameters with defaults
    location: str = None
) -> str:
    # Use defaults from client_instances when not provided
    project_id = project_id or client_instances.get_project_id()
    location = location or client_instances.get_location()
```

### 3. Error Handling

All functions include consistent error handling:

```python
try:
    # Implementation code
    return json.dumps(result, indent=2)
except Exception as e:
    return json.dumps({"error": str(e)}, indent=2)
```

### 4. JSON Responses

All responses are consistently formatted JSON strings with proper indentation:

```python
return json.dumps(
    {
        "status": "success",
        "resources": result,
        "count": len(result)
    },
    indent=2
)
```

### 5. Defensive Coding

All implementations use defensive coding practices to handle potentially missing or null values:

```python
"labels": dict(resource.labels) if resource.labels else {}
```
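
The same pattern extends to nested attributes and repeated fields, as in the service modules later in this repo:

```python
# Safely read a nested attribute chain that may be missing at any level
commit_sha = (
    build.source.repo_source.commit_sha
    if build.source and build.source.repo_source
    else None
)

# Safely take the first element of a possibly empty repeated field
first_interface = (
    instance.network_interfaces[0] if instance.network_interfaces else None
)
```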

## Available Services

The following GCP services are implemented:

1. **BigQuery** - Data warehouse operations
2. **Cloud Storage** - Object storage operations 
3. **Cloud Run** - Serverless container management
4. **Cloud Build** - CI/CD pipeline management
5. **Artifact Registry** - Container and package registry
6. **Compute Engine** - VM instance management
7. **Cloud Monitoring** - Metrics and alerting
8. **Cloud Audit Logs** - Logging and auditing

## Service-Specific Features

### BigQuery

- Dataset and table management
- Query execution
- Data import/export

### Cloud Storage

- Bucket management
- Object operations (upload, download, delete)
- Bucket configuration

### Cloud Run

- Service deployment
- Service configuration
- Traffic management

### Cloud Build

- Trigger management
- Build execution
- Build monitoring

### Artifact Registry

- Repository management
- Package operations

### Compute Engine

- Instance lifecycle management
- Instance configuration

### Cloud Monitoring

- Metric exploration
- Alert policy management
- Notification channel configuration

### Cloud Audit Logs

- Log querying
- Log sink management

## Usage Example

To register all services with an MCP instance:

```python
from mcp.server.fastmcp import FastMCP

from services import (
    artifact_registry,
    cloud_audit_logs,
    cloud_bigquery,
    cloud_build,
    cloud_compute_engine,
    cloud_monitoring,
    cloud_run,
    cloud_storage,
)

# Create MCP instance
mcp = FastMCP("GCP Services")

# Register all services
artifact_registry.register(mcp)
cloud_audit_logs.register(mcp)
cloud_bigquery.register(mcp)
cloud_build.register(mcp)
cloud_compute_engine.register(mcp)
cloud_monitoring.register(mcp)
cloud_run.register(mcp)
cloud_storage.register(mcp)
```

## Important Notes

1. **Default Project and Location**:
   - Services default to the project and location configured in `client_instances`
   - Always allow these to be overridden by parameters

2. **Authentication**:
   - Authentication is handled by the `client_instances` module
   - No credentials should be hardcoded in service implementations

3. **Error Handling**:
   - All operations should include robust error handling
   - Error responses should be informative but not expose sensitive information

4. **Response Formatting**:
   - All responses should be valid JSON with proper indentation
   - Success responses should include a "status": "success" field
   - Error responses should include a "status": "error" field with a "message"
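
As a concrete sketch of both response shapes:

```python
import json

# Success responses include "status": "success" plus the payload
success = json.dumps({"status": "success", "resources": [], "count": 0}, indent=2)

# Error responses include "status": "error" and a human-readable message
failure = json.dumps({"status": "error", "message": "Permission denied"}, indent=2)
```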

## Best Practices for MCP Integration

1. **Resource Naming**:
   - Use consistent URI patterns for resources (gcp://service/{project_id}/resource)
   - Group related resources logically

2. **Tool Design**:
   - Design tools around specific user intents
   - Order parameters with required ones first, followed by optional ones
   - Provide sensible defaults for optional parameters

3. **Prompt Design**:
   - Create prompts for common user scenarios
   - Include enough context for the AI to provide helpful responses
   - Structure prompts as guiding questions or templates

4. **Documentation**:
   - Include descriptive docstrings for all resources, tools, and prompts
   - Document all parameters and return values
   - Include usage examples where appropriate

## Extending the Services

To add a new GCP service:

1. Create a new module following the pattern above (see the skeleton below)
2. Implement resources for retrieving information
3. Implement tools for operations
4. Create prompts for common user tasks
5. Register the new service in your MCP initialization
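
A minimal skeleton for a hypothetical new service module (the service name and the `newservice` client attribute are illustrative, not part of this repo):

```python
import json

from services import client_instances


def register(mcp_instance):
    """Register resources and tools for the (hypothetical) new service."""

    @mcp_instance.resource("gcp://newservice/{project_id}/things")
    def list_things(project_id: str = None) -> str:
        """List resources exposed by the new service."""
        try:
            # Assumed client attribute; add it to GCPClients first
            client = client_instances.get_clients().newservice
            project_id = project_id or client_instances.get_project_id()
            things = [t.name for t in client.list_things(project=project_id)]
            return json.dumps(
                {"status": "success", "things": things, "count": len(things)},
                indent=2,
            )
        except Exception as e:
            return json.dumps({"status": "error", "message": str(e)}, indent=2)
```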

## Troubleshooting

If you encounter issues:

1. Check client initialization in `client_instances`
2. Verify required permissions for the service operations
3. Look for additional error details in the returned JSON
4. Add print statements for debugging complex operations
```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# This is not a ready-to-use MCP server



# GCP MCP Server

A comprehensive Model Context Protocol (MCP) server implementation for Google Cloud Platform (GCP) services, enabling AI assistants to interact with and manage GCP resources through a standardized interface.

## Overview

GCP MCP Server provides AI assistants with capabilities to:

- **Query GCP Resources**: Get information about your cloud infrastructure
- **Manage Cloud Resources**: Create, configure, and manage GCP services
- **Receive Assistance**: Get AI-guided help with GCP configurations and best practices

The implementation follows the MCP specification to enable AI systems to interact with GCP services in a secure, controlled manner.

## Supported GCP Services

This implementation includes support for the following GCP services:

- **Artifact Registry**: Container and package management
- **BigQuery**: Data warehousing and analytics
- **Cloud Audit Logs**: Logging and audit trail analysis
- **Cloud Build**: CI/CD pipeline management
- **Cloud Compute Engine**: Virtual machine instances
- **Cloud Monitoring**: Metrics, alerting, and dashboards
- **Cloud Run**: Serverless container deployments
- **Cloud Storage**: Object storage management

## Architecture

The project is structured as follows:

```
gcp-mcp-server/
├── core/            # Core MCP server functionality (auth, context, logging_handler, security)
├── prompts/         # AI assistant prompts for GCP operations
├── services/        # GCP service implementations
│   ├── README.md    # Service implementation details
│   └── ...          # Individual service modules
├── main.py          # Main server entry point
└── ...
```

Key components:

- **Service Modules**: Each GCP service has its own module with resources, tools, and prompts
- **Client Instances**: Centralized client management for authentication and resource access
- **Core Components**: Base functionality for the MCP server implementation

## Getting Started

### Prerequisites

- Python 3.13+ (see `.python-version` and `pyproject.toml`)
- GCP project with enabled APIs for the services you want to use
- Authenticated GCP credentials (Application Default Credentials recommended)

### Installation

1. Clone the repository:
   ```bash
   git clone https://github.com/yourusername/gcp-mcp-server.git
   cd gcp-mcp-server
   ```

2. Set up a virtual environment:
   ```bash
   python -m venv venv
   source venv/bin/activate  # On Windows: venv\Scripts\activate
   ```

3. Install dependencies:
   ```bash
   pip install -r requirements.txt
   ```

4. Configure your GCP credentials:
   ```bash
   # Using gcloud
   gcloud auth application-default login
   
   # Or set GOOGLE_APPLICATION_CREDENTIALS
   export GOOGLE_APPLICATION_CREDENTIALS="/path/to/service-account-key.json"
   ```

5. Set up environment variables:
   ```bash
   cp .env.example .env
   # Edit .env with your configuration
   ```

### Running the Server

Start the MCP server:

```bash
python main.py
```

For development and testing:

```bash
# Development mode with auto-reload
python main.py --dev

# Run with specific configuration
python main.py --config config.yaml
```

## Docker Deployment

Build and run with Docker:

```bash
# Build the image
docker build -t gcp-mcp-server .

# Run the container
docker run -p 8080:8080 -v ~/.config/gcloud:/root/.config/gcloud gcp-mcp-server
```

## Configuration

The server can be configured through environment variables or a configuration file:

| Environment Variable | Description | Default |
|----------------------|-------------|---------|
| `GCP_PROJECT_ID` | Default GCP project ID | None (required) |
| `GCP_LOCATION` | Default region/zone | `us-central1` |
| `MCP_SERVER_PORT` | Server port | `8080` |
| `LOG_LEVEL` | Logging level | `INFO` |

See `.env.example` for a complete list of configuration options.
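
A minimal `.env` sketch using the variables above (placeholder values, illustrative only):

```bash
GCP_PROJECT_ID=my-project-id
GCP_LOCATION=us-central1
MCP_SERVER_PORT=8080
LOG_LEVEL=INFO
```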
 
## Development

### Adding a New GCP Service

1. Create a new file in the `services/` directory
2. Implement the service following the pattern in existing services
3. Register the service in `main.py`

See the [services README](src/gcp-mcp-server/services/README.md) for detailed implementation guidance.
 

## Security Considerations

- The server uses Application Default Credentials for authentication
- Authorization is determined by the permissions of the authenticated identity
- No credentials are hardcoded in the service implementations
- Consider running with a service account with appropriate permissions

## Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

1. Fork the repository
2. Create your feature branch (`git checkout -b feature/amazing-feature`)
3. Commit your changes (`git commit -m 'Add some amazing feature'`)
4. Push to the branch (`git push origin feature/amazing-feature`)
5. Open a Pull Request

## License

This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.

## Acknowledgments

- Google Cloud Platform team for their comprehensive APIs
- Model Context Protocol for providing a standardized way for AI to interact with services

### Using the Server

To use this server:

1. Place your GCP service account key file as `service-account.json` in the same directory
2. Install the MCP package: `pip install "mcp[cli]"`
3. Install the required GCP package: `pip install google-cloud-run`
4. Run: `mcp dev gcp_cloudrun_server.py`

Or install it in Claude Desktop:
```
mcp install gcp_cloudrun_server.py --name "GCP Cloud Run Manager"
```


## MCP Server Configuration

The following configuration can be added to your configuration file for GCP Cloud Tools:

```json
"mcpServers": {
  "GCP Cloud Tools": {
    "command": "uv",
    "args": [
      "run",
      "--with",
      "google-cloud-artifact-registry>=1.10.0",
      "--with",
      "google-cloud-bigquery>=3.27.0",
      "--with",
      "google-cloud-build>=3.0.0",
      "--with",
      "google-cloud-compute>=1.0.0",
      "--with",
      "google-cloud-logging>=3.5.0",
      "--with",
      "google-cloud-monitoring>=2.0.0",
      "--with",
      "google-cloud-run>=0.9.0",
      "--with",
      "google-cloud-storage>=2.10.0",
      "--with",
      "mcp[cli]",
      "--with",
      "python-dotenv>=1.0.0",
      "mcp",
      "run",
      "C:\\Users\\enes_\\Desktop\\mcp-repo-final\\gcp-mcp\\src\\gcp-mcp-server\\main.py"
    ],
    "env": {
      "GOOGLE_APPLICATION_CREDENTIALS": "C:/Users/enes_/Desktop/mcp-repo-final/gcp-mcp/service-account.json",
      "GCP_PROJECT_ID": "gcp-mcp-cloud-project",
      "GCP_LOCATION": "us-east1"
    }
  }
}
```

### Configuration Details

This configuration sets up an MCP server for Google Cloud Platform tools with the following:

- **Command**: Uses `uv` package manager to run the server
- **Dependencies**: Includes various Google Cloud libraries (Artifact Registry, BigQuery, Cloud Build, etc.)
- **Environment Variables**:
  - `GOOGLE_APPLICATION_CREDENTIALS`: Path to your GCP service account credentials
  - `GCP_PROJECT_ID`: Your Google Cloud project ID
  - `GCP_LOCATION`: GCP region (us-east1)

### Usage

Add this configuration to your MCP configuration file to enable GCP Cloud Tools functionality.

```

--------------------------------------------------------------------------------
/src/gcp-mcp-server/prompts/common.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/src/gcp-mcp-server/core/__init__.py:
--------------------------------------------------------------------------------

```python
"""Core package for MCP server components."""

```

--------------------------------------------------------------------------------
/src/gcp-mcp-server/__init__.py:
--------------------------------------------------------------------------------

```python
"""GCP MCP server package."""

__version__ = "0.1.0"

```

--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------

```
mcp>=1.0.0
google-cloud-bigquery
google-cloud-storage
google-cloud-run
google-cloud-artifact-registry
google-cloud-logging
python-dotenv
google-cloud-monitoring
google-cloud-compute
google-cloud-build

```

--------------------------------------------------------------------------------
/utils/helpers.py:
--------------------------------------------------------------------------------

```python
"""Common utility functions."""


def format_resource_name(project_id: str, resource_name: str) -> str:
    """Format a resource name with project ID."""
    return f"projects/{project_id}/resources/{resource_name}"

```

--------------------------------------------------------------------------------
/src/gcp-mcp-server/core/security.py:
--------------------------------------------------------------------------------

```python
import re


class DataSanitizer:
    # Note: the first pattern redacts occurrences of the keywords themselves
    # (e.g. "token", "password"), not the values that follow them.
    PATTERNS = [
        r"(?i)(token|key|secret|password)",
        r"\b\d{4}-\d{4}-\d{4}-\d{4}\b",  # credit-card-like numbers
    ]

    @classmethod
    def sanitize(cls, text: str) -> str:
        for pattern in cls.PATTERNS:
            text = re.sub(pattern, "[REDACTED]", text)
        return text

```

--------------------------------------------------------------------------------
/src/gcp-mcp-server/services/__init__.py:
--------------------------------------------------------------------------------

```python
"""
GCP MCP Server services package.
This package contains service modules that register tools and resources with the MCP server.
"""

# The following allows these modules to be imported directly from the services package
from . import (
    artifact_registry,
    client_instances,
    cloud_audit_logs,
    cloud_bigquery,
    cloud_build,
    cloud_compute_engine,
    cloud_monitoring,
    cloud_run,
    cloud_storage,
)

```

--------------------------------------------------------------------------------
/utils/__init__.py:
--------------------------------------------------------------------------------

```python
"""Utility functions for MCP server."""

import json
from typing import Any


def format_json_response(data: Any) -> str:
    """Format data as a JSON string with consistent styling."""
    return json.dumps({"status": "success", "data": data}, indent=2)


def format_error_response(message: str) -> str:
    """Format error message as a JSON string."""
    return json.dumps({"status": "error", "message": message}, indent=2)

```

--------------------------------------------------------------------------------
/src/gcp-mcp-server/core/logging_handler.py:
--------------------------------------------------------------------------------

```python
import logging


class MCPLogger:
    """Simple logger for MCP server"""

    def __init__(self, name):
        self.logger = logging.getLogger(f"mcp.{name}")

    def info(self, message):
        self.logger.info(message)

    def warning(self, message):
        self.logger.warning(message)

    def error(self, message):
        self.logger.error(message)

    def critical(self, message):
        self.logger.critical(message)

    def debug(self, message):
        self.logger.debug(message)

    def audit_log(self, action, resource, details=None):
        """Simple audit logging"""
        message = f"AUDIT: {action} on {resource}"
        if details:
            message += f" | {details}"
        self.logger.info(message)

```

--------------------------------------------------------------------------------
/src/gcp-mcp-server/prompts/__init__.py:
--------------------------------------------------------------------------------

```python
"""Common prompts for GCP operations."""

from ..core import mcp


@mcp.prompt()
def gcp_service_help(service_name: str) -> str:
    """Get help for using a GCP service."""
    return f"""
    I need help with using {service_name} in Google Cloud Platform.
    
    Please help me understand:
    1. Common operations and best practices
    2. Required parameters and configuration
    3. Security considerations
    4. Recommended patterns for {service_name}
    """


@mcp.prompt()
def error_analysis(error_message: str) -> str:
    """Analyze a GCP error message."""
    return f"""
    I received this error from GCP:
    {error_message}
    
    Please help me:
    1. Understand what caused this error
    2. Find potential solutions
    3. Prevent similar errors in the future
    """

```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
[project]
name = "gcp-mcp-server"
version = "0.1.0"
description = "A Model Context Protocol server that provides access to Google Cloud Platform services, enabling LLMs to manage and interact with GCP resources."
readme = "README.md"
requires-python = ">=3.13"

dependencies = [
    "mcp[cli]>=1.0.0",
    "google-cloud-bigquery",
    "google-cloud-storage",
    "google-cloud-run",
    "google-cloud-artifact-registry",
    "google-cloud-logging",
    "python-dotenv",
    "google-cloud-monitoring",
    "google-cloud-compute",
    "google-cloud-build",
]
[[project.authors]]
name = "Enes Bol"
email = "[email protected]"

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.hatch.build.targets.wheel]
packages = ["src/gcp_mcp"]

[project.scripts]
gcp-mcp-server = "gcp_mcp.main:main"

```

--------------------------------------------------------------------------------
/src/gcp-mcp-server/services/client_instances.py:
--------------------------------------------------------------------------------

```python
import logging

from core.context import GCPClients

# Set up logging
logger = logging.getLogger(__name__)

# Global client instance
_gcp_clients = None
_project_id = None
_location = None


def initialize_clients(credentials=None):
    """Initialize GCP clients with credentials."""
    global _gcp_clients, _project_id, _location

    try:
        # Initialize GCP clients
        _gcp_clients = GCPClients(credentials)
        _project_id = _gcp_clients.project_id
        _location = _gcp_clients.location
        logger.info(f"GCP clients initialized with project: {_project_id}")
        return True
    except Exception as e:
        logger.error(f"Failed to initialize GCP clients: {str(e)}")
        return False


def get_clients():
    """Get the initialized GCP clients."""
    if _gcp_clients is None:
        logger.warning("GCP clients not initialized. Attempting to initialize...")
        initialize_clients()
    return _gcp_clients


def get_project_id():
    """Get the GCP project ID."""
    return _project_id


def get_location():
    """Get the GCP location."""
    return _location

```

--------------------------------------------------------------------------------
/smithery.yaml:
--------------------------------------------------------------------------------

```yaml
# Smithery.ai configuration
startCommand:
  type: stdio
  configSchema:
    type: object
    properties:
      project_id:
        type: string
        description: "Google Cloud Project ID"
      region:
        type: string
        description: "Default GCP region"
        default: "us-central1"
      timeout:
        type: integer
        description: "Default timeout for operations in seconds"
        default: 300
      service_account_json:
        type: string
        description: "GCP Service Account key JSON (as string)"
      service_account_path:
        type: string
        description: "Path to GCP Service Account key file"
    required:
      - project_id
  commandFunction: |-
    (config) => ({
      "command": "python",
      "args": [
        "main.py"
      ],
      "env": {
        "GCP_PROJECT_ID": config.project_id,
        "GCP_REGION": config.region || "us-central1",
        "GCP_TIMEOUT": String(config.timeout || 300),
        "GCP_SERVICE_ACCOUNT_JSON": config.service_account_json || "",
        "GCP_SERVICE_ACCOUNT_KEY_PATH": config.service_account_path || ""
      }
    })
```

--------------------------------------------------------------------------------
/Dockerfile:
--------------------------------------------------------------------------------

```dockerfile
# Generated by https://smithery.ai. See: https://smithery.ai/docs/config#dockerfile
# Use a Python image with uv pre-installed
FROM ghcr.io/astral-sh/uv:python3.13-bookworm-slim AS uv

# Set working directory
WORKDIR /app

# Enable bytecode compilation
ENV UV_COMPILE_BYTECODE=1

# Copy pyproject.toml and lock file for dependencies
COPY pyproject.toml uv.lock ./

# Install the project's dependencies
RUN --mount=type=cache,target=/root/.cache/uv \
    uv sync --frozen --no-install-project --no-dev --no-editable

# Add the rest of the project source code and install it
ADD src /app/src

# Sync and install the project
RUN --mount=type=cache,target=/root/.cache/uv \
    uv sync --frozen --no-dev --no-editable

FROM python:3.13-slim-bookworm

# Set working directory
WORKDIR /app

# Copy virtual environment from the builder
COPY --from=uv /root/.local /root/.local
COPY --from=uv /app/.venv /app/.venv

# Place executables in the environment at the front of the path
ENV PATH="/app/.venv/bin:$PATH"

# Define the entry point
ENTRYPOINT ["gcp-mcp-server"]

# Example command
# CMD ["--project", "your-gcp-project-id", "--location", "your-gcp-location"]
```

--------------------------------------------------------------------------------
/src/gcp-mcp-server/core/server.py:
--------------------------------------------------------------------------------

```python
# import logging
# import os
# from contextlib import asynccontextmanager
# from typing import Any, AsyncIterator, Dict

# from auth import get_credentials
# from mcp.server.fastmcp import FastMCP

# from ..services import bigquery, compute, iam, storage

# logging.basicConfig(level=logging.INFO)
# logger = logging.getLogger(__name__)


# @asynccontextmanager
# async def gcp_lifespan(server) -> AsyncIterator[Dict[str, Any]]:
#     """Set up GCP context with credentials."""
#     logger.info("Initializing GCP MCP server...")
#     try:
#         # Get GCP credentials
#         credentials = get_credentials()
#         project_id = os.environ.get("GCP_PROJECT_ID")

#         if not project_id:
#             logger.warning(
#                 "GCP_PROJECT_ID not set in environment. Some features may not work correctly."
#             )

#         logger.info(f"Server initialized with project: {project_id or 'Not set'}")

#         # Yield context to be used by handlers
#         yield {"credentials": credentials, "project_id": project_id}
#     except Exception as e:
#         logger.error(f"Failed to initialize GCP context: {str(e)}")
#         raise
#     finally:
#         logger.info("Shutting down GCP MCP server...")


# # Create main server FIRST
# mcp = FastMCP(
#     "GCP Manager",
#     description="Manage Google Cloud Platform Resources",
#     lifespan=gcp_lifespan,
# )

# # Register all services
# compute.register(mcp)
# storage.register(mcp)
# bigquery.register(mcp)
# iam.register(mcp)


# # THEN define resources and tools
# @mcp.resource("gcp://project")
# def get_project_info():
#     """Get information about the current GCP project"""
#     project_id = os.environ.get("GCP_PROJECT_ID")
#     return f"Project ID: {project_id}"


# @mcp.resource("gcp://storage/buckets")
# def list_buckets():
#     """List GCP storage buckets"""
#     from google.cloud import storage

#     client = storage.Client()
#     buckets = list(client.list_buckets(max_results=10))
#     return "\n".join([f"- {bucket.name}" for bucket in buckets])


# @mcp.resource("test://hello")
# def hello_resource():
#     """A simple test resource"""
#     return "Hello World"


# @mcp.tool()
# def list_gcp_instances(region: str = "us-central1") -> str:
#     """List GCP compute instances in a region"""
#     # Use your credentials to list instances
#     return f"Instances in {region}: [instance list would go here]"


# @mcp.tool()
# def test_gcp_auth() -> str:
#     """Test GCP authentication"""
#     try:
#         from google.cloud import storage

#         client = storage.Client()
#         buckets = list(client.list_buckets(max_results=5))
#         return f"Authentication successful. Found {len(buckets)} buckets."
#     except Exception as e:
#         return f"Authentication failed: {str(e)}"


# if __name__ == "__main__":
#     mcp.run()

```

--------------------------------------------------------------------------------
/src/gcp-mcp-server/core/auth.py:
--------------------------------------------------------------------------------

```python
import json
import os

import google.auth
from google.auth.exceptions import DefaultCredentialsError
from google.auth.transport.requests import Request
from google.oauth2 import service_account

from .logging_handler import MCPLogger

# Define required scopes for GCP APIs
REQUIRED_SCOPES = [
    "https://www.googleapis.com/auth/cloud-platform",
]

logger = MCPLogger("auth")


def get_credentials():
    """
    Get Google Cloud credentials from environment.
    Attempts to load credentials in the following order:
    1. From GOOGLE_APPLICATION_CREDENTIALS environment variable
    2. From GCP_SERVICE_ACCOUNT_JSON environment variable containing JSON
    3. Default application credentials
    """
    try:
        # Check for credentials file path
        creds_file = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")
        if creds_file and os.path.exists(creds_file):
            logger.audit_log(
                action="credential_load",
                resource="service_account",
                details={"method": "file", "file": creds_file},
            )
            logger.info(f"Loading credentials from file: {creds_file}")
            credentials = service_account.Credentials.from_service_account_file(
                creds_file, scopes=REQUIRED_SCOPES
            )
            return validate_credentials(credentials)

        # Check for service account JSON in environment
        sa_json = os.environ.get("GCP_SERVICE_ACCOUNT_JSON")
        if sa_json:
            try:
                service_account_info = json.loads(sa_json)
                logger.info(
                    "Loading credentials from GCP_SERVICE_ACCOUNT_JSON environment variable"
                )
                credentials = service_account.Credentials.from_service_account_info(
                    service_account_info, scopes=REQUIRED_SCOPES
                )
                logger.audit_log(
                    action="credential_load",
                    resource="service_account",
                    details={"method": "environment_json"},
                )
                return validate_credentials(credentials)
            except json.JSONDecodeError as e:
                logger.error(f"Failed to parse GCP_SERVICE_ACCOUNT_JSON: {str(e)}")

        # Fall back to default credentials
        try:
            logger.audit_log(
                action="credential_load",
                resource="application_default",
                details={"method": "default"},
            )
            logger.info("Loading default application credentials")
            credentials, project_id = google.auth.default(scopes=REQUIRED_SCOPES)
            if project_id and not os.environ.get("GCP_PROJECT_ID"):
                # Set project ID from default credentials if not already set
                os.environ["GCP_PROJECT_ID"] = project_id
                logger.info(f"Using project ID from default credentials: {project_id}")
            return validate_credentials(credentials)
        except DefaultCredentialsError as e:
            logger.error(f"Failed to load GCP credentials: {str(e)}")
            raise AuthenticationError("Failed to obtain valid credentials")

    except Exception as e:
        logger.critical(f"Authentication failure: {str(e)}")
        raise AuthenticationError(f"Failed to obtain valid credentials: {str(e)}")


def validate_credentials(credentials):
    """Validate credential permissions and expiration"""
    # Some credentials don't have a valid attribute or may not need refresh
    if hasattr(credentials, "valid") and not credentials.valid:
        credentials.refresh(Request())

    # Check expiration
    if hasattr(credentials, "expiry"):
        if hasattr(credentials, "expired") and credentials.expired:
            try:
                credentials.refresh(Request())
            except Exception as e:
                raise AuthenticationError(
                    f"Failed to refresh expired credentials: {str(e)}"
                )

    return credentials


class AuthenticationError(Exception):
    """Custom exception for authentication failures"""

    pass

```

--------------------------------------------------------------------------------
/src/gcp-mcp-server/main.py:
--------------------------------------------------------------------------------

```python
import os
import sys

# Make local modules importable regardless of the working directory
current_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.append(current_dir)
# Also add the parent directory so sibling packages resolve
parent_dir = os.path.dirname(current_dir)
sys.path.append(parent_dir)


import inspect
import logging
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator, Dict

from mcp.server.fastmcp import FastMCP

# Centralized client management for GCP services
from services import client_instances

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Log the effective paths to help diagnose import problems
logger.debug(f"Current working directory: {os.getcwd()}")
logger.debug(f"Updated Python path: {sys.path}")


@asynccontextmanager
async def gcp_lifespan(server) -> AsyncIterator[Dict[str, Any]]:
    """Set up GCP context with credentials."""
    logger.info("Initializing GCP MCP server...")
    try:
        # Initialize GCP credentials
        credentials = None
        # Check for service account key file path
        sa_path = os.environ.get("GCP_SERVICE_ACCOUNT_KEY_PATH")
        if sa_path and os.path.exists(sa_path):
            logger.info(f"Using service account key from: {sa_path}")

        # Initialize GCP clients using the client_instances module
        client_instances.initialize_clients(credentials)
        clients = client_instances.get_clients()
        project_id = client_instances.get_project_id()
        location = client_instances.get_location()

        logger.info(f"Server initialized with project: {project_id}")
        # Yield context with clients and credentials
        yield {
            "clients": clients,
            "project_id": project_id,
            "credentials": credentials,
            "location": location,
        }
    except Exception as e:
        logger.error(f"Failed to initialize GCP context: {str(e)}")
        raise
    finally:
        logger.info("Shutting down GCP MCP server...")


# Create the global MCP instance
mcp = FastMCP(
    "GCP Manager",
    description="Manage Google Cloud Platform Resources",
    lifespan=gcp_lifespan,
    dependencies=[
        "mcp>=1.0.0",
        "google-cloud-bigquery",
        "google-cloud-storage",
        "google-cloud-run",
        "google-cloud-artifact-registry",
        "google-cloud-logging",
        "python-dotenv",
        "google-cloud-monitoring",
        "google-cloud-compute",
        "google-cloud-build",
    ],
)


# Basic resources and tools
@mcp.resource("test://hello")
def hello_resource():
    """A simple test resource"""
    return "Hello World"


@mcp.tool()
def test_gcp_auth() -> str:
    """Test GCP authentication"""
    try:
        # Get clients from client_instances module instead of context
        clients = client_instances.get_clients()

        # Test whether we can list storage buckets
        if hasattr(clients, "storage"):
            try:
                buckets = list(clients.storage.list_buckets(max_results=5))
                return f"Authentication successful. Found {len(buckets)} buckets. {buckets}"
            except Exception as e:
                return f"Storage authentication failed: {str(e)}"
        return "Clients initialized, but no storage client is available to verify access."
    except Exception as e:
        return f"Authentication failed: {str(e)}"


# # Function to register services
# def register_services():
#     """Register all service modules with the MCP instance."""
#     services_dir = os.path.join(os.path.dirname(__file__), "services")
#     logger.info(f"Loading services from {services_dir}")
#     if not os.path.exists(services_dir):
#         logger.warning(f"Services directory not found: {services_dir}")
#         return
#     # Get all Python files in the services directory
#     for filename in os.listdir(services_dir):
#         if (
#             filename.endswith(".py")
#             and filename != "__init__.py"
#             and filename != "client_instances.py"
#         ):
#             module_name = filename[:-3]  # Remove .py extension
#             try:
#                 # Load the module using importlib
#                 module_path = os.path.join(services_dir, filename)
#                 spec = importlib.util.spec_from_file_location(
#                     f"services.{module_name}", module_path
#                 )
#                 module = importlib.util.module_from_spec(spec)
#                 # Inject mcp instance into the module
#                 module.mcp = mcp
#                 # Execute the module
#                 spec.loader.exec_module(module)
#                 logger.info(f"Loaded service module: {module_name}")
#                 # If the module has a register function, call it
#                 if hasattr(module, "register"):
#                     # Pass the mcp instance and the server's request_context to register
#                     module.register(mcp)
#                     logger.info(f"Registered service: {module_name}")
#             except Exception as e:
#                 logger.error(f"Error loading service {module_name}: {e}")


def register_services():
    """Register all service modules with the MCP instance."""
    logger.info("Explicitly registering service modules")

    # Print diagnostic information
    logger.info(f"Python sys.path: {sys.path}")
    logger.info(f"Current working directory: {os.getcwd()}")
    logger.info(f"Script location: {os.path.abspath(__file__)}")
    logger.info(f"Parent directory: {os.path.dirname(os.path.abspath(__file__))}")

    try:
        # Try importing a service module to check if imports work
        logger.info("Attempting to import artifact_registry")
        import services.artifact_registry as artifact_registry

        logger.info(f"Module location: {inspect.getfile(artifact_registry)}")
        from services import (
            artifact_registry,
            cloud_audit_logs,
            cloud_bigquery,
            cloud_build,
            cloud_compute_engine,
            cloud_monitoring,
            cloud_run,
            cloud_storage,
        )

        # Register each service module
        artifact_registry.register(mcp)
        cloud_audit_logs.register(mcp)
        cloud_bigquery.register(mcp)
        cloud_build.register(mcp)
        cloud_compute_engine.register(mcp)
        cloud_monitoring.register(mcp)
        cloud_run.register(mcp)
        cloud_storage.register(mcp)

        logger.info("All service modules registered successfully")
    except Exception as e:
        logger.error(f"Error registering service modules: {e}")
        # Add detailed error logging
        import traceback

        logger.error(traceback.format_exc())


if __name__ == "__main__":
    logger.info("Starting GCP MCP server")
    # Register services
    register_services()
    # Run the server
    mcp.run()

```

--------------------------------------------------------------------------------
/src/gcp-mcp-server/core/context.py:
--------------------------------------------------------------------------------

```python
# context.py
import json
import os
from typing import Any, Optional, Type

import google.auth

# Import other services as needed
from google.cloud import (
    artifactregistry_v1,
    bigquery,
    compute_v1,
    monitoring_v3,
    storage,
)
from google.cloud.devtools import cloudbuild_v1
from google.oauth2 import service_account


class GCPClients:
    """Client manager for GCP services"""

    def __init__(self, credentials=None):
        self.credentials = self._get_credentials(credentials)
        self.project_id = self._get_project_id()
        self.location = self._get_location()
        self._clients = {}

        # Initialize clients
        self._storage_client = None
        self._bigquery_client = None
        self._run_client = None
        self._logging_client = None
        self._monitoring_client = None
        self._compute_client = None
        self._sql_client = None
        self._cloudbuild_client = None
        self._artifactregistry_client = None

    def _get_credentials(self, credentials=None):
        """Get credentials from various sources"""
        # If credentials are directly provided
        if credentials:
            return credentials

        # Check for service account JSON in environment variable
        sa_json = os.environ.get("GCP_SERVICE_ACCOUNT_JSON")
        if sa_json:
            try:
                sa_info = json.loads(sa_json)
                return service_account.Credentials.from_service_account_info(sa_info)
            except Exception as e:
                print(f"Error loading service account JSON: {e}")

        # Check for service account key file path
        sa_path = os.environ.get("GCP_SERVICE_ACCOUNT_KEY_PATH")
        if sa_path:
            try:
                return service_account.Credentials.from_service_account_file(sa_path)
            except Exception as e:
                print(f"Error loading service account key file: {e}")

        # Fall back to application default credentials
        try:
            credentials, project = google.auth.default()
            return credentials
        except Exception as e:
            print(f"Error getting default credentials: {e}")
            raise RuntimeError("No valid GCP credentials found")

    def _get_project_id(self) -> str:
        """Get project ID from environment or credentials"""
        project_id = os.environ.get("GCP_PROJECT_ID")
        if project_id:
            return project_id

        if hasattr(self.credentials, "project_id"):
            return self.credentials.project_id

        try:
            _, project_id = google.auth.default()
            return project_id
        except Exception:
            raise RuntimeError("Unable to determine GCP project ID")

    def _get_location(self) -> str:
        """Get default location/region from environment"""
        return os.environ.get("GCP_LOCATION", "us-central1")

    # def _init_client(
    #     self, client_class: Type, current_client: Optional[Any], **kwargs
    # ) -> Any:
    #     """Helper method to initialize clients with error handling"""
    #     if not current_client:
    #         try:
    #             return client_class(
    #                 project=self.project_id,  # <-- This adds project automatically
    #                 credentials=self.credentials,  # <-- This adds credentials automatically
    #                 **kwargs,  # <-- This adds any extra params (like database)
    #             )
    #         except Exception as e:
    #             raise RuntimeError(
    #                 f"Failed to initialize {client_class.__name__}: {str(e)}"
    #             )
    #     return current_client

    def _init_client(
        self, client_class: Type, current_client: Optional[Any], **kwargs
    ) -> Any:
        """Helper method to initialize clients with error handling"""
        if not current_client:
            try:
                # Check if this client accepts project parameter
                if client_class in [
                    storage.Client
                ]:  # Add other clients that accept project
                    kwargs["project"] = self.project_id

                return client_class(credentials=self.credentials, **kwargs)
            except Exception as e:
                raise RuntimeError(
                    f"Failed to initialize {client_class.__name__}: {str(e)}"
                )
        return current_client

    @property
    def storage(self) -> storage.Client:
        self._storage_client = self._init_client(storage.Client, self._storage_client)
        return self._storage_client

    @property
    def bigquery(self) -> bigquery.Client:
        self._bigquery_client = self._init_client(
            bigquery.Client, self._bigquery_client
        )
        return self._bigquery_client

    @property
    def artifactregistry(self) -> artifactregistry_v1.ArtifactRegistryClient:
        """Get the Artifact Registry client."""
        if not self._artifactregistry_client:
            try:
                # ArtifactRegistryClient doesn't accept project parameter
                self._artifactregistry_client = (
                    artifactregistry_v1.ArtifactRegistryClient(
                        credentials=self.credentials
                    )
                )
            except Exception as e:
                raise RuntimeError(
                    f"Failed to initialize ArtifactRegistryClient: {str(e)}"
                )
        return self._artifactregistry_client

    @property
    def monitoring(self) -> monitoring_v3.MetricServiceClient:
        self._monitoring_client = self._init_client(
            monitoring_v3.MetricServiceClient, self._monitoring_client
        )
        return self._monitoring_client

    @property
    def compute(self) -> compute_v1.InstancesClient:
        self._compute_client = self._init_client(
            compute_v1.InstancesClient, self._compute_client
        )
        return self._compute_client

    @property
    def cloudbuild(self) -> cloudbuild_v1.CloudBuildClient:
        self._cloudbuild_client = self._init_client(
            cloudbuild_v1.CloudBuildClient, self._cloudbuild_client
        )
        return self._cloudbuild_client

    # Uncomment and implement other client properties as needed
    # @property
    # def run(self) -> run_v2.CloudRunClient:
    #     self._run_client = self._init_client(run_v2.CloudRunClient, self._run_client)
    #     return self._run_client

    # @property
    # def logging(self) -> logging_v2.LoggingServiceV2Client:
    #     self._logging_client = self._init_client(
    #         logging_v2.LoggingServiceV2Client, self._logging_client
    #     )
    #     return self._logging_client

    # @property
    # def sql(self) -> sql_v1.InstancesClient:
    #     self._sql_client = self._init_client(sql_v1.InstancesClient, self._sql_client)
    #     return self._sql_client

    def close_all(self):
        """Close all open clients"""
        for client in self._clients.values():
            if hasattr(client, "transport") and hasattr(client.transport, "close"):
                client.transport.close()
            elif hasattr(client, "close"):
                client.close()
        self._clients.clear()


# Update Context class to handle credentials properly
class Context:
    def __init__(self, request_context):
        self.request_context = request_context
        # Get credentials from lifespan context
        credentials = request_context.lifespan_context.get("credentials")
        # Initialize GCPClients with those credentials
        self.clients = GCPClients(credentials)

    def close(self):
        """Clean up when request ends"""
        if hasattr(self, "clients"):
            self.clients.close_all()


```

--------------------------------------------------------------------------------
/src/gcp-mcp-server/services/cloud_build.py:
--------------------------------------------------------------------------------

```python
import json
from typing import Dict, Optional

from google.cloud.devtools import cloudbuild_v1
from services import client_instances


def register(mcp_instance):
    """Register all Cloud Build resources and tools with the MCP instance."""

    # Resources
    @mcp_instance.resource("gcp://cloudbuild/{project_id}/builds")
    def list_builds_resource(project_id: str = None) -> str:
        """List all Cloud Build executions for a project"""
        try:
            # Get client from client_instances
            client = client_instances.get_clients().cloudbuild
            project_id = project_id or client_instances.get_project_id()

            builds = client.list_builds(project_id=project_id)
            result = []
            for build in builds:
                result.append(
                    {
                        "id": build.id,
                        "status": build.status.name if build.status else None,
                        "source": build.source.repo_source.commit_sha
                        if build.source and build.source.repo_source
                        else None,
                        "create_time": build.create_time.isoformat()
                        if build.create_time
                        else None,
                        "logs_url": build.logs_url,
                        "substitutions": dict(build.substitutions)
                        if build.substitutions
                        else {},
                    }
                )
            return json.dumps(result, indent=2)
        except Exception as e:
            return json.dumps({"error": str(e)}, indent=2)

    @mcp_instance.resource("gcp://cloudbuild/{project_id}/triggers")
    def list_triggers_resource(project_id: str = None) -> str:
        """List all Cloud Build triggers for a project"""
        try:
            # Get client from client_instances
            client = client_instances.get_clients().cloudbuild
            project_id = project_id or client_instances.get_project_id()

            triggers = client.list_build_triggers(project_id=project_id)
            result = []
            for trigger in triggers:
                result.append(
                    {
                        "id": trigger.id,
                        "name": trigger.name,
                        "description": trigger.description,
                        "trigger_template": {
                            "repo_name": trigger.trigger_template.repo_name,
                            "branch_name": trigger.trigger_template.branch_name,
                        }
                        if trigger.trigger_template
                        else None,
                    }
                )
            return json.dumps(result, indent=2)
        except Exception as e:
            return json.dumps({"error": str(e)}, indent=2)

    # Tools
    @mcp_instance.tool()
    def trigger_build(
        build_config: Dict,
        project_id: str = None,
    ) -> str:
        """
        Trigger a new Cloud Build execution

        Args:
            build_config: Dictionary containing the build configuration
            project_id: GCP project ID (defaults to configured project)
        """
        try:
            # Get client from client_instances
            client = client_instances.get_clients().cloudbuild
            project_id = project_id or client_instances.get_project_id()

            # Convert config dict to Build object
            build = cloudbuild_v1.Build.from_json(json.dumps(build_config))

            print(f"Triggering build in project {project_id}...")
            operation = client.create_build(project_id=project_id, build=build)
            response = operation.result()

            return json.dumps(
                {
                    "status": "success",
                    "build_id": response.id,
                    "build_status": response.status.name if response.status else None,
                    "logs_url": response.logs_url,
                    "build_name": response.name,
                },
                indent=2,
            )
        except Exception as e:
            return json.dumps({"status": "error", "message": str(e)}, indent=2)

    @mcp_instance.tool()
    def list_builds(
        project_id: str = None,
        filter: Optional[str] = None,
        page_size: Optional[int] = 100,
    ) -> str:
        """
        List Cloud Build executions with optional filtering

        Args:
            project_id: GCP project ID (defaults to configured project)
            filter: Filter expression for builds
            page_size: Maximum number of results to return
        """
        try:
            # Get client from client_instances
            client = client_instances.get_clients().cloudbuild
            project_id = project_id or client_instances.get_project_id()

            request = cloudbuild_v1.ListBuildsRequest(
                project_id=project_id, filter=filter, page_size=page_size
            )

            builds = []
            for build in client.list_builds(request=request):
                builds.append(
                    {
                        "id": build.id,
                        "status": build.status.name if build.status else None,
                        "create_time": build.create_time.isoformat()
                        if build.create_time
                        else None,
                        "source": build.source.repo_source.commit_sha
                        if build.source and build.source.repo_source
                        else None,
                        "logs_url": build.logs_url,
                    }
                )

            return json.dumps(
                {"status": "success", "builds": builds, "count": len(builds)}, indent=2
            )
        except Exception as e:
            return json.dumps({"status": "error", "message": str(e)}, indent=2)

    @mcp_instance.tool()
    def create_build_trigger(
        trigger_config: Dict,
        project_id: str = None,
    ) -> str:
        """
        Create a new Cloud Build trigger

        Args:
            trigger_config: Dictionary containing the trigger configuration
            project_id: GCP project ID (defaults to configured project)
        """
        try:
            # Get client from client_instances
            client = client_instances.get_clients().cloudbuild
            project_id = project_id or client_instances.get_project_id()

            # Convert config dict to Trigger object
            trigger = cloudbuild_v1.BuildTrigger.from_json(json.dumps(trigger_config))

            print(f"Creating trigger in project {project_id}...")
            response = client.create_build_trigger(project_id=project_id, trigger=trigger)

            return json.dumps(
                {
                    "status": "success",
                    "trigger_id": response.id,
                    "name": response.name,
                    "description": response.description,
                    "create_time": response.create_time.isoformat()
                    if response.create_time
                    else None,
                },
                indent=2,
            )
        except Exception as e:
            return json.dumps({"status": "error", "message": str(e)}, indent=2)

    # Prompts
    @mcp_instance.prompt()
    def build_config_prompt(service_type: str = "docker") -> str:
        """Prompt for creating a Cloud Build configuration"""
        return f"""
I need to create a Cloud Build configuration for {service_type} deployments. Please help with:

1. Recommended build steps for {service_type}
2. Proper use of substitutions
3. Caching strategies
4. Security best practices
5. Integration with other GCP services
"""

    @mcp_instance.prompt()
    def trigger_setup_prompt(repo_type: str = "github") -> str:
        """Prompt for configuring build triggers"""
        return f"""
I want to set up a {repo_type} trigger for Cloud Build. Please explain:

1. Required permissions and connections
2. Event types (push, PR, etc.)
3. File pattern matching
4. Branch/tag filtering
5. Approval workflows
"""

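# Illustrative only: a minimal trigger_config accepted by create_build_trigger.
# Keys follow the Cloud Build BuildTrigger JSON schema; the owner, repository,
# and branch pattern below are hypothetical placeholders.
#   trigger_config = {
#       "name": "deploy-on-push",
#       "description": "Run cloudbuild.yaml on pushes to main",
#       "github": {
#           "owner": "example-org",
#           "name": "example-repo",
#           "push": {"branch": "^main$"},
#       },
#       "filename": "cloudbuild.yaml",
#   }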
```

--------------------------------------------------------------------------------
/src/gcp-mcp-server/services/cloud_compute_engine.py:
--------------------------------------------------------------------------------

```python
import json
from typing import Optional

from services import client_instances


def register(mcp_instance):
    """Register all Compute Engine resources and tools with the MCP instance."""

    # Resources
    @mcp_instance.resource("gcp://compute/{project_id}/{zone}/instances")
    def list_instances_resource(project_id: str = None, zone: str = None) -> str:
        """List all Compute Engine instances in a zone"""
        try:
            # Get client from client_instances
            client = client_instances.get_clients().compute
            project_id = project_id or client_instances.get_project_id()
            zone = zone or client_instances.get_location()

            instance_list = client.list(project=project_id, zone=zone)
            result = []
            for instance in instance_list:
                result.append(
                    {
                        "name": instance.name,
                        "status": instance.status,
                        "machine_type": instance.machine_type.split("/")[-1]
                        if instance.machine_type
                        else None,
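                        # network_i_p is the generated proto field name for
                        # the API's "networkIP"; it is not a typo.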
                        "internal_ip": instance.network_interfaces[0].network_i_p
                        if instance.network_interfaces
                        and len(instance.network_interfaces) > 0
                        else None,
                        "creation_timestamp": instance.creation_timestamp,
                    }
                )
            return json.dumps(result, indent=2)
        except Exception as e:
            return json.dumps({"error": str(e)}, indent=2)

    # Tools
    @mcp_instance.tool()
    def create_instance(
        instance_name: str,
        machine_type: str,
        project_id: str = None,
        zone: str = None,
        image: str = "projects/debian-cloud/global/images/family/debian-11",
        network: str = "global/networks/default",
        disk_size_gb: int = 10,
    ) -> str:
        """
        Create a new Compute Engine instance

        Args:
            instance_name: Name for the new instance
            machine_type: Machine type (e.g., n2-standard-2)
            project_id: GCP project ID (defaults to configured project)
            zone: GCP zone (defaults to configured location)
            image: Source image for boot disk
            network: Network to connect to
            disk_size_gb: Size of boot disk in GB
        """
        try:
            # Get client from client_instances
            client = client_instances.get_clients().compute
            project_id = project_id or client_instances.get_project_id()
            zone = zone or client_instances.get_location()

            # Build instance configuration
            instance_config = {
                "name": instance_name,
                "machine_type": f"zones/{zone}/machineTypes/{machine_type}",
                "disks": [
                    {
                        "boot": True,
                        "auto_delete": True,
                        "initialize_params": {
                            "source_image": image,
                            "disk_size_gb": disk_size_gb,
                        },
                    }
                ],
                "network_interfaces": [
                    {
                        "network": network,
                        "access_configs": [
                            {"type": "ONE_TO_ONE_NAT", "name": "External NAT"}
                        ],
                    }
                ],
            }

            print(f"Creating instance {instance_name} in {zone}...")
            operation = client.insert(
                project=project_id, zone=zone, instance_resource=instance_config
            )

            # Wait for operation to complete
            operation.result()
            return json.dumps(
                {
                    "status": "success",
                    "instance_name": instance_name,
                    "zone": zone,
                    "operation_type": "insert",
                    "operation_status": "DONE",
                },
                indent=2,
            )
        except Exception as e:
            return json.dumps({"status": "error", "message": str(e)}, indent=2)

    @mcp_instance.tool()
    def list_instances(
        project_id: str = None,
        zone: str = None,
        filter: Optional[str] = None,
    ) -> str:
        """
        List Compute Engine instances in a zone

        Args:
            project_id: GCP project ID (defaults to configured project)
            zone: GCP zone (defaults to configured location)
            filter: Optional filter expression (e.g., "status=RUNNING")
        """
        try:
            # Get client from client_instances
            client = client_instances.get_clients().compute
            project_id = project_id or client_instances.get_project_id()
            zone = zone or client_instances.get_location()

            # The generated Compute client accepts the filter only via the
            # request object, not as a direct keyword argument.
            list_request = {"project": project_id, "zone": zone}
            if filter:
                list_request["filter"] = filter
            instance_list = client.list(request=list_request)

            instances = []
            for instance in instance_list:
                instances.append(
                    {
                        "name": instance.name,
                        "status": instance.status,
                        "machine_type": instance.machine_type.split("/")[-1]
                        if instance.machine_type
                        else None,
                        "internal_ip": instance.network_interfaces[0].network_i_p
                        if instance.network_interfaces
                        and len(instance.network_interfaces) > 0
                        else None,
                        "creation_timestamp": instance.creation_timestamp,
                    }
                )

            return json.dumps(
                {"status": "success", "instances": instances, "count": len(instances)},
                indent=2,
            )
        except Exception as e:
            return json.dumps({"status": "error", "message": str(e)}, indent=2)

    @mcp_instance.tool()
    def start_instance(
        instance_name: str,
        project_id: str = None,
        zone: str = None,
    ) -> str:
        """
        Start a stopped Compute Engine instance

        Args:
            instance_name: Name of the instance to start
            project_id: GCP project ID (defaults to configured project)
            zone: GCP zone (defaults to configured location)
        """
        try:
            # Get client from client_instances
            client = client_instances.get_clients().compute
            project_id = project_id or client_instances.get_project_id()
            zone = zone or client_instances.get_location()

            print(f"Starting instance {instance_name}...")
            operation = client.start(
                project=project_id, zone=zone, instance=instance_name
            )
            operation.result()

            return json.dumps(
                {
                    "status": "success",
                    "instance_name": instance_name,
                    "operation_status": "DONE",
                },
                indent=2,
            )
        except Exception as e:
            return json.dumps({"status": "error", "message": str(e)}, indent=2)

    @mcp_instance.tool()
    def stop_instance(
        instance_name: str,
        project_id: str = None,
        zone: str = None,
    ) -> str:
        """
        Stop a running Compute Engine instance

        Args:
            instance_name: Name of the instance to stop
            project_id: GCP project ID (defaults to configured project)
            zone: GCP zone (defaults to configured location)
        """
        try:
            # Get client from client_instances
            client = client_instances.get_clients().compute
            project_id = project_id or client_instances.get_project_id()
            zone = zone or client_instances.get_location()

            print(f"Stopping instance {instance_name}...")
            operation = client.stop(
                project=project_id, zone=zone, instance=instance_name
            )
            operation.result()

            return json.dumps(
                {
                    "status": "success",
                    "instance_name": instance_name,
                    "operation_status": "DONE",
                },
                indent=2,
            )
        except Exception as e:
            return json.dumps({"status": "error", "message": str(e)}, indent=2)

    @mcp_instance.tool()
    def delete_instance(
        instance_name: str,
        project_id: str = None,
        zone: str = None,
    ) -> str:
        """
        Delete a Compute Engine instance

        Args:
            instance_name: Name of the instance to delete
            project_id: GCP project ID (defaults to configured project)
            zone: GCP zone (defaults to configured location)
        """
        try:
            # Get client from client_instances
            client = client_instances.get_clients().compute
            project_id = project_id or client_instances.get_project_id()
            zone = zone or client_instances.get_location()

            print(f"Deleting instance {instance_name}...")
            operation = client.delete(
                project=project_id, zone=zone, instance=instance_name
            )
            operation.result()

            return json.dumps(
                {
                    "status": "success",
                    "instance_name": instance_name,
                    "operation_status": "DONE",
                },
                indent=2,
            )
        except Exception as e:
            return json.dumps({"status": "error", "message": str(e)}, indent=2)

    # Prompts
    @mcp_instance.prompt()
    def instance_config_prompt(workload_type: str = "web-server") -> str:
        """Prompt for creating Compute Engine configurations"""
        return f"""
I need to configure a Compute Engine instance for {workload_type}. Please help with:

1. Recommended machine types for {workload_type}
2. Disk type and size recommendations
3. Network configuration best practices
4. Security considerations (service accounts, firewall rules)
5. Cost optimization strategies
"""

    @mcp_instance.prompt()
    def troubleshooting_prompt(issue: str = "instance not responding") -> str:
        """Prompt for troubleshooting Compute Engine issues"""
        return f"""
I'm experiencing {issue} with my Compute Engine instance. Please guide me through:

1. Common causes for this issue
2. Diagnostic steps using Cloud Console and CLI
3. Log analysis techniques
4. Recovery procedures
5. Prevention strategies
"""

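# Illustrative only: a typical create_instance call. The tool expands the short
# machine type name to f"zones/{zone}/machineTypes/{machine_type}", so callers
# pass just the short name. The instance name and zone below are hypothetical.
#   create_instance(
#       instance_name="demo-vm",
#       machine_type="n2-standard-2",
#       zone="us-central1-a",
#   )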
```

--------------------------------------------------------------------------------
/src/gcp-mcp-server/services/cloud_audit_logs.py:
--------------------------------------------------------------------------------

```python
import json

from google.cloud import logging_v2
from services import client_instances


def register(mcp_instance):
    """Register all Cloud Audit Logs resources and tools with the MCP instance."""

    # Resources
    @mcp_instance.resource("gcp://audit-logs/{project_id}")
    def list_audit_logs_resource(project_id: str = None) -> str:
        """List recent audit logs from a specific project"""
        try:
            # Get client from client_instances
            client = client_instances.get_clients().logging
            project_id = project_id or client_instances.get_project_id()

            # Filter for audit logs only
            filter_str = 'logName:"cloudaudit.googleapis.com"'

            # List log entries
            entries = client.list_log_entries(
                request={
                    "resource_names": [f"projects/{project_id}"],
                    "filter": filter_str,
                    "page_size": 10,  # Limiting to 10 for responsiveness
                }
            )

            result = []
            for entry in entries:
                log_data = {
                    "timestamp": entry.timestamp.isoformat()
                    if entry.timestamp
                    else None,
                    "severity": entry.severity.name if entry.severity else None,
                    "log_name": entry.log_name,
                    "resource": {
                        "type": entry.resource.type,
                        "labels": dict(entry.resource.labels)
                        if entry.resource.labels
                        else {},
                    }
                    if entry.resource
                    else {},
                }

                # Handle payload based on type
                if hasattr(entry, "json_payload") and entry.json_payload:
                    log_data["payload"] = dict(entry.json_payload)
                elif hasattr(entry, "proto_payload") and entry.proto_payload:
                    log_data["payload"] = "Proto payload (details omitted)"
                elif hasattr(entry, "text_payload") and entry.text_payload:
                    log_data["payload"] = entry.text_payload

                result.append(log_data)

            return json.dumps(result, indent=2)
        except Exception as e:
            return json.dumps({"error": str(e)}, indent=2)

    @mcp_instance.resource("gcp://audit-logs/{project_id}/sinks")
    def list_log_sinks_resource(project_id: str = None) -> str:
        """List log sinks configured for a project"""
        try:
            # Get client from client_instances
            client = client_instances.get_clients().logging
            project_id = project_id or client_instances.get_project_id()

            parent = f"projects/{project_id}"
            sinks = client.list_sinks(parent=parent)

            result = []
            for sink in sinks:
                result.append(
                    {
                        "name": sink.name,
                        "destination": sink.destination,
                        "filter": sink.filter,
                        "description": sink.description,
                        "disabled": sink.disabled,
                    }
                )

            return json.dumps(result, indent=2)
        except Exception as e:
            return json.dumps({"error": str(e)}, indent=2)

    # Tools
    @mcp_instance.tool()
    def list_audit_logs(
        project_id: str = None,
        filter_str: str = 'logName:"cloudaudit.googleapis.com"',
        page_size: int = 20,
    ) -> str:
        """
        List Google Cloud Audit logs from a project

        Args:
            project_id: GCP project ID (defaults to configured project)
            filter_str: Log filter expression (defaults to audit logs)
            page_size: Maximum number of entries to return
        """
        try:
            # Get client from client_instances
            client = client_instances.get_clients().logging
            project_id = project_id or client_instances.get_project_id()

            print(f"Retrieving audit logs from project {project_id}...")

            # List log entries
            entries = client.list_log_entries(
                request={
                    "resource_names": [f"projects/{project_id}"],
                    "filter": filter_str,
                    "page_size": page_size,
                }
            )

            result = []
            for entry in entries:
                log_data = {
                    "timestamp": entry.timestamp.isoformat()
                    if entry.timestamp
                    else None,
                    "severity": entry.severity.name if entry.severity else None,
                    "log_name": entry.log_name,
                    "resource": {
                        "type": entry.resource.type,
                        "labels": dict(entry.resource.labels)
                        if entry.resource.labels
                        else {},
                    }
                    if entry.resource
                    else {},
                }

                # Handle payload based on type
                if hasattr(entry, "json_payload") and entry.json_payload:
                    log_data["payload"] = dict(entry.json_payload)
                elif hasattr(entry, "proto_payload") and entry.proto_payload:
                    log_data["payload"] = "Proto payload (details omitted)"
                elif hasattr(entry, "text_payload") and entry.text_payload:
                    log_data["payload"] = entry.text_payload

                result.append(log_data)

            return json.dumps(
                {
                    "status": "success",
                    "entries": result,
                    "count": len(result),
                },
                indent=2,
            )
        except Exception as e:
            return json.dumps({"status": "error", "message": str(e)}, indent=2)

    @mcp_instance.tool()
    def filter_admin_activity_logs(
        project_id: str = None,
        service_name: str = "",
        resource_type: str = "",
        time_range: str = "1h",
        page_size: int = 20,
    ) -> str:
        """
        Filter Admin Activity audit logs for specific services or resource types

        Args:
            project_id: GCP project ID (defaults to configured project)
            service_name: Optional service name to filter by (e.g., compute.googleapis.com)
            resource_type: Optional resource type to filter by (e.g., gce_instance)
            time_range: Time range for logs (e.g., 1h, 24h, 7d)
            page_size: Maximum number of entries to return
        """
        try:
            # Get client from client_instances
            client = client_instances.get_clients().logging
            project_id = project_id or client_instances.get_project_id()

            # Build filter string
            filter_parts = ['logName:"cloudaudit.googleapis.com%2Factivity"']

            if service_name:
                filter_parts.append(f'protoPayload.serviceName="{service_name}"')

            if resource_type:
                filter_parts.append(f'resource.type="{resource_type}"')

            if time_range:
                filter_parts.append(f'timestamp >= "-{time_range}"')

            filter_str = " AND ".join(filter_parts)

            print(f"Filtering Admin Activity logs with: {filter_str}")

            # List log entries
            entries = client.list_log_entries(
                request={
                    "resource_names": [f"projects/{project_id}"],
                    "filter": filter_str,
                    "page_size": page_size,
                }
            )

            result = []
            for entry in entries:
                # Extract relevant fields for admin activity logs
                log_data = {
                    "timestamp": entry.timestamp.isoformat()
                    if entry.timestamp
                    else None,
                    "method_name": None,
                    "resource_name": None,
                    "service_name": None,
                    "user": None,
                }

                # Extract data from proto payload
                if hasattr(entry, "proto_payload") and entry.proto_payload:
                    payload = entry.proto_payload
                    if hasattr(payload, "method_name"):
                        log_data["method_name"] = payload.method_name
                    if hasattr(payload, "resource_name"):
                        log_data["resource_name"] = payload.resource_name
                    if hasattr(payload, "service_name"):
                        log_data["service_name"] = payload.service_name

                    # Extract authentication info
                    if hasattr(payload, "authentication_info"):
                        auth_info = payload.authentication_info
                        if hasattr(auth_info, "principal_email"):
                            log_data["user"] = auth_info.principal_email

                result.append(log_data)

            return json.dumps(
                {
                    "status": "success",
                    "entries": result,
                    "count": len(result),
                    "filter": filter_str,
                },
                indent=2,
            )
        except Exception as e:
            return json.dumps({"status": "error", "message": str(e)}, indent=2)

    @mcp_instance.tool()
    def create_log_sink(
        sink_name: str,
        destination: str,
        project_id: str = None,
        filter_str: str = 'logName:"cloudaudit.googleapis.com"',
        description: str = "",
    ) -> str:
        """
        Create a log sink to export audit logs to a destination

        Args:
            sink_name: Name for the new log sink
            destination: Destination for logs (e.g., storage.googleapis.com/my-bucket)
            project_id: GCP project ID (defaults to configured project)
            filter_str: Log filter expression (defaults to audit logs)
            description: Optional description for the sink
        """
        try:
            # Get client from client_instances
            client = client_instances.get_clients().logging
            project_id = project_id or client_instances.get_project_id()

            parent = f"projects/{project_id}"

            # Create sink configuration
            sink = logging_v2.LogSink(
                name=sink_name,
                destination=destination,
                filter=filter_str,
                description=description,
            )

            print(f"Creating log sink '{sink_name}' to export to {destination}...")

            # Create the sink
            response = client.create_sink(
                request={
                    "parent": parent,
                    "sink": sink,
                }
            )

            # Important: Recommend setting up IAM permissions
            writer_identity = response.writer_identity

            return json.dumps(
                {
                    "status": "success",
                    "name": response.name,
                    "destination": response.destination,
                    "filter": response.filter,
                    "writer_identity": writer_identity,
                    "next_steps": f"Important: Grant {writer_identity} the appropriate permissions on the destination",
                },
                indent=2,
            )
        except Exception as e:
            return json.dumps({"status": "error", "message": str(e)}, indent=2)

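    # Illustrative only: common destination formats for create_log_sink. The
    # bucket, dataset, and topic names below are hypothetical placeholders.
    #   storage.googleapis.com/example-audit-bucket
    #   bigquery.googleapis.com/projects/example-project/datasets/audit_logs
    #   pubsub.googleapis.com/projects/example-project/topics/audit-export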
    # Prompts
    @mcp_instance.prompt()
    def audit_log_investigation() -> str:
        """Prompt for investigating security incidents through audit logs"""
        return """
I need to investigate a potential security incident in my Google Cloud project.

Please help me:
1. Determine what types of audit logs I should check (Admin Activity, Data Access, System Event)
2. Create an effective filter query to find relevant logs
3. Identify key fields to examine for signs of unusual activity
4. Suggest common indicators of potential security issues in audit logs
"""

    @mcp_instance.prompt()
    def log_export_setup() -> str:
        """Prompt for setting up log exports for compliance"""
        return """
I need to set up log exports for compliance requirements.

Please help me:
1. Understand the different destinations available for log exports
2. Design an effective filter to capture all required audit events
3. Implement a log sink with appropriate permissions
4. Verify my setup is correctly capturing and exporting logs
"""

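# Illustrative only: the shape of the filter built by filter_admin_activity_logs
# for a compute.googleapis.com / gce_instance query. Values are hypothetical.
#   logName:"cloudaudit.googleapis.com%2Factivity"
#       AND protoPayload.serviceName="compute.googleapis.com"
#       AND resource.type="gce_instance"
#       AND timestamp >= "-24h"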
```

--------------------------------------------------------------------------------
/src/gcp-mcp-server/services/cloud_storage.py:
--------------------------------------------------------------------------------

```python
import json
import os
from typing import Dict, Optional

from services import client_instances


def register(mcp_instance):
    """Register all Cloud Storage resources and tools with the MCP instance."""

    # Resources
    @mcp_instance.resource("gcp://storage/{project_id}/buckets")
    def list_buckets_resource(project_id: str = None) -> str:
        """List all Cloud Storage buckets in a project"""
        try:
            # Get client from client_instances
            client = client_instances.get_clients().storage
            project_id = project_id or client_instances.get_project_id()

            buckets = client.list_buckets()

            result = []
            for bucket in buckets:
                result.append(
                    {
                        "name": bucket.name,
                        "location": bucket.location,
                        "storage_class": bucket.storage_class,
                        "time_created": bucket.time_created.isoformat()
                        if bucket.time_created
                        else None,
                        "versioning_enabled": bucket.versioning_enabled,
                        "labels": dict(bucket.labels) if bucket.labels else {},
                    }
                )

            return json.dumps(result, indent=2)
        except Exception as e:
            return json.dumps({"error": str(e)}, indent=2)

    @mcp_instance.resource("gcp://storage/{project_id}/bucket/{bucket_name}")
    def get_bucket_resource(project_id: str = None, bucket_name: str = None) -> str:
        """Get details for a specific Cloud Storage bucket"""
        try:
            # Get client from client_instances
            client = client_instances.get_clients().storage
            project_id = project_id or client_instances.get_project_id()

            bucket = client.get_bucket(bucket_name)
            result = {
                "name": bucket.name,
                "location": bucket.location,
                "storage_class": bucket.storage_class,
                "time_created": bucket.time_created.isoformat()
                if bucket.time_created
                else None,
                "versioning_enabled": bucket.versioning_enabled,
                "requester_pays": bucket.requester_pays,
                "lifecycle_rules": bucket.lifecycle_rules,
                "cors": bucket.cors,
                "labels": dict(bucket.labels) if bucket.labels else {},
            }
            return json.dumps(result, indent=2)
        except Exception as e:
            return json.dumps({"error": str(e)}, indent=2)

    @mcp_instance.resource("gcp://storage/{project_id}/bucket/{bucket_name}/objects")
    def list_objects_resource(project_id: str = None, bucket_name: str = None) -> str:
        """List objects in a specific Cloud Storage bucket"""
        prefix = ""  # Move it inside the function with a default value
        try:
            # Get client from client_instances
            client = client_instances.get_clients().storage
            project_id = project_id or client_instances.get_project_id()

            bucket = client.get_bucket(bucket_name)
            blobs = bucket.list_blobs(prefix=prefix)

            result = []
            for blob in blobs:
                result.append(
                    {
                        "name": blob.name,
                        "size": blob.size,
                        "updated": blob.updated.isoformat() if blob.updated else None,
                        "content_type": blob.content_type,
                        "md5_hash": blob.md5_hash,
                        "generation": blob.generation,
                        "metadata": blob.metadata,
                    }
                )

            return json.dumps(result, indent=2)
        except Exception as e:
            return json.dumps({"error": str(e)}, indent=2)

    # Tools
    @mcp_instance.tool()
    def create_bucket(
        bucket_name: str,
        project_id: str = None,
        location: str = "us-central1",
        storage_class: str = "STANDARD",
        labels: Optional[Dict[str, str]] = None,
        versioning_enabled: bool = False,
    ) -> str:
        """
        Create a Cloud Storage bucket

        Args:
            bucket_name: Name for the new bucket
            project_id: GCP project ID (defaults to configured project)
            location: GCP region (e.g., us-central1)
            storage_class: Storage class (STANDARD, NEARLINE, COLDLINE, ARCHIVE)
            labels: Optional key-value pairs for bucket labels
            versioning_enabled: Whether to enable object versioning
        """
        try:
            # Get client from client_instances
            client = client_instances.get_clients().storage
            project_id = project_id or client_instances.get_project_id()

            # Validate storage class
            valid_storage_classes = ["STANDARD", "NEARLINE", "COLDLINE", "ARCHIVE"]
            if storage_class not in valid_storage_classes:
                return json.dumps(
                    {
                        "status": "error",
                        "message": f"Invalid storage class: {storage_class}. Valid classes are: {', '.join(valid_storage_classes)}",
                    },
                    indent=2,
                )

            # Log info (similar to ctx.info)
            print(f"Creating bucket {bucket_name} in {location}...")
            bucket = client.bucket(bucket_name)
            # storage_class and labels are attributes of the bucket object;
            # Bucket.create() itself only takes the location.
            bucket.storage_class = storage_class
            if labels:
                bucket.labels = labels
            bucket.create(location=location)

            # Set versioning if enabled
            if versioning_enabled:
                bucket.versioning_enabled = True
                bucket.patch()

            return json.dumps(
                {
                    "status": "success",
                    "name": bucket.name,
                    "location": bucket.location,
                    "storage_class": bucket.storage_class,
                    "time_created": bucket.time_created.isoformat()
                    if bucket.time_created
                    else None,
                    "versioning_enabled": bucket.versioning_enabled,
                    "url": f"gs://{bucket_name}/",
                },
                indent=2,
            )
        except Exception as e:
            return json.dumps({"status": "error", "message": str(e)}, indent=2)

    @mcp_instance.tool()
    def list_buckets(project_id: str = None, prefix: str = "") -> str:
        """
        List Cloud Storage buckets in a project

        Args:
            project_id: GCP project ID (defaults to configured project)
            prefix: Optional prefix to filter bucket names
        """
        try:
            # Get client from client_instances
            client = client_instances.get_clients().storage
            project_id = project_id or client_instances.get_project_id()

            # Log info (similar to ctx.info)
            print(f"Listing buckets in project {project_id}...")

            # List buckets with optional prefix filter
            if prefix:
                buckets = [
                    b for b in client.list_buckets() if b.name.startswith(prefix)
                ]
            else:
                buckets = list(client.list_buckets())

            result = []
            for bucket in buckets:
                result.append(
                    {
                        "name": bucket.name,
                        "location": bucket.location,
                        "storage_class": bucket.storage_class,
                        "time_created": bucket.time_created.isoformat()
                        if bucket.time_created
                        else None,
                    }
                )

            return json.dumps(
                {"status": "success", "buckets": result, "count": len(result)}, indent=2
            )
        except Exception as e:
            return json.dumps({"status": "error", "message": str(e)}, indent=2)

    @mcp_instance.tool()
    def upload_object(
        bucket_name: str,
        source_file_path: str,
        destination_blob_name: Optional[str] = None,
        project_id: str = None,
        content_type: Optional[str] = None,
        metadata: Optional[Dict[str, str]] = None,
    ) -> str:
        """
        Upload an object to a Cloud Storage bucket

        Args:
            bucket_name: Name of the bucket
            source_file_path: Local path to the file to upload
            destination_blob_name: Name to assign to the blob (defaults to file basename)
            project_id: GCP project ID (defaults to configured project)
            content_type: Content type of the object (optional)
            metadata: Custom metadata dictionary (optional)
        """
        try:
            # Get client from client_instances
            client = client_instances.get_clients().storage
            project_id = project_id or client_instances.get_project_id()

            # Check if file exists
            if not os.path.exists(source_file_path):
                return json.dumps(
                    {
                        "status": "error",
                        "message": f"File not found: {source_file_path}",
                    },
                    indent=2,
                )

            # Get bucket
            bucket = client.bucket(bucket_name)

            # Use filename if destination_blob_name not provided
            if not destination_blob_name:
                destination_blob_name = os.path.basename(source_file_path)

            # Create blob
            blob = bucket.blob(destination_blob_name)

            # Set content type if provided
            if content_type:
                blob.content_type = content_type

            # Set metadata if provided
            if metadata:
                blob.metadata = metadata

            # Get file size for progress reporting
            file_size = os.path.getsize(source_file_path)
            print(
                f"Uploading {source_file_path} ({file_size} bytes) to gs://{bucket_name}/{destination_blob_name}..."
            )

            # Upload file
            blob.upload_from_filename(source_file_path)

            return json.dumps(
                {
                    "status": "success",
                    "bucket": bucket_name,
                    "name": blob.name,
                    "size": blob.size,
                    "content_type": blob.content_type,
                    "md5_hash": blob.md5_hash,
                    "generation": blob.generation,
                    "public_url": blob.public_url,
                    "gsutil_uri": f"gs://{bucket_name}/{destination_blob_name}",
                },
                indent=2,
            )
        except Exception as e:
            return json.dumps({"status": "error", "message": str(e)}, indent=2)

    @mcp_instance.tool()
    def download_object(
        bucket_name: str,
        source_blob_name: str,
        destination_file_path: str,
        project_id: str = None,
    ) -> str:
        """
        Download an object from a Cloud Storage bucket

        Args:
            bucket_name: Name of the bucket
            source_blob_name: Name of the blob to download
            destination_file_path: Local path where the file should be saved
            project_id: GCP project ID (defaults to configured project)
        """
        try:
            # Get client from client_instances
            client = client_instances.get_clients().storage
            project_id = project_id or client_instances.get_project_id()

            # Get bucket and blob
            bucket = client.bucket(bucket_name)
            blob = bucket.blob(source_blob_name)

            # Check if blob exists
            if not blob.exists():
                return json.dumps(
                    {
                        "status": "error",
                        "message": f"Object not found: gs://{bucket_name}/{source_blob_name}",
                    },
                    indent=2,
                )

            # Create the destination directory if it doesn't exist
            os.makedirs(
                os.path.dirname(os.path.abspath(destination_file_path)), exist_ok=True
            )

            print(
                f"Downloading gs://{bucket_name}/{source_blob_name} to {destination_file_path}..."
            )

            # Download file
            blob.download_to_filename(destination_file_path)

            return json.dumps(
                {
                    "status": "success",
                    "bucket": bucket_name,
                    "blob_name": source_blob_name,
                    "size": blob.size,
                    "content_type": blob.content_type,
                    "downloaded_to": destination_file_path,
                },
                indent=2,
            )
        except Exception as e:
            return json.dumps({"status": "error", "message": str(e)}, indent=2)

    @mcp_instance.tool()
    def delete_object(bucket_name: str, blob_name: str, project_id: str = None) -> str:
        """
        Delete an object from a Cloud Storage bucket

        Args:
            bucket_name: Name of the bucket
            blob_name: Name of the blob to delete
            project_id: GCP project ID (defaults to configured project)
        """
        try:
            # Get client from client_instances
            client = client_instances.get_clients().storage
            project_id = project_id or client_instances.get_project_id()

            # Get bucket and blob
            bucket = client.bucket(bucket_name)
            blob = bucket.blob(blob_name)

            print(f"Deleting gs://{bucket_name}/{blob_name}...")

            # Delete the blob
            blob.delete()

            return json.dumps(
                {
                    "status": "success",
                    "message": f"Successfully deleted gs://{bucket_name}/{blob_name}",
                },
                indent=2,
            )
        except Exception as e:
            return json.dumps({"status": "error", "message": str(e)}, indent=2)

    # Prompts
    @mcp_instance.prompt()
    def create_bucket_prompt(location: str = "us-central1") -> str:
        """Prompt for creating a new Cloud Storage bucket"""
        return f"""
I need to create a new Cloud Storage bucket in {location}.

Please help me with:
1. Understanding storage classes (STANDARD, NEARLINE, COLDLINE, ARCHIVE)
2. Best practices for bucket naming
3. When to enable versioning
4. Recommendations for bucket security settings
5. Steps to create the bucket
"""

    @mcp_instance.prompt()
    def upload_file_prompt() -> str:
        """Prompt for help with uploading files to Cloud Storage"""
        return """
I need to upload files to a Cloud Storage bucket.

Please help me understand:
1. The best way to organize files in Cloud Storage
2. When to use folders/prefixes
3. How to set appropriate permissions on uploaded files
4. How to make files publicly accessible (if needed)
5. The steps to perform the upload
"""

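# Illustrative only: a typical upload_object call. The bucket name and local
# path below are hypothetical placeholders.
#   upload_object(
#       bucket_name="example-bucket",
#       source_file_path="/tmp/report.csv",
#       content_type="text/csv",
#       metadata={"source": "nightly-export"},
#   )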
```

--------------------------------------------------------------------------------
/src/gcp-mcp-server/services/artifact_registry.py:
--------------------------------------------------------------------------------

```python
import json

from services import client_instances


def register(mcp_instance):
    """Register all resources and tools with the MCP instance."""

    @mcp_instance.resource(
        "gcp://artifactregistry/{project_id}/{location}/repositories"
    )
    def list_repositories_resource(project_id: str = None, location: str = None) -> str:
        """List all Artifact Registry repositories in a specific location"""
        try:
            # Get client from client_instances
            client = client_instances.get_clients().artifactregistry
            project_id = project_id or client_instances.get_project_id()
            location = location or client_instances.get_location()

            # Use parameters directly from URL path
            parent = f"projects/{project_id}/locations/{location}"

            repositories = client.list_repositories(parent=parent)
            result = []
            for repo in repositories:
                result.append(
                    {
                        "name": repo.name.split("/")[-1],
                        "format": repo.format.name
                        if hasattr(repo.format, "name")
                        else str(repo.format),
                        "description": repo.description,
                        "create_time": repo.create_time.isoformat()
                        if repo.create_time
                        else None,
                        "update_time": repo.update_time.isoformat()
                        if repo.update_time
                        else None,
                        "kms_key_name": repo.kms_key_name,
                        "labels": dict(repo.labels) if repo.labels else {},
                    }
                )
            return json.dumps(result, indent=2)
        except Exception as e:
            return json.dumps({"error": str(e)}, indent=2)

    # Add a tool for creating repositories
    @mcp_instance.tool()
    def create_artifact_repository(
        name: str, format: str, description: str = ""
    ) -> str:
        """Create a new Artifact Registry repository"""
        try:
            # define the client
            client = client_instances.get_clients().artifactregistry
            project_id = client_instances.get_project_id()
            location = client_instances.get_location()

            parent = f"projects/{project_id}/locations/{location}"

            # Create repository request
            from google.cloud.artifactregistry_v1 import (
                CreateRepositoryRequest,
                Repository,
            )

            # Create the repository format enum value
            if format.upper() not in ["DOCKER", "MAVEN", "NPM", "PYTHON", "APT", "YUM"]:
                return json.dumps(
                    {
                        "error": f"Invalid format: {format}. Must be one of: DOCKER, MAVEN, NPM, PYTHON, APT, YUM"
                    },
                    indent=2,
                )

            repo = Repository(
                name=f"{parent}/repositories/{name}",
                format=getattr(Repository.Format, format.upper()),
                description=description,
            )

            request = CreateRepositoryRequest(
                parent=parent, repository_id=name, repository=repo
            )

            operation = client.create_repository(request=request)
            result = operation.result()  # Wait for operation to complete

            return json.dumps(
                {
                    "name": result.name.split("/")[-1],
                    "format": result.format.name
                    if hasattr(result.format, "name")
                    else str(result.format),
                    "description": result.description,
                    "create_time": result.create_time.isoformat()
                    if result.create_time
                    else None,
                    "update_time": result.update_time.isoformat()
                    if result.update_time
                    else None,
                },
                indent=2,
            )

        except Exception as e:
            return json.dumps({"error": str(e)}, indent=2)

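    # Illustrative only: creating a Docker repository with the tool above. The
    # repository name below is a hypothetical placeholder.
    #   create_artifact_repository(
    #       name="example-images",
    #       format="DOCKER",
    #       description="Container images for the example service",
    #   )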

#     @mcp_instance.tool()
#     def get_artifact_repository(
#         project_id: str = None,
#         location: str = None,
#         repository_id: str = None,
#     ) -> str:
#         """Get details about a specific Artifact Registry repository"""
#         try:
#             # Get the client from the context
#             clients = ctx.lifespan_context["clients"]
#             client = clients.artifactregistry

#             # Use context values if parameters not provided
#             project_id = project_id or ctx.lifespan_context["project_id"]
#             location = location or ctx.lifespan_context["location"]

#             if not repository_id:
#                 return json.dumps({"error": "Repository ID is required"}, indent=2)

#             name = f"projects/{project_id}/locations/{location}/repositories/{repository_id}"

#             repo = client.get_repository(name=name)
#             result = {
#                 "name": repo.name.split("/")[-1],
#                 "format": repo.format.name
#                 if hasattr(repo.format, "name")
#                 else str(repo.format),
#                 "description": repo.description,
#                 "create_time": repo.create_time.isoformat()
#                 if repo.create_time
#                 else None,
#                 "update_time": repo.update_time.isoformat()
#                 if repo.update_time
#                 else None,
#                 "kms_key_name": repo.kms_key_name,
#                 "labels": dict(repo.labels) if repo.labels else {},
#             }
#             return json.dumps(result, indent=2)
#         except Exception as e:
#             return json.dumps({"error": str(e)}, indent=2)

#     @mcp_instance.tool()
#     def upload_artifact(
#         repo_name: str,
#         artifact_path: str,
#         package: str = "",
#         version: str = "",
#         project_id: str = None,
#         location: str = None,
#     ) -> str:
#         """
#         Upload an artifact to Artifact Registry

#         Args:
#             project_id: GCP project ID
#             location: GCP region (e.g., us-central1)
#             repo_name: Name of the repository
#             artifact_path: Local path to the artifact file
#             package: Package name (optional)
#             version: Version string (optional)
#         """
#         import os

#         try:
#             # Get the client from the context
#             clients = ctx.lifespan_context["clients"]
#             client = clients.artifactregistry

#             # Use context values if parameters not provided
#             project_id = project_id or ctx.lifespan_context["project_id"]
#             location = location or ctx.lifespan_context["location"]

#             if not os.path.exists(artifact_path):
#                 return json.dumps(
#                     {"status": "error", "message": f"File not found: {artifact_path}"}
#                 )

#             filename = os.path.basename(artifact_path)
#             file_size = os.path.getsize(artifact_path)

#             parent = (
#                 f"projects/{project_id}/locations/{location}/repositories/{repo_name}"
#             )

#             with open(artifact_path, "rb") as f:
#                 file_contents = f.read()
#                 # Use the standard client for upload
#                 result = client.upload_artifact(
#                     parent=parent, contents=file_contents, artifact_path=filename
#                 )

#                 return json.dumps(
#                     {
#                         "status": "success",
#                         "uri": result.uri if hasattr(result, "uri") else None,
#                         "message": f"Successfully uploaded {filename}",
#                     },
#                     indent=2,
#                 )
#         except Exception as e:
#             return json.dumps({"status": "error", "message": str(e)}, indent=2)

#     # Prompts
#     @mcp_instance.prompt()
#     def create_repository_prompt(location: str = "us-central1") -> str:
#         """Prompt for creating a new Artifact Registry repository"""
#         return f"""
#     I need to create a new Artifact Registry repository in {location}.

#     Please help me with:
#     1. What formats are available (Docker, NPM, Maven, etc.)
#     2. Best practices for naming repositories
#     3. Recommendations for labels I should apply
#     4. Steps to create the repository
#     """

#     @mcp_instance.prompt()
#     def upload_artifact_prompt(repo_format: str = "DOCKER") -> str:
#         """Prompt for help with uploading artifacts"""
#         return f"""
#     I need to upload a {repo_format.lower()} artifact to my Artifact Registry repository.

#     Please help me understand:
#     1. The recommended way to upload {repo_format.lower()} artifacts
#     2. Any naming conventions I should follow
#     3. How to ensure proper versioning
#     4. The steps to perform the upload
#     """


# import json
# import os
# from typing import Optional

# from google.cloud import artifactregistry_v1
# from mcp.server.fastmcp import Context

# # Instead of importing from core.server


# # Resources
# @mcp.resource("gcp://artifactregistry/{project_id}/{location}/repositories")
# def list_repositories_resource(
#     project_id: str = None, location: str = None, ctx: Context = None
# ) -> str:
#     """List all Artifact Registry repositories in a specific location"""
#     client = ctx.clients.artifactregistry
#     project_id = project_id or ctx.clients.project_id
#     location = location or ctx.clients.location

#     parent = f"projects/{project_id}/locations/{location}"
#     repositories = client.list_repositories(parent=parent)

#     result = []
#     for repo in repositories:
#         result.append(
#             {
#                 "name": repo.name.split("/")[-1],
#                 "format": repo.format.name,
#                 "description": repo.description,
#                 "create_time": repo.create_time.isoformat()
#                 if repo.create_time
#                 else None,
#                 "update_time": repo.update_time.isoformat()
#                 if repo.update_time
#                 else None,
#                 "kms_key_name": repo.kms_key_name,
#                 "labels": dict(repo.labels) if repo.labels else {},
#             }
#         )

#     return json.dumps(result, indent=2)


# @mcp.resource("gcp://artifactregistry/{project_id}/{location}/repository/{repo_name}")
# def get_repository_resource(
#     repo_name: str, project_id: str = None, location: str = None, ctx: Context = None
# ) -> str:
#     """Get details for a specific Artifact Registry repository"""
#     client = ctx.clients.artifactregistry
#     project_id = project_id or ctx.clients.project_id
#     location = location or ctx.clients.location

#     name = f"projects/{project_id}/locations/{location}/repositories/{repo_name}"

#     try:
#         repo = client.get_repository(name=name)
#         result = {
#             "name": repo.name.split("/")[-1],
#             "format": repo.format.name,
#             "description": repo.description,
#             "create_time": repo.create_time.isoformat() if repo.create_time else None,
#             "update_time": repo.update_time.isoformat() if repo.update_time else None,
#             "kms_key_name": repo.kms_key_name,
#             "labels": dict(repo.labels) if repo.labels else {},
#         }
#         return json.dumps(result, indent=2)
#     except Exception as e:
#         return json.dumps({"error": str(e)})


# @mcp.resource(
#     "gcp://artifactregistry/{project_id}/{location}/repository/{repo_name}/packages"
# )
# def list_packages_resource(
#     repo_name: str, project_id: str = None, location: str = None, ctx: Context = None
# ) -> str:
#     """List packages in a specific Artifact Registry repository"""
#     client = ctx.clients.artifactregistry
#     project_id = project_id or ctx.clients.project_id
#     location = location or ctx.clients.location

#     parent = f"projects/{project_id}/locations/{location}/repositories/{repo_name}"
#     packages = client.list_packages(parent=parent)

#     result = []
#     for package in packages:
#         result.append(
#             {
#                 "name": package.name.split("/")[-1],
#                 "display_name": package.display_name,
#                 "create_time": package.create_time.isoformat()
#                 if package.create_time
#                 else None,
#                 "update_time": package.update_time.isoformat()
#                 if package.update_time
#                 else None,
#             }
#         )

#     return json.dumps(result, indent=2)


# # Tools
# @mcp.tool()
# async def create_repository(
#     repo_name: str,
#     format: str = "DOCKER",
#     description: str = "",
#     labels: Optional[dict] = None,
#     project_id: str = None,
#     location: str = None,
#     ctx: Context = None,
# ) -> str:
#     """
#     Create an Artifact Registry repository

#     Args:
#         project_id: GCP project ID
#         location: GCP region (e.g., us-central1)
#         repo_name: Name for the new repository
#         format: Repository format (DOCKER, NPM, PYTHON, MAVEN, APT, YUM, GO)
#         description: Optional description for the repository
#         labels: Optional key-value pairs for repository labels
#     """
#     client = ctx.clients.artifactregistry
#     project_id = project_id or ctx.clients.project_id
#     location = location or ctx.clients.location

#     # Validate format
#     try:
#         format_enum = artifactregistry_v1.Repository.Format[format]
#     except KeyError:
#         return json.dumps(
#             {
#                 "status": "error",
#                 "message": f"Invalid format: {format}. Valid formats are: {', '.join(artifactregistry_v1.Repository.Format._member_names_)}",
#             }
#         )

#     # Create repository
#     parent = f"projects/{project_id}/locations/{location}"
#     repository = artifactregistry_v1.Repository(
#         format=format_enum,
#         description=description,
#     )

#     if labels:
#         repository.labels = labels

#     request = artifactregistry_v1.CreateRepositoryRequest(
#         parent=parent, repository_id=repo_name, repository=repository
#     )

#     try:
#         ctx.info(f"Creating repository {repo_name} in {location}...")
#         response = client.create_repository(request=request)

#         return json.dumps(
#             {
#                 "status": "success",
#                 "name": response.name,
#                 "format": response.format.name,
#                 "description": response.description,
#                 "create_time": response.create_time.isoformat()
#                 if response.create_time
#                 else None,
#             },
#             indent=2,
#         )
#     except Exception as e:
#         return json.dumps({"status": "error", "message": str(e)})


# @mcp.tool()
# async def list_repositories(
#     project_id: str = None, location: str = None, ctx: Context = None
# ) -> str:
#     """
#     List Artifact Registry repositories in a specific location

#     Args:
#         project_id: GCP project ID
#         location: GCP region (e.g., us-central1)
#     """
#     client = ctx.clients.artifactregistry
#     project_id = project_id or ctx.clients.project_id
#     location = location or ctx.clients.location

#     parent = f"projects/{project_id}/locations/{location}"

#     try:
#         ctx.info(f"Listing repositories in {location}...")
#         repositories = client.list_repositories(parent=parent)

#         result = []
#         for repo in repositories:
#             result.append(
#                 {
#                     "name": repo.name.split("/")[-1],
#                     "format": repo.format.name,
#                     "description": repo.description,
#                     "create_time": repo.create_time.isoformat()
#                     if repo.create_time
#                     else None,
#                 }
#             )

#         return json.dumps(
#             {"status": "success", "repositories": result, "count": len(result)},
#             indent=2,
#         )
#     except Exception as e:
#         return json.dumps({"status": "error", "message": str(e)})

```

--------------------------------------------------------------------------------
/src/gcp-mcp-server/services/cloud_bigquery.py:
--------------------------------------------------------------------------------

```python
import json
from typing import Any, Dict, List, Optional

from google.cloud import bigquery
from google.cloud.exceptions import NotFound
from services import client_instances


def register(mcp_instance):
    """Register all BigQuery resources and tools with the MCP instance."""

    # Resources
    @mcp_instance.resource("gcp://bigquery/{project_id}/datasets")
    def list_datasets_resource(project_id: str = None) -> str:
        """List all BigQuery datasets in a project"""
        try:
            # Get client from client_instances
            client = client_instances.get_clients().bigquery
            project_id = project_id or client_instances.get_project_id()

            # Pass project_id explicitly so the URI parameter is honored
            datasets = list(client.list_datasets(project=project_id))

            result = []
            for dataset in datasets:
                result.append(
                    {
                        "id": dataset.dataset_id,
                        "full_id": dataset.full_dataset_id,
                        "friendly_name": dataset.friendly_name,
                        "location": dataset.location,
                        "labels": dict(dataset.labels) if dataset.labels else {},
                        "created": dataset.created.isoformat()
                        if dataset.created
                        else None,
                    }
                )

            return json.dumps(result, indent=2)
        except Exception as e:
            return json.dumps({"error": str(e)}, indent=2)

    @mcp_instance.resource("gcp://bigquery/{project_id}/dataset/{dataset_id}")
    def get_dataset_resource(project_id: str = None, dataset_id: str = None) -> str:
        """Get details for a specific BigQuery dataset"""
        try:
            # Get client from client_instances
            client = client_instances.get_clients().bigquery
            project_id = project_id or client_instances.get_project_id()

            # Build an explicit reference so project_id is honored
            dataset_ref = bigquery.DatasetReference(project_id, dataset_id)
            dataset = client.get_dataset(dataset_ref)

            result = {
                "id": dataset.dataset_id,
                "full_id": dataset.full_dataset_id,
                "friendly_name": dataset.friendly_name,
                "description": dataset.description,
                "location": dataset.location,
                "labels": dict(dataset.labels) if dataset.labels else {},
                "created": dataset.created.isoformat() if dataset.created else None,
                "modified": dataset.modified.isoformat() if dataset.modified else None,
                "default_table_expiration_ms": dataset.default_table_expiration_ms,
                "default_partition_expiration_ms": dataset.default_partition_expiration_ms,
            }
            return json.dumps(result, indent=2)
        except NotFound:
            return json.dumps({"error": f"Dataset {dataset_id} not found"}, indent=2)
        except Exception as e:
            return json.dumps({"error": str(e)}, indent=2)

    @mcp_instance.resource("gcp://bigquery/{project_id}/dataset/{dataset_id}/tables")
    def list_tables_resource(project_id: str = None, dataset_id: str = None) -> str:
        """List all tables in a BigQuery dataset"""
        try:
            # Get client from client_instances
            client = client_instances.get_clients().bigquery
            project_id = project_id or client_instances.get_project_id()

            # Qualify the dataset with project_id so the URI parameter is honored
            tables = list(client.list_tables(f"{project_id}.{dataset_id}"))

            result = []
            for table in tables:
                result.append(
                    {
                        "id": table.table_id,
                        "full_id": f"{table.project}.{table.dataset_id}.{table.table_id}",
                        "table_type": table.table_type,
                    }
                )

            return json.dumps(result, indent=2)
        except NotFound:
            return json.dumps({"error": f"Dataset {dataset_id} not found"}, indent=2)
        except Exception as e:
            return json.dumps({"error": str(e)}, indent=2)

    @mcp_instance.resource(
        "gcp://bigquery/{project_id}/dataset/{dataset_id}/table/{table_id}"
    )
    def get_table_resource(
        project_id: str = None, dataset_id: str = None, table_id: str = None
    ) -> str:
        """Get details for a specific BigQuery table"""
        try:
            # Get client from client_instances
            client = client_instances.get_clients().bigquery
            project_id = project_id or client_instances.get_project_id()

            table_ref = bigquery.DatasetReference(project_id, dataset_id).table(table_id)
            table = client.get_table(table_ref)

            # Extract schema information
            schema_fields = []
            for field in table.schema:
                schema_fields.append(
                    {
                        "name": field.name,
                        "type": field.field_type,
                        "mode": field.mode,
                        "description": field.description,
                    }
                )

            result = {
                "id": table.table_id,
                "full_id": f"{table.project}.{table.dataset_id}.{table.table_id}",
                "friendly_name": table.friendly_name,
                "description": table.description,
                "num_rows": table.num_rows,
                "num_bytes": table.num_bytes,
                "table_type": table.table_type,
                "created": table.created.isoformat() if table.created else None,
                "modified": table.modified.isoformat() if table.modified else None,
                "expires": table.expires.isoformat() if table.expires else None,
                "schema": schema_fields,
                "labels": dict(table.labels) if table.labels else {},
            }
            return json.dumps(result, indent=2)
        except NotFound:
            return json.dumps(
                {"error": f"Table {dataset_id}.{table_id} not found"}, indent=2
            )
        except Exception as e:
            return json.dumps({"error": str(e)}, indent=2)

    # Tools
    @mcp_instance.tool()
    def run_query(
        query: str,
        project_id: str = None,
        location: str = None,
        max_results: int = 100,
        use_legacy_sql: bool = False,
        timeout_ms: int = 30000,
    ) -> str:
        """
        Run a BigQuery query and return the results

        Args:
            query: SQL query to execute
            project_id: GCP project ID (defaults to configured project)
            location: Optional BigQuery location (us, eu, etc.)
            max_results: Maximum number of rows to return
            use_legacy_sql: Whether to use legacy SQL syntax
            timeout_ms: Query timeout in milliseconds
        """
        try:
            # Get client from client_instances
            client = client_instances.get_clients().bigquery
            project_id = project_id or client_instances.get_project_id()
            location = location or client_instances.get_location()

            job_config = bigquery.QueryJobConfig(
                use_legacy_sql=use_legacy_sql,
            )

            # Log info (similar to ctx.info)
            print(f"Running query: {query[:100]}...")

            query_job = client.query(
                query,
                job_config=job_config,
                location=location,
                timeout=timeout_ms / 1000.0,
            )

            # Wait for the query to complete
            results = query_job.result(max_results=max_results)

            # Get the schema
            schema = [field.name for field in results.schema]

            # Convert rows to a list of dictionaries
            rows = []
            for row in results:
                row_dict = {}
                for key in schema:
                    value = row[key]
                    if hasattr(value, "isoformat"):  # Handle datetime objects
                        row_dict[key] = value.isoformat()
                    else:
                        row_dict[key] = value
                rows.append(row_dict)

            # Create summary statistics
            stats = {
                "total_rows": query_job.total_rows,
                "total_bytes_processed": query_job.total_bytes_processed,
                "total_bytes_billed": query_job.total_bytes_billed,
                "billing_tier": query_job.billing_tier,
                "created": query_job.created.isoformat() if query_job.created else None,
                "started": query_job.started.isoformat() if query_job.started else None,
                "ended": query_job.ended.isoformat() if query_job.ended else None,
                "duration_ms": (query_job.ended - query_job.started).total_seconds()
                * 1000
                if query_job.started and query_job.ended
                else None,
            }

            return json.dumps(
                {
                    "status": "success",
                    "schema": schema,
                    "rows": rows,
                    "returned_rows": len(rows),
                    "stats": stats,
                },
                indent=2,
            )
        except Exception as e:
            return json.dumps({"status": "error", "message": str(e)}, indent=2)

    @mcp_instance.tool()
    def create_dataset(
        dataset_id: str,
        project_id: str = None,
        location: str = None,
        description: str = "",
        friendly_name: str = None,
        labels: Optional[Dict[str, str]] = None,
        default_table_expiration_ms: Optional[int] = None,
    ) -> str:
        """
        Create a new BigQuery dataset

        Args:
            dataset_id: ID for the new dataset
            project_id: GCP project ID (defaults to configured project)
            location: Dataset location (US, EU, asia-northeast1, etc.)
            description: Optional dataset description
            friendly_name: Optional user-friendly name
            labels: Optional key-value pairs for dataset labels
            default_table_expiration_ms: Default expiration time for tables in milliseconds
        """
        try:
            # Get client from client_instances
            client = client_instances.get_clients().bigquery
            project_id = project_id or client_instances.get_project_id()
            location = location or client_instances.get_location()

            dataset = bigquery.Dataset(f"{project_id}.{dataset_id}")
            dataset.location = location

            if description:
                dataset.description = description
            if friendly_name:
                dataset.friendly_name = friendly_name
            if labels:
                dataset.labels = labels
            if default_table_expiration_ms:
                dataset.default_table_expiration_ms = default_table_expiration_ms

            # Log info (similar to ctx.info)
            print(f"Creating dataset {dataset_id} in {location}...")
            dataset = client.create_dataset(dataset)

            return json.dumps(
                {
                    "status": "success",
                    "dataset_id": dataset.dataset_id,
                    "full_dataset_id": dataset.full_dataset_id,
                    "location": dataset.location,
                    "created": dataset.created.isoformat() if dataset.created else None,
                },
                indent=2,
            )
        except Exception as e:
            return json.dumps({"status": "error", "message": str(e)}, indent=2)

    @mcp_instance.tool()
    def create_table(
        dataset_id: str,
        table_id: str,
        schema_fields: List[Dict[str, Any]],
        project_id: str = None,
        description: str = "",
        friendly_name: str = None,
        expiration_ms: Optional[int] = None,
        labels: Optional[Dict[str, str]] = None,
        clustering_fields: Optional[List[str]] = None,
        time_partitioning_field: Optional[str] = None,
        time_partitioning_type: str = "DAY",
    ) -> str:
        """
        Create a new BigQuery table

        Args:
            dataset_id: Dataset ID where the table will be created
            table_id: ID for the new table
            schema_fields: List of field definitions, each with name, type, mode, and description
            project_id: GCP project ID (defaults to configured project)
            description: Optional table description
            friendly_name: Optional user-friendly name
            expiration_ms: Optional table expiration time in milliseconds
            labels: Optional key-value pairs for table labels
            clustering_fields: Optional list of fields to cluster by
            time_partitioning_field: Optional field to use for time-based partitioning
            time_partitioning_type: Partitioning type (DAY, HOUR, MONTH, YEAR)

        Example schema_fields:
        [
            {"name": "name", "type": "STRING", "mode": "REQUIRED", "description": "Name field"},
            {"name": "age", "type": "INTEGER", "mode": "NULLABLE", "description": "Age field"}
        ]
        """
        try:
            # Get client from client_instances
            client = client_instances.get_clients().bigquery
            project_id = project_id or client_instances.get_project_id()

            # Convert schema_fields to SchemaField objects
            schema = []
            for field in schema_fields:
                schema.append(
                    bigquery.SchemaField(
                        name=field["name"],
                        field_type=field["type"],
                        mode=field.get("mode", "NULLABLE"),
                        description=field.get("description", ""),
                    )
                )

            # Create table reference
            table_ref = bigquery.DatasetReference(project_id, dataset_id).table(table_id)
            table = bigquery.Table(table_ref, schema=schema)

            # Set table properties
            if description:
                table.description = description
            if friendly_name:
                table.friendly_name = friendly_name
            if expiration_ms:
                # Table.expires expects a datetime; treat expiration_ms as a
                # duration from now
                table.expires = datetime.datetime.now(
                    datetime.timezone.utc
                ) + datetime.timedelta(milliseconds=expiration_ms)
            if labels:
                table.labels = labels

            # Set clustering if specified
            if clustering_fields:
                table.clustering_fields = clustering_fields

            # Set time partitioning if specified (unrecognized types are ignored)
            if time_partitioning_field:
                partitioning_types = {
                    "DAY": bigquery.TimePartitioningType.DAY,
                    "HOUR": bigquery.TimePartitioningType.HOUR,
                    "MONTH": bigquery.TimePartitioningType.MONTH,
                    "YEAR": bigquery.TimePartitioningType.YEAR,
                }
                if time_partitioning_type in partitioning_types:
                    table.time_partitioning = bigquery.TimePartitioning(
                        type_=partitioning_types[time_partitioning_type],
                        field=time_partitioning_field,
                    )

            # Log info (similar to ctx.info)
            print(f"Creating table {dataset_id}.{table_id}...")
            table = client.create_table(table)

            return json.dumps(
                {
                    "status": "success",
                    "table_id": table.table_id,
                    "full_table_id": f"{table.project}.{table.dataset_id}.{table.table_id}",
                    "created": table.created.isoformat() if table.created else None,
                },
                indent=2,
            )
        except Exception as e:
            return json.dumps({"status": "error", "message": str(e)}, indent=2)

    @mcp_instance.tool()
    def delete_table(dataset_id: str, table_id: str, project_id: str = None) -> str:
        """
        Delete a BigQuery table

        Args:
            dataset_id: Dataset ID containing the table
            table_id: ID of the table to delete
            project_id: GCP project ID (defaults to configured project)
        """
        try:
            # Get client from client_instances
            client = client_instances.get_clients().bigquery
            project_id = project_id or client_instances.get_project_id()

            table_ref = bigquery.DatasetReference(project_id, dataset_id).table(table_id)

            # Log info (similar to ctx.info)
            print(f"Deleting table {dataset_id}.{table_id}...")
            client.delete_table(table_ref)

            return json.dumps(
                {
                    "status": "success",
                    "message": f"Table {project_id}.{dataset_id}.{table_id} successfully deleted",
                },
                indent=2,
            )
        except NotFound:
            return json.dumps(
                {
                    "status": "error",
                    "message": f"Table {project_id}.{dataset_id}.{table_id} not found",
                },
                indent=2,
            )
        except Exception as e:
            return json.dumps({"status": "error", "message": str(e)}, indent=2)

    @mcp_instance.tool()
    def load_table_from_json(
        dataset_id: str,
        table_id: str,
        json_data: List[Dict[str, Any]],
        project_id: str = None,
        schema_fields: Optional[List[Dict[str, Any]]] = None,
        write_disposition: str = "WRITE_APPEND",
    ) -> str:
        """
        Load data into a BigQuery table from JSON data

        Args:
            dataset_id: Dataset ID containing the table
            table_id: ID of the table to load data into
            json_data: List of dictionaries representing rows to insert
            project_id: GCP project ID (defaults to configured project)
            schema_fields: Optional schema definition (if not using existing table schema)
            write_disposition: How to handle existing data (WRITE_TRUNCATE, WRITE_APPEND, WRITE_EMPTY)
        """
        try:
            # Get client from client_instances
            client = client_instances.get_clients().bigquery
            project_id = project_id or client_instances.get_project_id()

            table_ref = bigquery.DatasetReference(project_id, dataset_id).table(table_id)

            # Convert write_disposition to the appropriate enum
            if write_disposition == "WRITE_TRUNCATE":
                disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
            elif write_disposition == "WRITE_APPEND":
                disposition = bigquery.WriteDisposition.WRITE_APPEND
            elif write_disposition == "WRITE_EMPTY":
                disposition = bigquery.WriteDisposition.WRITE_EMPTY
            else:
                return json.dumps(
                    {
                        "status": "error",
                        "message": f"Invalid write_disposition: {write_disposition}. Use WRITE_TRUNCATE, WRITE_APPEND, or WRITE_EMPTY.",
                    },
                    indent=2,
                )

            # Create job config
            job_config = bigquery.LoadJobConfig(
                write_disposition=disposition,
                source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
            )

            # Set schema if provided
            if schema_fields:
                schema = []
                for field in schema_fields:
                    schema.append(
                        bigquery.SchemaField(
                            name=field["name"],
                            field_type=field["type"],
                            mode=field.get("mode", "NULLABLE"),
                            description=field.get("description", ""),
                        )
                    )
                job_config.schema = schema

            # load_table_from_json accepts the list of dicts directly; no manual
            # NDJSON conversion is needed
            print(f"Loading {len(json_data)} rows into {dataset_id}.{table_id}...")

            # Create and run the load job
            load_job = client.load_table_from_json(
                json_data, table_ref, job_config=job_config
            )

            # Wait for the job to complete
            load_job.result()

            # Get updated table info
            table = client.get_table(table_ref)

            return json.dumps(
                {
                    "status": "success",
                    "rows_loaded": len(json_data),
                    "total_rows": table.num_rows,
                    "message": f"Successfully loaded data into {project_id}.{dataset_id}.{table_id}",
                },
                indent=2,
            )
        except Exception as e:
            return json.dumps({"status": "error", "message": str(e)}, indent=2)

    @mcp_instance.tool()
    def export_table_to_csv(
        dataset_id: str,
        table_id: str,
        destination_uri: str,
        project_id: str = None,
        print_header: bool = True,
        field_delimiter: str = ",",
    ) -> str:
        """
        Export a BigQuery table to Cloud Storage as CSV

        Args:
            dataset_id: Dataset ID containing the table
            table_id: ID of the table to export
            destination_uri: GCS URI (gs://bucket/path)
            project_id: GCP project ID (defaults to configured project)
            print_header: Whether to include column headers
            field_delimiter: Delimiter character for fields
        """
        try:
            # Get client from client_instances
            client = client_instances.get_clients().bigquery
            project_id = project_id or client_instances.get_project_id()

            table_ref = bigquery.DatasetReference(project_id, dataset_id).table(table_id)

            # Validate destination URI
            if not destination_uri.startswith("gs://"):
                return json.dumps(
                    {
                        "status": "error",
                        "message": "destination_uri must start with gs://",
                    },
                    indent=2,
                )

            job_config = bigquery.ExtractJobConfig()
            job_config.destination_format = bigquery.DestinationFormat.CSV
            job_config.print_header = print_header
            job_config.field_delimiter = field_delimiter

            # Log info (similar to ctx.info)
            print(f"Exporting {dataset_id}.{table_id} to {destination_uri}...")

            # Create and run the extract job
            extract_job = client.extract_table(
                table_ref, destination_uri, job_config=job_config
            )

            # Wait for the job to complete
            extract_job.result()

            return json.dumps(
                {
                    "status": "success",
                    "destination": destination_uri,
                    "message": f"Successfully exported {project_id}.{dataset_id}.{table_id} to {destination_uri}",
                },
                indent=2,
            )
        except Exception as e:
            return json.dumps({"status": "error", "message": str(e)}, indent=2)

    # Prompts
    @mcp_instance.prompt()
    def create_dataset_prompt() -> str:
        """Prompt for creating a new BigQuery dataset"""
        return """
I need to create a new BigQuery dataset.

Please help me with:
1. Choosing an appropriate location for my dataset
2. Understanding dataset naming conventions
3. Best practices for dataset configuration (expiration, labels, etc.)
4. The process to create the dataset
"""

    @mcp_instance.prompt()
    def query_optimization_prompt() -> str:
        """Prompt for BigQuery query optimization help"""
        return """
I have a BigQuery query that's slow or expensive to run.

Please help me optimize it by:
1. Analyzing key factors that affect BigQuery performance
2. Identifying common patterns that lead to inefficient queries
3. Suggesting specific optimization techniques
4. Helping me understand how to use EXPLAIN plan analysis
"""

```

--------------------------------------------------------------------------------
/src/gcp-mcp-server/services/cloud_run.py:
--------------------------------------------------------------------------------

```python
import json
from typing import Dict, Optional

from google.cloud import run_v2
from google.protobuf import duration_pb2, field_mask_pb2
from services import client_instances


def register(mcp_instance):
    """Register all Cloud Run resources and tools with the MCP instance."""

    # Resources
    @mcp_instance.resource("gcp://cloudrun/{project_id}/{region}/services")
    def list_services_resource(project_id: str = None, region: str = None) -> str:
        """List all Cloud Run services in a specific region"""
        try:
            # Get client from client_instances
            client = client_instances.get_clients().run
            project_id = project_id or client_instances.get_project_id()
            region = region or client_instances.get_location()

            parent = f"projects/{project_id}/locations/{region}"
            services = client.list_services(parent=parent)

            result = []
            for service in services:
                result.append(
                    {
                        "name": service.name.split("/")[-1],
                        "uid": service.uid,
                        "generation": service.generation,
                        "labels": dict(service.labels) if service.labels else {},
                        "annotations": dict(service.annotations)
                        if service.annotations
                        else {},
                        "create_time": service.create_time.isoformat()
                        if service.create_time
                        else None,
                        "update_time": service.update_time.isoformat()
                        if service.update_time
                        else None,
                        "uri": service.uri,
                    }
                )

            return json.dumps(result, indent=2)
        except Exception as e:
            return json.dumps({"error": str(e)}, indent=2)

    @mcp_instance.resource("gcp://run/{project_id}/{location}/service/{service_name}")
    def get_service_resource(
        service_name: str, project_id: str = None, location: str = None
    ) -> str:
        """Get details for a specific Cloud Run service"""
        try:
            # Get client from client_instances
            client = client_instances.get_clients().run
            project_id = project_id or client_instances.get_project_id()
            location = location or client_instances.get_location()

            name = f"projects/{project_id}/locations/{location}/services/{service_name}"
            service = client.get_service(name=name)

            # Extract container details
            containers = []
            if service.template and service.template.containers:
                for container in service.template.containers:
                    container_info = {
                        "image": container.image,
                        "command": list(container.command) if container.command else [],
                        "args": list(container.args) if container.args else [],
                        "env": [
                            {"name": env.name, "value": env.value}
                            for env in container.env
                        ]
                        if container.env
                        else [],
                        "resources": {
                            "limits": dict(container.resources.limits)
                            if container.resources and container.resources.limits
                            else {},
                            "cpu_idle": container.resources.cpu_idle
                            if container.resources
                            else None,
                        }
                        if container.resources
                        else {},
                        "ports": [
                            {"name": port.name, "container_port": port.container_port}
                            for port in container.ports
                        ]
                        if container.ports
                        else [],
                    }
                    containers.append(container_info)

            # Extract scaling details
            scaling = None
            if service.template and service.template.scaling:
                scaling = {
                    "min_instance_count": service.template.scaling.min_instance_count,
                    "max_instance_count": service.template.scaling.max_instance_count,
                }

            result = {
                "name": service.name.split("/")[-1],
                "uid": service.uid,
                "generation": service.generation,
                "labels": dict(service.labels) if service.labels else {},
                "annotations": dict(service.annotations) if service.annotations else {},
                "create_time": service.create_time.isoformat()
                if service.create_time
                else None,
                "update_time": service.update_time.isoformat()
                if service.update_time
                else None,
                "creator": service.creator,
                "last_modifier": service.last_modifier,
                "client": service.client,
                "client_version": service.client_version,
                "ingress": service.ingress.name if service.ingress else None,
                "launch_stage": service.launch_stage.name
                if service.launch_stage
                else None,
                "traffic": [
                    {
                        "type": traffic.type_.name if traffic.type_ else None,
                        "revision": traffic.revision.split("/")[-1]
                        if traffic.revision
                        else None,
                        "percent": traffic.percent,
                        "tag": traffic.tag,
                    }
                    for traffic in service.traffic
                ]
                if service.traffic
                else [],
                "uri": service.uri,
                "template": {
                    "containers": containers,
                    "execution_environment": service.template.execution_environment.name
                    if service.template and service.template.execution_environment
                    else None,
                    "vpc_access": {
                        "connector": service.template.vpc_access.connector,
                        "egress": service.template.vpc_access.egress.name
                        if service.template.vpc_access.egress
                        else None,
                    }
                    if service.template and service.template.vpc_access
                    else None,
                    "scaling": scaling,
                    "timeout": f"{service.template.timeout.seconds}s {service.template.timeout.nanos}ns"
                    if service.template and service.template.timeout
                    else None,
                    "service_account": service.template.service_account
                    if service.template
                    else None,
                }
                if service.template
                else {},
            }

            return json.dumps(result, indent=2)
        except Exception as e:
            return json.dumps({"error": str(e)}, indent=2)

    @mcp_instance.resource(
        "gcp://run/{project_id}/{location}/service/{service_name}/revisions"
    )
    def list_revisions_resource(
        service_name: str, project_id: str = None, location: str = None
    ) -> str:
        """List revisions for a specific Cloud Run service"""
        try:
            # Get client from client_instances
            client = client_instances.get_clients().run
            project_id = project_id or client_instances.get_project_id()
            location = location or client_instances.get_location()

            parent = f"projects/{project_id}/locations/{location}"

            # List all revisions
            revisions = client.list_revisions(parent=parent)

            # Collect revision details; the name check is a defensive filter
            service_revisions = []
            for revision in revisions:
                if service_name in revision.name:
                    service_revisions.append(
                        {
                            "name": revision.name.split("/")[-1],
                            "uid": revision.uid,
                            "generation": revision.generation,
                            "service": revision.service.split("/")[-1]
                            if revision.service
                            else None,
                            "create_time": revision.create_time.isoformat()
                            if revision.create_time
                            else None,
                            "update_time": revision.update_time.isoformat()
                            if revision.update_time
                            else None,
                            "scaling": {
                                "min_instance_count": revision.scaling.min_instance_count
                                if revision.scaling
                                else None,
                                "max_instance_count": revision.scaling.max_instance_count
                                if revision.scaling
                                else None,
                            }
                            if revision.scaling
                            else None,
                            "conditions": [
                                {
                                    "type": condition.type,
                                    "state": condition.state.name
                                    if condition.state
                                    else None,
                                    "message": condition.message,
                                    "last_transition_time": condition.last_transition_time.isoformat()
                                    if condition.last_transition_time
                                    else None,
                                    "severity": condition.severity.name
                                    if condition.severity
                                    else None,
                                }
                                for condition in revision.conditions
                            ]
                            if revision.conditions
                            else [],
                        }
                    )

            return json.dumps(service_revisions, indent=2)
        except Exception as e:
            return json.dumps({"error": str(e)}, indent=2)

    # Tools
    @mcp_instance.tool()
    def create_service(
        service_name: str,
        image: str,
        project_id: str = None,
        location: str = None,
        env_vars: Optional[Dict[str, str]] = None,
        memory_limit: str = "512Mi",
        cpu_limit: str = "1",
        min_instances: int = 0,
        max_instances: int = 100,
        port: int = 8080,
        allow_unauthenticated: bool = True,
        service_account: str = None,
        vpc_connector: str = None,
        timeout_seconds: int = 300,
    ) -> str:
        """
        Create a new Cloud Run service

        Args:
            service_name: Name for the new service
            image: Container image to deploy (e.g., gcr.io/project/image:tag)
            project_id: GCP project ID (defaults to configured project)
            location: GCP region (e.g., us-central1) (defaults to configured location)
            env_vars: Environment variables as key-value pairs
            memory_limit: Memory limit (e.g., 512Mi, 1Gi)
            cpu_limit: CPU limit (e.g., 1, 2)
            min_instances: Minimum number of instances
            max_instances: Maximum number of instances
            port: Container port
            allow_unauthenticated: Whether to allow unauthenticated access
            service_account: Service account email
            vpc_connector: VPC connector name
            timeout_seconds: Request timeout in seconds
        """
        try:
            # Get client from client_instances
            client = client_instances.get_clients().run
            project_id = project_id or client_instances.get_project_id()
            location = location or client_instances.get_location()

            print(f"Creating Cloud Run service {service_name} in {location}...")

            # Create container
            container = run_v2.Container(
                image=image,
                resources=run_v2.ResourceRequirements(
                    limits={
                        "memory": memory_limit,
                        "cpu": cpu_limit,
                    }
                ),
            )

            # Add environment variables if provided
            if env_vars:
                container.env = [
                    run_v2.EnvVar(name=key, value=value)
                    for key, value in env_vars.items()
                ]

            # Add port
            container.ports = [run_v2.ContainerPort(container_port=port)]

            # Create template
            template = run_v2.RevisionTemplate(
                containers=[container],
                scaling=run_v2.RevisionScaling(
                    min_instance_count=min_instances,
                    max_instance_count=max_instances,
                ),
                timeout=duration_pb2.Duration(seconds=timeout_seconds),
            )

            # Add service account if provided
            if service_account:
                template.service_account = service_account

            # Add VPC connector if provided
            if vpc_connector:
                template.vpc_access = run_v2.VpcAccess(
                    connector=vpc_connector,
                    egress=run_v2.VpcAccess.VpcEgress.ALL_TRAFFIC,
                )

            # Create service
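            # Note: ingress only controls network reachability; truly public
            # (unauthenticated) access also requires an IAM binding granting
            # roles/run.invoker to allUsers, which this tool does not configure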
            service = run_v2.Service(
                template=template,
                ingress=run_v2.IngressTraffic.INGRESS_TRAFFIC_ALL
                if allow_unauthenticated
                else run_v2.IngressTraffic.INGRESS_TRAFFIC_INTERNAL_ONLY,
            )

            # Create the service
            parent = f"projects/{project_id}/locations/{location}"
            operation = client.create_service(
                parent=parent,
                service_id=service_name,
                service=service,
            )

            # Wait for the operation to complete
            print(f"Waiting for service {service_name} to be created...")
            result = operation.result()

            return json.dumps(
                {
                    "status": "success",
                    "name": result.name.split("/")[-1],
                    "uri": result.uri,
                    "create_time": result.create_time.isoformat()
                    if result.create_time
                    else None,
                    "update_time": result.update_time.isoformat()
                    if result.update_time
                    else None,
                },
                indent=2,
            )
        except Exception as e:
            return json.dumps({"status": "error", "message": str(e)}, indent=2)

    @mcp_instance.tool()
    def list_services(project_id: str = None, region: str = None) -> str:
        """
        List Cloud Run services in a specific region

        Args:
            project_id: GCP project ID (defaults to configured project)
            region: GCP region (e.g., us-central1) (defaults to configured location)
        """
        try:
            # Get client from client_instances
            client = client_instances.get_clients().run
            project_id = project_id or client_instances.get_project_id()
            region = region or client_instances.get_location()

            parent = f"projects/{project_id}/locations/{region}"

            print(f"Listing Cloud Run services in {region}...")
            services = client.list_services(parent=parent)

            result = []
            for service in services:
                result.append(
                    {
                        "name": service.name.split("/")[-1],
                        "uri": service.uri,
                        "create_time": service.create_time.isoformat()
                        if service.create_time
                        else None,
                        "update_time": service.update_time.isoformat()
                        if service.update_time
                        else None,
                        "labels": dict(service.labels) if service.labels else {},
                    }
                )

            return json.dumps(
                {"status": "success", "services": result, "count": len(result)},
                indent=2,
            )
        except Exception as e:
            return json.dumps({"status": "error", "message": str(e)}, indent=2)

    @mcp_instance.tool()
    def update_service(
        service_name: str,
        project_id: str = None,
        region: str = None,
        image: Optional[str] = None,
        memory: Optional[str] = None,
        cpu: Optional[str] = None,
        max_instances: Optional[int] = None,
        min_instances: Optional[int] = None,
        env_vars: Optional[Dict[str, str]] = None,
        concurrency: Optional[int] = None,
        timeout_seconds: Optional[int] = None,
        service_account: Optional[str] = None,
        labels: Optional[Dict[str, str]] = None,
    ) -> str:
        """
        Update an existing Cloud Run service

        Args:
            service_name: Name of the service to update
            project_id: GCP project ID (defaults to configured project)
            region: GCP region (e.g., us-central1) (defaults to configured location)
            image: New container image URL (optional)
            memory: New memory allocation (optional)
            cpu: New CPU allocation (optional)
            max_instances: New maximum number of instances (optional)
            min_instances: New minimum number of instances (optional)
            env_vars: New environment variables (optional)
            concurrency: New maximum concurrent requests per instance (optional)
            timeout_seconds: New request timeout in seconds (optional)
            service_account: New service account email (optional)
            labels: New key-value labels to apply to the service (optional)
        """
        try:
            # Get client from client_instances
            client = client_instances.get_clients().run
            project_id = project_id or client_instances.get_project_id()
            region = region or client_instances.get_location()

            # Get the existing service
            name = f"projects/{project_id}/locations/{region}/services/{service_name}"

            print(f"Getting current service configuration for {service_name}...")
            service = client.get_service(name=name)

            # Track which fields are being updated
            update_mask_fields = []

            # Update the container image if specified
            if image and service.template and service.template.containers:
                service.template.containers[0].image = image
                update_mask_fields.append("template.containers[0].image")

            # Update resources if specified
            if (memory or cpu) and service.template and service.template.containers:
                if not service.template.containers[0].resources:
                    service.template.containers[
                        0
                    ].resources = run_v2.ResourceRequirements(limits={})

                if not service.template.containers[0].resources.limits:
                    service.template.containers[0].resources.limits = {}

                if memory:
                    service.template.containers[0].resources.limits["memory"] = memory
                    update_mask_fields.append(
                        "template.containers[0].resources.limits.memory"
                    )

                if cpu:
                    service.template.containers[0].resources.limits["cpu"] = cpu
                    update_mask_fields.append(
                        "template.containers[0].resources.limits.cpu"
                    )

            # Update scaling parameters (scaling fields live on template.scaling)
            if max_instances is not None and service.template:
                service.template.scaling.max_instance_count = max_instances
                update_mask_fields.append("template.scaling.max_instance_count")

            if min_instances is not None and service.template:
                service.template.scaling.min_instance_count = min_instances
                update_mask_fields.append("template.scaling.min_instance_count")

            # Update concurrency
            if concurrency is not None and service.template:
                service.template.max_instance_request_concurrency = concurrency
                update_mask_fields.append("template.max_instance_request_concurrency")

            # Update timeout
            if timeout_seconds is not None and service.template:
                service.template.timeout = {"seconds": timeout_seconds}
                update_mask_fields.append("template.timeout")

            # Update service account
            if service_account is not None and service.template:
                service.template.service_account = service_account
                update_mask_fields.append("template.service_account")

            # Update environment variables
            if (
                env_vars is not None
                and service.template
                and service.template.containers
            ):
                # Create new env var list
                new_env_vars = [
                    run_v2.EnvVar(name=key, value=value)
                    for key, value in env_vars.items()
                ]
                service.template.containers[0].env = new_env_vars
                update_mask_fields.append("template.containers[0].env")

            # Update labels
            if labels is not None:
                service.labels = labels
                update_mask_fields.append("labels")

            # Only proceed if there are fields to update
            if not update_mask_fields:
                return json.dumps(
                    {"status": "info", "message": "No updates specified"}, indent=2
                )

            # Create update mask
            update_mask = field_mask_pb2.FieldMask(paths=update_mask_fields)

            # Create the request
            request = run_v2.UpdateServiceRequest(
                service=service, update_mask=update_mask
            )

            print(f"Updating Cloud Run service {service_name}...")
            operation = client.update_service(request=request)

            print("Waiting for service update to complete...")
            response = operation.result()

            return json.dumps(
                {
                    "status": "success",
                    "name": response.name,
                    "uri": response.uri,
                    "updated_fields": update_mask_fields,
                    "conditions": [
                        {
                            "type": condition.type_,
                            "state": condition.state.name
                            if hasattr(condition.state, "name")
                            else str(condition.state),
                            "message": condition.message,
                        }
                        for condition in response.conditions
                    ]
                    if response.conditions
                    else [],
                },
                indent=2,
            )
        except Exception as e:
            return json.dumps({"status": "error", "message": str(e)}, indent=2)

    @mcp_instance.tool()
    def delete_service(
        service_name: str, project_id: str = None, region: str = None
    ) -> str:
        """
        Delete a Cloud Run service

        Args:
            service_name: Name of the service to delete
            project_id: GCP project ID (defaults to configured project)
            region: GCP region (e.g., us-central1) (defaults to configured location)
        """
        try:
            # Get client from client_instances
            client = client_instances.get_clients().run
            project_id = project_id or client_instances.get_project_id()
            region = region or client_instances.get_location()

            name = f"projects/{project_id}/locations/{region}/services/{service_name}"

            print(f"Deleting Cloud Run service {service_name} in {region}...")
            operation = client.delete_service(name=name)

            print("Waiting for service deletion to complete...")
            operation.result()

            return json.dumps(
                {
                    "status": "success",
                    "message": f"Service {service_name} successfully deleted",
                },
                indent=2,
            )
        except Exception as e:
            return json.dumps({"status": "error", "message": str(e)}, indent=2)

    @mcp_instance.tool()
    def get_service(
        service_name: str, project_id: str = None, region: str = None
    ) -> str:
        """
        Get details of a specific Cloud Run service

        Args:
            service_name: Name of the service
            project_id: GCP project ID (defaults to configured project)
            region: GCP region (e.g., us-central1) (defaults to configured location)
        """
        try:
            # Get client from client_instances
            client = client_instances.get_clients().run
            project_id = project_id or client_instances.get_project_id()
            region = region or client_instances.get_location()

            name = f"projects/{project_id}/locations/{region}/services/{service_name}"

            print(f"Getting details for Cloud Run service {service_name}...")
            service = client.get_service(name=name)

            # Parse container details
            containers = []
            if service.template and service.template.containers:
                for container in service.template.containers:
                    container_info = {
                        "image": container.image,
                        "resources": {
                            "limits": dict(container.resources.limits)
                            if container.resources and container.resources.limits
                            else {}
                        }
                        if container.resources
                        else {},
                        "env_vars": {env.name: env.value for env in container.env}
                        if container.env
                        else {},
                    }
                    containers.append(container_info)

            # Parse traffic
            traffic_result = []
            if service.traffic:
                for traffic in service.traffic:
                    traffic_info = {
                        "type": traffic.type_.name
                        if hasattr(traffic.type_, "name")
                        else str(traffic.type_),
                        "revision": traffic.revision.split("/")[-1]
                        if traffic.revision
                        else None,
                        "percent": traffic.percent,
                        "tag": traffic.tag,
                    }
                    traffic_result.append(traffic_info)

            return json.dumps(
                {
                    "status": "success",
                    "name": service.name.split("/")[-1],
                    "uri": service.uri,
                    "containers": containers,
                    "traffic": traffic_result,
                    "update_time": service.update_time.isoformat()
                    if service.update_time
                    else None,
                },
                indent=2,
            )
        except Exception as e:
            return json.dumps({"status": "error", "message": str(e)}, indent=2)

    # Prompts
    @mcp_instance.prompt()
    def deploy_service_prompt(
        service_name: str,
        image: str,
        min_instances: str = "0",
        max_instances: str = "100",
        env_vars: str = "{}",
    ) -> str:
        """Prompt to deploy a Cloud Run service with the given configuration"""
        return f"""
I need to deploy a Cloud Run service with the following configuration:

- Service name: {service_name}
- Container image: {image}
- Min instances: {min_instances}
- Max instances: {max_instances}
- Environment variables: {env_vars}

Please help me set up this service and explain the deployment process and any best practices I should follow.
"""

    @mcp_instance.prompt()
    def check_service_status_prompt(service_name: str) -> str:
        """Prompt to check the status of a deployed Cloud Run service"""
        return f"""
I need to check the current status of my Cloud Run service named "{service_name}".

Please provide me with:
1. Is the service currently running?
2. What is the URL to access it?
3. What revision is currently serving traffic?
4. Are there any issues with the service?
"""

    @mcp_instance.prompt()
    def create_service_prompt(region: str = "us-central1") -> str:
        """Prompt for creating a new Cloud Run service"""
        return f"""
I need to create a new Cloud Run service in {region}.

Please help me with:
1. What container image should I use?
2. How much CPU and memory should I allocate?
3. Should I set min and max instances for scaling?
4. Do I need to set any environment variables?
5. Should I allow unauthenticated access?
6. What's the best way to deploy my service?
"""

    @mcp_instance.prompt()
    def update_service_prompt() -> str:
        """Prompt for updating a Cloud Run service"""
        return """
I need to update my Cloud Run service.

Please help me understand:
1. How to update the container image
2. How to change resource allocations
3. How to add or modify environment variables
4. How to update scaling settings
5. Best practices for zero-downtime updates
"""

```

--------------------------------------------------------------------------------
/src/gcp-mcp-server/services/cloud_monitoring.py:
--------------------------------------------------------------------------------

```python
import datetime
import json
from typing import Dict, List, Optional

from google.api import metric_pb2
from google.cloud import monitoring_v3
from google.protobuf.duration_pb2 import Duration
from google.protobuf.timestamp_pb2 import Timestamp
from services import client_instances


def register(mcp_instance):
    """Register all Cloud Monitoring resources and tools with the MCP instance."""

    # Resources
    @mcp_instance.resource("gcp://monitoring/{project_id}/metrics")
    def list_metrics_resource(project_id: str = None) -> str:
        """List all available metrics for a GCP project"""
        try:
            # Get client from client_instances
            client = client_instances.get_clients().monitoring
            project_id = project_id or client_instances.get_project_id()

            name = f"projects/{project_id}"
            metrics = client.list_metric_descriptors(name=name)

            result = []
            for metric in metrics:
                result.append(
                    {
                        "name": metric.name,
                        "type": metric.type,
                        "display_name": metric.display_name,
                        "description": metric.description,
                        "kind": monitoring_v3.MetricDescriptor.MetricKind.Name(
                            metric.metric_kind
                        ),
                        "value_type": monitoring_v3.MetricDescriptor.ValueType.Name(
                            metric.value_type
                        ),
                        "unit": metric.unit,
                    }
                )

            return json.dumps(result, indent=2)
        except Exception as e:
            return json.dumps({"error": str(e)}, indent=2)

    @mcp_instance.resource("gcp://monitoring/{project_id}/alerts")
    def list_alerts_resource(project_id: str = None) -> str:
        """List all alert policies for a GCP project"""
        try:
            # Get client from client_instances
            client = client_instances.get_clients().monitoring
            project_id = project_id or client_instances.get_project_id()

            name = f"projects/{project_id}"
            policies = client.list_alert_policies(name=name)

            result = []
            for policy in policies:
                result.append(
                    {
                        "name": policy.name,
                        "display_name": policy.display_name,
                        "enabled": policy.enabled,
                        "conditions_count": len(policy.conditions),
                        "creation_time": policy.creation_record.mutate_time.ToDatetime().isoformat()
                        if policy.creation_record and policy.creation_record.mutate_time
                        else None,
                        "notification_channels": [
                            chan.split("/")[-1] for chan in policy.notification_channels
                        ]
                        if policy.notification_channels
                        else [],
                    }
                )

            return json.dumps(result, indent=2)
        except Exception as e:
            return json.dumps({"error": str(e)}, indent=2)

    @mcp_instance.resource("gcp://monitoring/{project_id}/alert/{alert_id}")
    def get_alert_resource(project_id: str = None, alert_id: str = None) -> str:
        """Get details for a specific alert policy"""
        try:
            # Get client from client_instances
            client = client_instances.get_clients().monitoring
            project_id = project_id or client_instances.get_project_id()

            name = f"projects/{project_id}/alertPolicies/{alert_id}"
            policy = client.get_alert_policy(name=name)

            conditions = []
            for condition in policy.conditions:
                condition_data = {
                    "name": condition.name,
                    "display_name": condition.display_name,
                    "type": condition.condition_type_name
                    if hasattr(condition, "condition_type_name")
                    else None,
                }

                # Add condition-specific details
                if condition.HasField("condition_threshold"):
                    threshold = condition.condition_threshold
                    condition_data["threshold"] = {
                        "filter": threshold.filter,
                        "comparison": monitoring_v3.ComparisonType.Name(
                            threshold.comparison
                        )
                        if threshold.comparison
                        else None,
                        "threshold_value": threshold.threshold_value,
                        "duration": f"{threshold.duration.seconds}s"
                        if threshold.duration
                        else None,
                        "aggregation": {
                            "alignment_period": f"{threshold.aggregations[0].alignment_period.seconds}s"
                            if threshold.aggregations
                            and threshold.aggregations[0].alignment_period
                            else None,
                            "per_series_aligner": monitoring_v3.Aggregation.Aligner.Name(
                                threshold.aggregations[0].per_series_aligner
                            )
                            if threshold.aggregations
                            and threshold.aggregations[0].per_series_aligner
                            else None,
                            "cross_series_reducer": monitoring_v3.Aggregation.Reducer.Name(
                                threshold.aggregations[0].cross_series_reducer
                            )
                            if threshold.aggregations
                            and threshold.aggregations[0].cross_series_reducer
                            else None,
                        }
                        if threshold.aggregations and len(threshold.aggregations) > 0
                        else None,
                    }

                conditions.append(condition_data)

            result = {
                "name": policy.name,
                "display_name": policy.display_name,
                "enabled": policy.enabled,
                "conditions": conditions,
                "combiner": monitoring_v3.AlertPolicy.ConditionCombinerType.Name(
                    policy.combiner
                )
                if policy.combiner
                else None,
                "notification_channels": policy.notification_channels,
                "creation_time": policy.creation_record.mutate_time.ToDatetime().isoformat()
                if policy.creation_record and policy.creation_record.mutate_time
                else None,
                "documentation": {
                    "content": policy.documentation.content,
                    "mime_type": policy.documentation.mime_type,
                }
                if policy.HasField("documentation")
                else None,
            }

            return json.dumps(result, indent=2)
        except Exception as e:
            return json.dumps({"error": str(e)}, indent=2)

    @mcp_instance.resource("gcp://monitoring/{project_id}/notification_channels")
    def list_notification_channels_resource(project_id: str = None) -> str:
        """List notification channels for a GCP project"""
        try:
            # Get client from client_instances
            client = client_instances.get_clients().monitoring
            project_id = project_id or client_instances.get_project_id()

            name = f"projects/{project_id}"
            channels = client.list_notification_channels(name=name)

            result = []
            for channel in channels:
                result.append(
                    {
                        "name": channel.name,
                        "type": channel.type,
                        "display_name": channel.display_name,
                        "description": channel.description,
                        "verification_status": monitoring_v3.NotificationChannel.VerificationStatus.Name(
                            channel.verification_status
                        )
                        if channel.verification_status
                        else None,
                        "enabled": channel.enabled,
                        "labels": dict(channel.labels) if channel.labels else {},
                    }
                )

            return json.dumps(result, indent=2)
        except Exception as e:
            return json.dumps({"error": str(e)}, indent=2)

    # Tools
    @mcp_instance.tool()
    def list_metrics(project_id: str = None, filter_str: str = "") -> str:
        """
        List metrics in a GCP project with optional filtering

        Args:
            project_id: GCP project ID (defaults to configured project)
            filter_str: Optional filter string to narrow results (e.g., "metric.type = starts_with(\"compute.googleapis.com\")")
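
        Example (illustrative):
            list_metrics(filter_str='metric.type = starts_with("compute.googleapis.com")')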
        """
        try:
            # Get client from client_instances
            client = client_instances.get_clients().monitoring
            project_id = project_id or client_instances.get_project_id()

            parent = f"projects/{project_id}"
            request = monitoring_v3.ListMetricDescriptorsRequest(
                name=parent, filter=filter_str
            )

            print(f"Listing metrics for project {project_id}...")
            metrics = client.list_metric_descriptors(request=request)

            result = []
            for metric in metrics:
                result.append(
                    {
                        "name": metric.name,
                        "type": metric.type,
                        "display_name": metric.display_name,
                        "description": metric.description,
                        "kind": monitoring_v3.MetricDescriptor.MetricKind.Name(
                            metric.metric_kind
                        ),
                        "value_type": monitoring_v3.MetricDescriptor.ValueType.Name(
                            metric.value_type
                        ),
                        "unit": metric.unit,
                    }
                )

            return json.dumps(
                {"status": "success", "metrics": result, "count": len(result)}, indent=2
            )
        except Exception as e:
            return json.dumps({"status": "error", "message": str(e)}, indent=2)

    @mcp_instance.tool()
    def fetch_metric_timeseries(
        metric_type: str,
        project_id: str = None,
        filter_additions: str = "",
        hours: int = 1,
        alignment_period_seconds: int = 60,
    ) -> str:
        """
        Fetch time series data for a specific metric

        Args:
            metric_type: The metric type (e.g., "compute.googleapis.com/instance/cpu/utilization")
            project_id: GCP project ID (defaults to configured project)
            filter_additions: Additional filter criteria (e.g., "resource.labels.instance_id = \"my-instance\"")
            hours: Number of hours of data to fetch (default: 1)
            alignment_period_seconds: Data point alignment period in seconds (default: 60)
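
        Example (illustrative values):
            fetch_metric_timeseries(
                metric_type="compute.googleapis.com/instance/cpu/utilization",
                hours=3,
            )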
        """
        try:
            # Get client from client_instances
            client = client_instances.get_clients().monitoring
            project_id = project_id or client_instances.get_project_id()

            # Build the filter
            filter_str = f'metric.type = "{metric_type}"'
            if filter_additions:
                filter_str += f" AND {filter_additions}"

            # Calculate time interval
            # datetime.utcnow() is deprecated; use an aware UTC timestamp
            now = datetime.datetime.now(datetime.timezone.utc)
            seconds = int(now.timestamp())
            end_time = Timestamp(seconds=seconds)

            start_time = Timestamp(seconds=seconds - hours * 3600)

            # Create interval
            interval = monitoring_v3.TimeInterval(
                end_time=end_time, start_time=start_time
            )

            # Create aggregation
            aggregation = monitoring_v3.Aggregation(
                alignment_period=Duration(seconds=alignment_period_seconds),
                per_series_aligner=monitoring_v3.Aggregation.Aligner.ALIGN_MEAN,
            )
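            # ALIGN_MEAN averages the raw samples within each alignment period,
            # yielding one data point per period per matching time series.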

            # Build request
            request = monitoring_v3.ListTimeSeriesRequest(
                name=f"projects/{project_id}",
                filter=filter_str,
                interval=interval,
                view=monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
                aggregation=aggregation,
            )

            print(f"Fetching time series data for {metric_type}...")
            time_series = client.list_time_series(request=request)

            result = []
            for series in time_series:
                data_points = []
                for point in series.points:
                    # end_time is already a datetime (proto-plus marshaling)
                    point_time = point.interval.end_time.isoformat()

                    # TypedValue is a oneof; probe presence with `in`
                    if "double_value" in point.value:
                        value = point.value.double_value
                    elif "int64_value" in point.value:
                        value = point.value.int64_value
                    elif "bool_value" in point.value:
                        value = point.value.bool_value
                    elif "string_value" in point.value:
                        value = point.value.string_value
                    elif "distribution_value" in point.value:
                        value = "distribution"  # simplified; distributions are complex
                    else:
                        value = None

                    data_points.append({"time": point_time, "value": value})

                series_data = {
                    "metric": dict(series.metric.labels)
                    if series.metric and series.metric.labels
                    else {},
                    "resource": {
                        "type": series.resource.type,
                        "labels": dict(series.resource.labels)
                        if series.resource and series.resource.labels
                        else {},
                    }
                    if series.resource
                    else {},
                    "points": data_points,
                }
                result.append(series_data)

            return json.dumps(
                {
                    "status": "success",
                    "metric_type": metric_type,
                    "time_range": {
                        "start": start_time.ToDatetime().isoformat(),
                        "end": end_time.ToDatetime().isoformat(),
                    },
                    "time_series": result,
                    "count": len(result),
                },
                indent=2,
            )
        except Exception as e:
            return json.dumps({"status": "error", "message": str(e)}, indent=2)

    @mcp_instance.tool()
    def list_alert_policies(project_id: str = None, filter_str: str = "") -> str:
        """
        List alert policies in a GCP project with optional filtering

        Args:
            project_id: GCP project ID (defaults to configured project)
            filter_str: Optional filter string (e.g., "display_name = \"High CPU Alert\"")
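
        Example (illustrative):
            list_alert_policies(filter_str='display_name = "High CPU Alert"')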
        """
        try:
            # Get client from client_instances
            client = client_instances.get_clients().monitoring
            project_id = project_id or client_instances.get_project_id()

            parent = f"projects/{project_id}"
            request = monitoring_v3.ListAlertPoliciesRequest(
                name=parent, filter=filter_str
            )

            print(f"Listing alert policies for project {project_id}...")
            policies = client.list_alert_policies(request=request)

            result = []
            for policy in policies:
                policy_data = {
                    "name": policy.name,
                    "display_name": policy.display_name,
                    "enabled": policy.enabled,
                    "conditions_count": len(policy.conditions),
                    "combiner": monitoring_v3.AlertPolicy.ConditionCombinerType.Name(
                        policy.combiner
                    )
                    if policy.combiner
                    else None,
                    "notification_channels": [
                        chan.split("/")[-1] for chan in policy.notification_channels
                    ]
                    if policy.notification_channels
                    else [],
                    "creation_time": policy.creation_record.mutate_time.ToDatetime().isoformat()
                    if policy.creation_record and policy.creation_record.mutate_time
                    else None,
                }
                result.append(policy_data)

            return json.dumps(
                {"status": "success", "alert_policies": result, "count": len(result)},
                indent=2,
            )
        except Exception as e:
            return json.dumps({"status": "error", "message": str(e)}, indent=2)

    @mcp_instance.tool()
    def create_metric_threshold_alert(
        display_name: str,
        metric_type: str,
        filter_str: str,
        threshold_value: float,
        project_id: str = None,
        comparison: str = "COMPARISON_GT",
        duration_seconds: int = 300,
        alignment_period_seconds: int = 60,
        aligner: str = "ALIGN_MEAN",
        reducer: str = "REDUCE_MEAN",
        notification_channels: Optional[List[str]] = None,
        documentation: str = "",
        enabled: bool = True,
    ) -> str:
        """
        Create a metric threshold alert policy

        Args:
            display_name: Human-readable name for the alert
            metric_type: The metric to alert on (e.g., "compute.googleapis.com/instance/cpu/utilization")
            filter_str: Filter string to define which resources to monitor
            threshold_value: The threshold value to trigger the alert
            project_id: GCP project ID (defaults to configured project)
            comparison: Comparison type (COMPARISON_GT, COMPARISON_GE, COMPARISON_LT, COMPARISON_LE, COMPARISON_EQ, COMPARISON_NE)
            duration_seconds: Duration in seconds the condition must be met to trigger
            alignment_period_seconds: Period in seconds for data point alignment
            aligner: Per-series aligner (ALIGN_MEAN, ALIGN_MAX, ALIGN_MIN, etc.)
            reducer: Cross-series reducer (REDUCE_MEAN, REDUCE_MAX, REDUCE_MIN, etc.)
            notification_channels: List of notification channel IDs
            documentation: Documentation for the alert (markdown supported)
            enabled: Whether the alert should be enabled
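
        Example (illustrative values):
            create_metric_threshold_alert(
                display_name="High CPU",
                metric_type="compute.googleapis.com/instance/cpu/utilization",
                filter_str='resource.type = "gce_instance"',
                threshold_value=0.8,
            )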
        """
        try:
            # Get client from client_instances
            client = client_instances.get_clients().monitoring
            project_id = project_id or client_instances.get_project_id()

            # Validate and convert enums
            try:
                comparison_enum = getattr(monitoring_v3.ComparisonType, comparison)
                aligner_enum = getattr(monitoring_v3.Aggregation.Aligner, aligner)
                reducer_enum = getattr(monitoring_v3.Aggregation.Reducer, reducer)
            except AttributeError as e:
                return json.dumps(
                    {
                        "status": "error",
                        "message": f"Invalid enum value: {str(e)}. Please check documentation for valid values.",
                    },
                    indent=2,
                )

            # Prepare notification channels with full paths
            full_notification_channels = []
            if notification_channels:
                for channel in notification_channels:
                    if not channel.startswith(
                        f"projects/{project_id}/notificationChannels/"
                    ):
                        channel = (
                            f"projects/{project_id}/notificationChannels/{channel}"
                        )
                    full_notification_channels.append(channel)

            # Create aggregation
            aggregation = monitoring_v3.Aggregation(
                alignment_period=Duration(seconds=alignment_period_seconds),
                per_series_aligner=aligner_enum,
                cross_series_reducer=reducer_enum,
                group_by_fields=[],
            )

            # Create condition threshold
            condition_threshold = monitoring_v3.AlertPolicy.Condition.MetricThreshold(
                filter=f'metric.type = "{metric_type}" AND {filter_str}',
                comparison=comparison_enum,
                threshold_value=threshold_value,
                duration=Duration(seconds=duration_seconds),
                aggregations=[aggregation],
            )

            # Create condition
            condition = monitoring_v3.AlertPolicy.Condition(
                display_name=f"Threshold condition for {display_name}",
                condition_threshold=condition_threshold,
            )

            # Create the alert policy; `enabled` is a BoolValue wrapper field,
            # but proto-plus coerces a plain bool (monitoring_v3 has no BoolValue)
            alert_policy = monitoring_v3.AlertPolicy(
                display_name=display_name,
                conditions=[condition],
                combiner=monitoring_v3.AlertPolicy.ConditionCombinerType.OR,
                notification_channels=full_notification_channels,
                enabled=enabled,
            )

            # Add documentation if provided
            if documentation:
                alert_policy.documentation = monitoring_v3.AlertPolicy.Documentation(
                    content=documentation, mime_type="text/markdown"
                )

            # Create the request
            request = monitoring_v3.CreateAlertPolicyRequest(
                name=f"projects/{project_id}", alert_policy=alert_policy
            )

            print(f"Creating alert policy: {display_name}...")
            response = client.create_alert_policy(request=request)

            return json.dumps(
                {
                    "status": "success",
                    "name": response.name,
                    "display_name": response.display_name,
                    "enabled": response.enabled,
                    "conditions_count": len(response.conditions),
                    "notification_channels": response.notification_channels,
                },
                indent=2,
            )
        except Exception as e:
            return json.dumps({"status": "error", "message": str(e)}, indent=2)

    @mcp_instance.tool()
    def update_alert_policy(
        alert_id: str,
        project_id: str = None,
        display_name: Optional[str] = None,
        notification_channels: Optional[List[str]] = None,
        enabled: Optional[bool] = None,
        documentation: Optional[str] = None,
    ) -> str:
        """
        Update an existing alert policy

        Args:
            alert_id: ID of the alert to update
            project_id: GCP project ID (defaults to configured project)
            display_name: New name for the alert
            notification_channels: List of notification channel IDs
            enabled: Whether the alert should be enabled
            documentation: Documentation for the alert (markdown supported)
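
        Example (illustrative values):
            update_alert_policy(alert_id="1234567890", enabled=False)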
        """
        try:
            # Get client from client_instances
            client = client_instances.get_clients().monitoring
            project_id = project_id or client_instances.get_project_id()

            # Get the existing policy
            name = f"projects/{project_id}/alertPolicies/{alert_id}"
            policy = client.get_alert_policy(name=name)

            # Update fields if provided
            if display_name:
                policy.display_name = display_name

            if notification_channels is not None:
                full_notification_channels = []
                for channel in notification_channels:
                    if not channel.startswith(
                        f"projects/{project_id}/notificationChannels/"
                    ):
                        channel = (
                            f"projects/{project_id}/notificationChannels/{channel}"
                        )
                    full_notification_channels.append(channel)
                policy.notification_channels = full_notification_channels

            if enabled is not None:
                policy.enabled = enabled  # proto-plus coerces bool to BoolValue

            if documentation is not None:
                policy.documentation = monitoring_v3.AlertPolicy.Documentation(
                    content=documentation, mime_type="text/markdown"
                )

            # Create update mask
            update_mask = []
            if display_name:
                update_mask.append("display_name")
            if notification_channels is not None:
                update_mask.append("notification_channels")
            if enabled is not None:
                update_mask.append("enabled")
            if documentation is not None:
                update_mask.append("documentation")

            # Update the policy
            request = monitoring_v3.UpdateAlertPolicyRequest(
                alert_policy=policy, update_mask={"paths": update_mask}
            )

            print(f"Updating alert policy: {policy.name}...")
            response = client.update_alert_policy(request=request)

            return json.dumps(
                {
                    "status": "success",
                    "name": response.name,
                    "display_name": response.display_name,
                    "enabled": response.enabled,
                    "conditions_count": len(response.conditions),
                    "notification_channels": [
                        chan.split("/")[-1] for chan in response.notification_channels
                    ]
                    if response.notification_channels
                    else [],
                },
                indent=2,
            )
        except Exception as e:
            return json.dumps({"status": "error", "message": str(e)}, indent=2)

    @mcp_instance.tool()
    def delete_alert_policy(alert_id: str, project_id: str = None) -> str:
        """
        Delete an alert policy

        Args:
            alert_id: ID of the alert to delete
            project_id: GCP project ID (defaults to configured project)
        """
        try:
            # Get client from client_instances
            client = client_instances.get_clients().monitoring
            project_id = project_id or client_instances.get_project_id()

            name = f"projects/{project_id}/alertPolicies/{alert_id}"

            print(f"Deleting alert policy: {alert_id}...")
            client.delete_alert_policy(name=name)

            return json.dumps(
                {
                    "status": "success",
                    "message": f"Alert policy {alert_id} successfully deleted",
                },
                indent=2,
            )
        except Exception as e:
            return json.dumps({"status": "error", "message": str(e)}, indent=2)

    @mcp_instance.tool()
    def create_notification_channel(
        display_name: str,
        channel_type: str,
        labels: Dict[str, str],
        project_id: str = None,
        description: str = "",
        enabled: bool = True,
    ) -> str:
        """
        Create a notification channel

        Args:
            display_name: Human-readable name for the channel
            channel_type: Type of channel (email, sms, slack, pagerduty, etc.)
            labels: Channel-specific configuration (e.g., {"email_address": "alerts@example.com"})
            project_id: GCP project ID (defaults to configured project)
            description: Optional description for the channel
            enabled: Whether the channel should be enabled
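
        Example (illustrative values):
            create_notification_channel(
                display_name="On-call email",
                channel_type="email",
                labels={"email_address": "oncall@example.com"},
            )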
        """
        try:
            # Get client from client_instances
            client = client_instances.get_clients().monitoring
            project_id = project_id or client_instances.get_project_id()

            # Create the notification channel; `enabled` is a BoolValue wrapper
            # field, but proto-plus accepts a plain bool
            notification_channel = monitoring_v3.NotificationChannel(
                type=channel_type,
                display_name=display_name,
                description=description,
                labels=labels,
                enabled=enabled,
            )

            # Create the request
            request = monitoring_v3.CreateNotificationChannelRequest(
                name=f"projects/{project_id}", notification_channel=notification_channel
            )

            print(f"Creating notification channel: {display_name} ({channel_type})...")
            response = client.create_notification_channel(request=request)

            return json.dumps(
                {
                    "status": "success",
                    "name": response.name,
                    "type": response.type,
                    "display_name": response.display_name,
                    "description": response.description,
                    "verification_status": monitoring_v3.NotificationChannel.VerificationStatus.Name(
                        response.verification_status
                    )
                    if response.verification_status
                    else None,
                    "enabled": response.enabled,
                    "labels": dict(response.labels) if response.labels else {},
                },
                indent=2,
            )
        except Exception as e:
            return json.dumps({"status": "error", "message": str(e)}, indent=2)

    # Prompts
    @mcp_instance.prompt()
    def create_alert_prompt() -> str:
        """Prompt for creating a new alert policy"""
        return """
I need to create a new alert policy in Cloud Monitoring.

Please help me with:
1. Selecting the appropriate metric type for my alert
2. Setting up sensible thresholds and durations
3. Understanding the different comparison types
4. Best practices for alert documentation
5. Setting up notification channels

I'd like to create an alert that triggers when:
"""

    @mcp_instance.prompt()
    def monitor_resources_prompt() -> str:
        """Prompt for guidance on monitoring GCP resources"""
        return """
I need to set up monitoring for my GCP resources. Please help me understand:

1. What are the most important metrics I should be monitoring for:
   - Compute Engine instances
   - Cloud SQL databases
   - Cloud Storage buckets
   - App Engine applications
   - Kubernetes Engine clusters

2. What are recommended thresholds for alerts on these resources?

3. How should I organize my monitoring to keep it manageable?

4. What visualization options do I have in Cloud Monitoring?
"""

```