#
tokens: 7495/50000 1/30 files (page 2/2)
lines: on (toggle) GitHub
raw markdown copy reset
This is page 2 of 2. Use http://codebase.md/enesbol/gcp-mcp?lines=true&page={x} to view the full context.

# Directory Structure

```
├── .gitignore
├── .python-version
├── Dockerfile
├── LICENSE
├── pyproject.toml
├── README.md
├── requirements.txt
├── smithery.yaml
├── src
│   └── gcp-mcp-server
│       ├── __init__.py
│       ├── core
│       │   ├── __init__.py
│       │   ├── auth.py
│       │   ├── context.py
│       │   ├── logging_handler.py
│       │   ├── security.py
│       │   └── server.py
│       ├── main.py
│       ├── prompts
│       │   ├── __init__.py
│       │   └── common.py
│       └── services
│           ├── __init__.py
│           ├── artifact_registry.py
│           ├── client_instances.py
│           ├── cloud_audit_logs.py
│           ├── cloud_bigquery.py
│           ├── cloud_build.py
│           ├── cloud_compute_engine.py
│           ├── cloud_monitoring.py
│           ├── cloud_run.py
│           ├── cloud_storage.py
│           └── README.md
├── utils
│   ├── __init__.py
│   └── helpers.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/src/gcp-mcp-server/services/cloud_monitoring.py:
--------------------------------------------------------------------------------

```python
  1 | import datetime
  2 | import json
  3 | from typing import Dict, List, Optional
  4 | 
  5 | from google.cloud import monitoring_v3
  6 | from google.protobuf.duration_pb2 import Duration
  7 | from google.protobuf.timestamp_pb2 import Timestamp
  8 | from services import client_instances
  9 | 
 10 | 
 11 | def register(mcp_instance):
 12 |     """Register all Cloud Monitoring resources and tools with the MCP instance."""
 13 | 
 14 |     # Resources
 15 |     @mcp_instance.resource("gcp://monitoring/{project_id}/metrics")
 16 |     def list_metrics_resource(project_id: str = None) -> str:
 17 |         """List all available metrics for a GCP project"""
 18 |         try:
 19 |             # Get client from client_instances
 20 |             client = client_instances.get_clients().monitoring
 21 |             project_id = project_id or client_instances.get_project_id()
 22 | 
 23 |             name = f"projects/{project_id}"
 24 |             metrics = client.list_metric_descriptors(name=name)
 25 | 
 26 |             result = []
 27 |             for metric in metrics:
 28 |                 result.append(
 29 |                     {
 30 |                         "name": metric.name,
 31 |                         "type": metric.type,
 32 |                         "display_name": metric.display_name,
 33 |                         "description": metric.description,
 34 |                         "kind": monitoring_v3.MetricDescriptor.MetricKind.Name(
 35 |                             metric.metric_kind
 36 |                         ),
 37 |                         "value_type": monitoring_v3.MetricDescriptor.ValueType.Name(
 38 |                             metric.value_type
 39 |                         ),
 40 |                         "unit": metric.unit,
 41 |                     }
 42 |                 )
 43 | 
 44 |             return json.dumps(result, indent=2)
 45 |         except Exception as e:
 46 |             return json.dumps({"error": str(e)}, indent=2)
 47 | 
 48 |     @mcp_instance.resource("gcp://monitoring/{project_id}/alerts")
 49 |     def list_alerts_resource(project_id: str = None) -> str:
 50 |         """List all alert policies for a GCP project"""
 51 |         try:
 52 |             # Get client from client_instances
 53 |             client = client_instances.get_clients().monitoring
 54 |             project_id = project_id or client_instances.get_project_id()
 55 | 
 56 |             name = f"projects/{project_id}"
 57 |             policies = client.list_alert_policies(name=name)
 58 | 
 59 |             result = []
 60 |             for policy in policies:
 61 |                 result.append(
 62 |                     {
 63 |                         "name": policy.name,
 64 |                         "display_name": policy.display_name,
 65 |                         "enabled": policy.enabled,
 66 |                         "conditions_count": len(policy.conditions),
 67 |                         "creation_time": policy.creation_record.mutate_time.ToDatetime().isoformat()
 68 |                         if policy.creation_record and policy.creation_record.mutate_time
 69 |                         else None,
 70 |                         "notification_channels": [
 71 |                             chan.split("/")[-1] for chan in policy.notification_channels
 72 |                         ]
 73 |                         if policy.notification_channels
 74 |                         else [],
 75 |                     }
 76 |                 )
 77 | 
 78 |             return json.dumps(result, indent=2)
 79 |         except Exception as e:
 80 |             return json.dumps({"error": str(e)}, indent=2)
 81 | 
 82 |     @mcp_instance.resource("gcp://monitoring/{project_id}/alert/{alert_id}")
 83 |     def get_alert_resource(project_id: str = None, alert_id: str = None) -> str:
 84 |         """Get details for a specific alert policy"""
 85 |         try:
 86 |             # Get client from client_instances
 87 |             client = client_instances.get_clients().monitoring
 88 |             project_id = project_id or client_instances.get_project_id()
 89 | 
 90 |             name = f"projects/{project_id}/alertPolicies/{alert_id}"
 91 |             policy = client.get_alert_policy(name=name)
 92 | 
 93 |             conditions = []
 94 |             for condition in policy.conditions:
 95 |                 condition_data = {
 96 |                     "name": condition.name,
 97 |                     "display_name": condition.display_name,
 98 |                     "type": condition.condition_type_name
 99 |                     if hasattr(condition, "condition_type_name")
100 |                     else None,
101 |                 }
102 | 
103 |                 # Add condition-specific details
104 |                 if condition.HasField("condition_threshold"):
105 |                     threshold = condition.condition_threshold
106 |                     condition_data["threshold"] = {
107 |                         "filter": threshold.filter,
108 |                         "comparison": monitoring_v3.ComparisonType.Name(
109 |                             threshold.comparison
110 |                         )
111 |                         if threshold.comparison
112 |                         else None,
113 |                         "threshold_value": threshold.threshold_value,
114 |                         "duration": f"{threshold.duration.seconds}s"
115 |                         if threshold.duration
116 |                         else None,
117 |                         "aggregation": {
118 |                             "alignment_period": f"{threshold.aggregations[0].alignment_period.seconds}s"
119 |                             if threshold.aggregations
120 |                             and threshold.aggregations[0].alignment_period
121 |                             else None,
122 |                             "per_series_aligner": monitoring_v3.Aggregation.Aligner.Name(
123 |                                 threshold.aggregations[0].per_series_aligner
124 |                             )
125 |                             if threshold.aggregations
126 |                             and threshold.aggregations[0].per_series_aligner
127 |                             else None,
128 |                             "cross_series_reducer": monitoring_v3.Aggregation.Reducer.Name(
129 |                                 threshold.aggregations[0].cross_series_reducer
130 |                             )
131 |                             if threshold.aggregations
132 |                             and threshold.aggregations[0].cross_series_reducer
133 |                             else None,
134 |                         }
135 |                         if threshold.aggregations and len(threshold.aggregations) > 0
136 |                         else None,
137 |                     }
138 | 
139 |                 conditions.append(condition_data)
140 | 
141 |             result = {
142 |                 "name": policy.name,
143 |                 "display_name": policy.display_name,
144 |                 "enabled": policy.enabled,
145 |                 "conditions": conditions,
146 |                 "combiner": monitoring_v3.AlertPolicy.ConditionCombinerType.Name(
147 |                     policy.combiner
148 |                 )
149 |                 if policy.combiner
150 |                 else None,
151 |                 "notification_channels": policy.notification_channels,
152 |                 "creation_time": policy.creation_record.mutate_time.ToDatetime().isoformat()
153 |                 if policy.creation_record and policy.creation_record.mutate_time
154 |                 else None,
155 |                 "documentation": {
156 |                     "content": policy.documentation.content,
157 |                     "mime_type": policy.documentation.mime_type,
158 |                 }
159 |                 if policy.HasField("documentation")
160 |                 else None,
161 |             }
162 | 
163 |             return json.dumps(result, indent=2)
164 |         except Exception as e:
165 |             return json.dumps({"error": str(e)}, indent=2)
166 | 
167 |     @mcp_instance.resource("gcp://monitoring/{project_id}/notification_channels")
168 |     def list_notification_channels_resource(project_id: str = None) -> str:
169 |         """List notification channels for a GCP project"""
170 |         try:
171 |             # Get client from client_instances
172 |             client = client_instances.get_clients().monitoring
173 |             project_id = project_id or client_instances.get_project_id()
174 | 
175 |             name = f"projects/{project_id}"
176 |             channels = client.list_notification_channels(name=name)
177 | 
178 |             result = []
179 |             for channel in channels:
180 |                 result.append(
181 |                     {
182 |                         "name": channel.name,
183 |                         "type": channel.type,
184 |                         "display_name": channel.display_name,
185 |                         "description": channel.description,
186 |                         "verification_status": monitoring_v3.NotificationChannel.VerificationStatus.Name(
187 |                             channel.verification_status
188 |                         )
189 |                         if channel.verification_status
190 |                         else None,
191 |                         "enabled": channel.enabled,
192 |                         "labels": dict(channel.labels) if channel.labels else {},
193 |                     }
194 |                 )
195 | 
196 |             return json.dumps(result, indent=2)
197 |         except Exception as e:
198 |             return json.dumps({"error": str(e)}, indent=2)
199 | 
200 |     # Tools
201 |     @mcp_instance.tool()
202 |     def list_metrics(project_id: str = None, filter_str: str = "") -> str:
203 |         """
204 |         List metrics in a GCP project with optional filtering
205 | 
206 |         Args:
207 |             project_id: GCP project ID (defaults to configured project)
208 |             filter_str: Optional filter string to narrow results (e.g., "metric.type = starts_with(\"compute.googleapis.com\")")
209 |         """
210 |         try:
211 |             # Get client from client_instances
212 |             client = client_instances.get_clients().monitoring
213 |             project_id = project_id or client_instances.get_project_id()
214 | 
215 |             parent = f"projects/{project_id}"
216 |             request = monitoring_v3.ListMetricDescriptorsRequest(
217 |                 name=parent, filter=filter_str
218 |             )
219 | 
220 |             print(f"Listing metrics for project {project_id}...")
221 |             metrics = client.list_metric_descriptors(request=request)
222 | 
223 |             result = []
224 |             for metric in metrics:
225 |                 result.append(
226 |                     {
227 |                         "name": metric.name,
228 |                         "type": metric.type,
229 |                         "display_name": metric.display_name,
230 |                         "description": metric.description,
231 |                         "kind": monitoring_v3.MetricDescriptor.MetricKind.Name(
232 |                             metric.metric_kind
233 |                         ),
234 |                         "value_type": monitoring_v3.MetricDescriptor.ValueType.Name(
235 |                             metric.value_type
236 |                         ),
237 |                         "unit": metric.unit,
238 |                     }
239 |                 )
240 | 
241 |             return json.dumps(
242 |                 {"status": "success", "metrics": result, "count": len(result)}, indent=2
243 |             )
244 |         except Exception as e:
245 |             return json.dumps({"status": "error", "message": str(e)}, indent=2)
246 | 
247 |     @mcp_instance.tool()
248 |     def fetch_metric_timeseries(
249 |         metric_type: str,
250 |         project_id: str = None,
251 |         filter_additions: str = "",
252 |         hours: int = 1,
253 |         alignment_period_seconds: int = 60,
254 |     ) -> str:
255 |         """
256 |         Fetch time series data for a specific metric
257 | 
258 |         Args:
259 |             metric_type: The metric type (e.g., "compute.googleapis.com/instance/cpu/utilization")
260 |             project_id: GCP project ID (defaults to configured project)
261 |             filter_additions: Additional filter criteria (e.g., "resource.labels.instance_id = \"my-instance\"")
262 |             hours: Number of hours of data to fetch (default: 1)
263 |             alignment_period_seconds: Data point alignment period in seconds (default: 60)
264 |         """
265 |         try:
266 |             # Get client from client_instances
267 |             client = client_instances.get_clients().monitoring
268 |             project_id = project_id or client_instances.get_project_id()
269 | 
270 |             # Build the filter
271 |             filter_str = f'metric.type = "{metric_type}"'
272 |             if filter_additions:
273 |                 filter_str += f" AND {filter_additions}"
274 | 
275 |             # Calculate time interval
276 |             now = datetime.datetime.utcnow()
277 |             seconds = int(now.timestamp())
278 |             end_time = Timestamp(seconds=seconds)
279 | 
280 |             start_time = Timestamp(seconds=seconds - hours * 3600)
281 | 
282 |             # Create interval
283 |             interval = monitoring_v3.TimeInterval(
284 |                 end_time=end_time, start_time=start_time
285 |             )
286 | 
287 |             # Create aggregation
288 |             aggregation = monitoring_v3.Aggregation(
289 |                 alignment_period=Duration(seconds=alignment_period_seconds),
290 |                 per_series_aligner=monitoring_v3.Aggregation.Aligner.ALIGN_MEAN,
291 |             )
292 | 
293 |             # Build request
294 |             request = monitoring_v3.ListTimeSeriesRequest(
295 |                 name=f"projects/{project_id}",
296 |                 filter=filter_str,
297 |                 interval=interval,
298 |                 view=monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
299 |                 aggregation=aggregation,
300 |             )
301 | 
302 |             print(f"Fetching time series data for {metric_type}...")
303 |             time_series = client.list_time_series(request=request)
304 | 
305 |             result = []
306 |             for series in time_series:
307 |                 data_points = []
308 |                 for point in series.points:
309 |                     point_time = point.interval.end_time.ToDatetime().isoformat()
310 | 
311 |                     # Handle different value types
312 |                     if point.value.HasField("double_value"):
313 |                         value = point.value.double_value
314 |                     elif point.value.HasField("int64_value"):
315 |                         value = point.value.int64_value
316 |                     elif point.value.HasField("bool_value"):
317 |                         value = point.value.bool_value
318 |                     elif point.value.HasField("string_value"):
319 |                         value = point.value.string_value
320 |                     elif point.value.HasField("distribution_value"):
321 |                         value = (
322 |                             "distribution"  # Simplified, as distributions are complex
323 |                         )
324 |                     else:
325 |                         value = None
326 | 
327 |                     data_points.append({"time": point_time, "value": value})
328 | 
329 |                 series_data = {
330 |                     "metric": dict(series.metric.labels)
331 |                     if series.metric and series.metric.labels
332 |                     else {},
333 |                     "resource": {
334 |                         "type": series.resource.type,
335 |                         "labels": dict(series.resource.labels)
336 |                         if series.resource and series.resource.labels
337 |                         else {},
338 |                     }
339 |                     if series.resource
340 |                     else {},
341 |                     "points": data_points,
342 |                 }
343 |                 result.append(series_data)
344 | 
345 |             return json.dumps(
346 |                 {
347 |                     "status": "success",
348 |                     "metric_type": metric_type,
349 |                     "time_range": {
350 |                         "start": start_time.ToDatetime().isoformat(),
351 |                         "end": end_time.ToDatetime().isoformat(),
352 |                     },
353 |                     "time_series": result,
354 |                     "count": len(result),
355 |                 },
356 |                 indent=2,
357 |             )
358 |         except Exception as e:
359 |             return json.dumps({"status": "error", "message": str(e)}, indent=2)
360 | 
361 |     @mcp_instance.tool()
362 |     def list_alert_policies(project_id: str = None, filter_str: str = "") -> str:
363 |         """
364 |         List alert policies in a GCP project with optional filtering
365 | 
366 |         Args:
367 |             project_id: GCP project ID (defaults to configured project)
368 |             filter_str: Optional filter string (e.g., "display_name = \"High CPU Alert\"")
369 |         """
370 |         try:
371 |             # Get client from client_instances
372 |             client = client_instances.get_clients().monitoring
373 |             project_id = project_id or client_instances.get_project_id()
374 | 
375 |             parent = f"projects/{project_id}"
376 |             request = monitoring_v3.ListAlertPoliciesRequest(
377 |                 name=parent, filter=filter_str
378 |             )
379 | 
380 |             print(f"Listing alert policies for project {project_id}...")
381 |             policies = client.list_alert_policies(request=request)
382 | 
383 |             result = []
384 |             for policy in policies:
385 |                 policy_data = {
386 |                     "name": policy.name,
387 |                     "display_name": policy.display_name,
388 |                     "enabled": policy.enabled,
389 |                     "conditions_count": len(policy.conditions),
390 |                     "combiner": monitoring_v3.AlertPolicy.ConditionCombinerType.Name(
391 |                         policy.combiner
392 |                     )
393 |                     if policy.combiner
394 |                     else None,
395 |                     "notification_channels": [
396 |                         chan.split("/")[-1] for chan in policy.notification_channels
397 |                     ]
398 |                     if policy.notification_channels
399 |                     else [],
400 |                     "creation_time": policy.creation_record.mutate_time.ToDatetime().isoformat()
401 |                     if policy.creation_record and policy.creation_record.mutate_time
402 |                     else None,
403 |                 }
404 |                 result.append(policy_data)
405 | 
406 |             return json.dumps(
407 |                 {"status": "success", "alert_policies": result, "count": len(result)},
408 |                 indent=2,
409 |             )
410 |         except Exception as e:
411 |             return json.dumps({"status": "error", "message": str(e)}, indent=2)
412 | 
413 |     @mcp_instance.tool()
414 |     def create_metric_threshold_alert(
415 |         display_name: str,
416 |         metric_type: str,
417 |         filter_str: str,
418 |         threshold_value: float,
419 |         project_id: str = None,
420 |         comparison: str = "COMPARISON_GT",
421 |         duration_seconds: int = 300,
422 |         alignment_period_seconds: int = 60,
423 |         aligner: str = "ALIGN_MEAN",
424 |         reducer: str = "REDUCE_MEAN",
425 |         notification_channels: Optional[List[str]] = None,
426 |         documentation: str = "",
427 |         enabled: bool = True,
428 |     ) -> str:
429 |         """
430 |         Create a metric threshold alert policy
431 | 
432 |         Args:
433 |             display_name: Human-readable name for the alert
434 |             metric_type: The metric to alert on (e.g., "compute.googleapis.com/instance/cpu/utilization")
435 |             filter_str: Filter string to define which resources to monitor
436 |             threshold_value: The threshold value to trigger the alert
437 |             project_id: GCP project ID (defaults to configured project)
438 |             comparison: Comparison type (COMPARISON_GT, COMPARISON_GE, COMPARISON_LT, COMPARISON_LE, COMPARISON_EQ, COMPARISON_NE)
439 |             duration_seconds: Duration in seconds the condition must be met to trigger
440 |             alignment_period_seconds: Period in seconds for data point alignment
441 |             aligner: Per-series aligner (ALIGN_MEAN, ALIGN_MAX, ALIGN_MIN, etc.)
442 |             reducer: Cross-series reducer (REDUCE_MEAN, REDUCE_MAX, REDUCE_MIN, etc.)
443 |             notification_channels: List of notification channel IDs
444 |             documentation: Documentation for the alert (markdown supported)
445 |             enabled: Whether the alert should be enabled
446 |         """
447 |         try:
448 |             # Get client from client_instances
449 |             client = client_instances.get_clients().monitoring
450 |             project_id = project_id or client_instances.get_project_id()
451 | 
452 |             # Validate and convert enums
453 |             try:
454 |                 comparison_enum = getattr(monitoring_v3.ComparisonType, comparison)
455 |                 aligner_enum = getattr(monitoring_v3.Aggregation.Aligner, aligner)
456 |                 reducer_enum = getattr(monitoring_v3.Aggregation.Reducer, reducer)
457 |             except AttributeError as e:
458 |                 return json.dumps(
459 |                     {
460 |                         "status": "error",
461 |                         "message": f"Invalid enum value: {str(e)}. Please check documentation for valid values.",
462 |                     },
463 |                     indent=2,
464 |                 )
465 | 
466 |             # Prepare notification channels with full paths
467 |             full_notification_channels = []
468 |             if notification_channels:
469 |                 for channel in notification_channels:
470 |                     if not channel.startswith(
471 |                         f"projects/{project_id}/notificationChannels/"
472 |                     ):
473 |                         channel = (
474 |                             f"projects/{project_id}/notificationChannels/{channel}"
475 |                         )
476 |                     full_notification_channels.append(channel)
477 | 
478 |             # Create aggregation
479 |             aggregation = monitoring_v3.Aggregation(
480 |                 alignment_period=Duration(seconds=alignment_period_seconds),
481 |                 per_series_aligner=aligner_enum,
482 |                 cross_series_reducer=reducer_enum,
483 |                 group_by_fields=[],
484 |             )
485 | 
486 |             # Create condition threshold
487 |             condition_threshold = monitoring_v3.AlertPolicy.Condition.MetricThreshold(
488 |                 filter=f'metric.type = "{metric_type}" AND {filter_str}',
489 |                 comparison=comparison_enum,
490 |                 threshold_value=threshold_value,
491 |                 duration=Duration(seconds=duration_seconds),
492 |                 aggregations=[aggregation],
493 |             )
494 | 
495 |             # Create condition
496 |             condition = monitoring_v3.AlertPolicy.Condition(
497 |                 display_name=f"Threshold condition for {display_name}",
498 |                 condition_threshold=condition_threshold,
499 |             )
500 | 
501 |             # Create alert policy
502 |             alert_policy = monitoring_v3.AlertPolicy(
503 |                 display_name=display_name,
504 |                 conditions=[condition],
505 |                 combiner=monitoring_v3.AlertPolicy.ConditionCombinerType.OR,
506 |                 notification_channels=full_notification_channels,
507 |                 enabled=monitoring_v3.BoolValue(value=enabled),
508 |             )
509 | 
510 |             # Add documentation if provided
511 |             if documentation:
512 |                 alert_policy.documentation = monitoring_v3.AlertPolicy.Documentation(
513 |                     content=documentation, mime_type="text/markdown"
514 |                 )
515 | 
516 |             # Create the request
517 |             request = monitoring_v3.CreateAlertPolicyRequest(
518 |                 name=f"projects/{project_id}", alert_policy=alert_policy
519 |             )
520 | 
521 |             print(f"Creating alert policy: {display_name}...")
522 |             response = client.create_alert_policy(request=request)
523 | 
524 |             return json.dumps(
525 |                 {
526 |                     "status": "success",
527 |                     "name": response.name,
528 |                     "display_name": response.display_name,
529 |                     "enabled": response.enabled,
530 |                     "conditions_count": len(response.conditions),
531 |                     "notification_channels": response.notification_channels,
532 |                 },
533 |                 indent=2,
534 |             )
535 |         except Exception as e:
536 |             return json.dumps({"status": "error", "message": str(e)}, indent=2)
537 | 
538 |     @mcp_instance.tool()
539 |     def update_alert_policy(
540 |         alert_id: str,
541 |         project_id: str = None,
542 |         display_name: Optional[str] = None,
543 |         notification_channels: Optional[List[str]] = None,
544 |         enabled: Optional[bool] = None,
545 |         documentation: Optional[str] = None,
546 |     ) -> str:
547 |         """
548 |         Update an existing alert policy
549 | 
550 |         Args:
551 |             alert_id: ID of the alert to update
552 |             project_id: GCP project ID (defaults to configured project)
553 |             display_name: New name for the alert
554 |             notification_channels: List of notification channel IDs
555 |             enabled: Whether the alert should be enabled
556 |             documentation: Documentation for the alert (markdown supported)
557 |         """
558 |         try:
559 |             # Get client from client_instances
560 |             client = client_instances.get_clients().monitoring
561 |             project_id = project_id or client_instances.get_project_id()
562 | 
563 |             # Get the existing policy
564 |             name = f"projects/{project_id}/alertPolicies/{alert_id}"
565 |             policy = client.get_alert_policy(name=name)
566 | 
567 |             # Update fields if provided
568 |             if display_name:
569 |                 policy.display_name = display_name
570 | 
571 |             if notification_channels is not None:
572 |                 full_notification_channels = []
573 |                 for channel in notification_channels:
574 |                     if not channel.startswith(
575 |                         f"projects/{project_id}/notificationChannels/"
576 |                     ):
577 |                         channel = (
578 |                             f"projects/{project_id}/notificationChannels/{channel}"
579 |                         )
580 |                     full_notification_channels.append(channel)
581 |                 policy.notification_channels = full_notification_channels
582 | 
583 |             if enabled is not None:
584 |                 policy.enabled = monitoring_v3.BoolValue(value=enabled)
585 | 
586 |             if documentation is not None:
587 |                 policy.documentation = monitoring_v3.AlertPolicy.Documentation(
588 |                     content=documentation, mime_type="text/markdown"
589 |                 )
590 | 
591 |             # Create update mask
592 |             update_mask = []
593 |             if display_name:
594 |                 update_mask.append("display_name")
595 |             if notification_channels is not None:
596 |                 update_mask.append("notification_channels")
597 |             if enabled is not None:
598 |                 update_mask.append("enabled")
599 |             if documentation is not None:
600 |                 update_mask.append("documentation")
601 | 
602 |             # Update the policy
603 |             request = monitoring_v3.UpdateAlertPolicyRequest(
604 |                 alert_policy=policy, update_mask={"paths": update_mask}
605 |             )
606 | 
607 |             print(f"Updating alert policy: {policy.name}...")
608 |             response = client.update_alert_policy(request=request)
609 | 
610 |             return json.dumps(
611 |                 {
612 |                     "status": "success",
613 |                     "name": response.name,
614 |                     "display_name": response.display_name,
615 |                     "enabled": response.enabled,
616 |                     "conditions_count": len(response.conditions),
617 |                     "notification_channels": [
618 |                         chan.split("/")[-1] for chan in response.notification_channels
619 |                     ]
620 |                     if response.notification_channels
621 |                     else [],
622 |                 },
623 |                 indent=2,
624 |             )
625 |         except Exception as e:
626 |             return json.dumps({"status": "error", "message": str(e)}, indent=2)
627 | 
628 |     @mcp_instance.tool()
629 |     def delete_alert_policy(alert_id: str, project_id: str = None) -> str:
630 |         """
631 |         Delete an alert policy
632 | 
633 |         Args:
634 |             alert_id: ID of the alert to delete
635 |             project_id: GCP project ID (defaults to configured project)
636 |         """
637 |         try:
638 |             # Get client from client_instances
639 |             client = client_instances.get_clients().monitoring
640 |             project_id = project_id or client_instances.get_project_id()
641 | 
642 |             name = f"projects/{project_id}/alertPolicies/{alert_id}"
643 | 
644 |             print(f"Deleting alert policy: {alert_id}...")
645 |             client.delete_alert_policy(name=name)
646 | 
647 |             return json.dumps(
648 |                 {
649 |                     "status": "success",
650 |                     "message": f"Alert policy {alert_id} successfully deleted",
651 |                 },
652 |                 indent=2,
653 |             )
654 |         except Exception as e:
655 |             return json.dumps({"status": "error", "message": str(e)}, indent=2)
656 | 
657 |     @mcp_instance.tool()
658 |     def create_notification_channel(
659 |         display_name: str,
660 |         channel_type: str,
661 |         labels: Dict[str, str],
662 |         project_id: str = None,
663 |         description: str = "",
664 |         enabled: bool = True,
665 |     ) -> str:
666 |         """
667 |         Create a notification channel
668 | 
669 |         Args:
670 |             display_name: Human-readable name for the channel
671 |             channel_type: Type of channel (email, sms, slack, pagerduty, etc.)
672 |             labels: Channel-specific configuration (e.g., {"email_address": "[email protected]"})
673 |             project_id: GCP project ID (defaults to configured project)
674 |             description: Optional description for the channel
675 |             enabled: Whether the channel should be enabled
676 |         """
677 |         try:
678 |             # Get client from client_instances
679 |             client = client_instances.get_clients().monitoring
680 |             project_id = project_id or client_instances.get_project_id()
681 | 
682 |             # Create notification channel
683 |             notification_channel = monitoring_v3.NotificationChannel(
684 |                 type=channel_type,
685 |                 display_name=display_name,
686 |                 description=description,
687 |                 labels=labels,
688 |                 enabled=monitoring_v3.BoolValue(value=enabled),
689 |             )
690 | 
691 |             # Create the request
692 |             request = monitoring_v3.CreateNotificationChannelRequest(
693 |                 name=f"projects/{project_id}", notification_channel=notification_channel
694 |             )
695 | 
696 |             print(f"Creating notification channel: {display_name} ({channel_type})...")
697 |             response = client.create_notification_channel(request=request)
698 | 
699 |             return json.dumps(
700 |                 {
701 |                     "status": "success",
702 |                     "name": response.name,
703 |                     "type": response.type,
704 |                     "display_name": response.display_name,
705 |                     "description": response.description,
706 |                     "verification_status": monitoring_v3.NotificationChannel.VerificationStatus.Name(
707 |                         response.verification_status
708 |                     )
709 |                     if response.verification_status
710 |                     else None,
711 |                     "enabled": response.enabled,
712 |                     "labels": dict(response.labels) if response.labels else {},
713 |                 },
714 |                 indent=2,
715 |             )
716 |         except Exception as e:
717 |             return json.dumps({"status": "error", "message": str(e)}, indent=2)
718 | 
719 |     # Prompts
720 |     @mcp_instance.prompt()
721 |     def create_alert_prompt() -> str:
722 |         """Prompt for creating a new alert policy"""
723 |         return """
724 | I need to create a new alert policy in Cloud Monitoring.
725 | 
726 | Please help me with:
727 | 1. Selecting the appropriate metric type for my alert
728 | 2. Setting up sensible thresholds and durations
729 | 3. Understanding the different comparison types
730 | 4. Best practices for alert documentation
731 | 5. Setting up notification channels
732 | 
733 | I'd like to create an alert that triggers when:
734 | """
735 | 
736 |     @mcp_instance.prompt()
737 |     def monitor_resources_prompt() -> str:
738 |         """Prompt for guidance on monitoring GCP resources"""
739 |         return """
740 | I need to set up monitoring for my GCP resources. Please help me understand:
741 | 
742 | 1. What are the most important metrics I should be monitoring for:
743 |    - Compute Engine instances
744 |    - Cloud SQL databases
745 |    - Cloud Storage buckets
746 |    - App Engine applications
747 |    - Kubernetes Engine clusters
748 | 
749 | 2. What are recommended thresholds for alerts on these resources?
750 | 
751 | 3. How should I organize my monitoring to keep it manageable?
752 | 
753 | 4. What visualization options do I have in Cloud Monitoring?
754 | """
755 | 
```
Page 2/2FirstPrevNextLast