# Origin: /src/gcp-mcp-server/services/cloud_monitoring.py
"""Cloud Monitoring resources, tools and prompts for the GCP MCP server.

Everything is registered on an MCP instance by :func:`register`. Resources
return a JSON document (or ``{"error": ...}``); tools return a JSON document
carrying a ``"status"`` of ``"success"`` or ``"error"``.

NOTE(review): the serialisation helpers below use protobuf-style accessors
(``HasField``, ``ToDatetime``, ``Enum.Name``), ``monitoring_v3.BoolValue`` and
``monitoring_v3.MetricDescriptor``, and a single ``.monitoring`` client is used
for metric, alert-policy and notification-channel calls. Whether these match
the installed google-cloud-monitoring version and the client wrapper in
``client_instances`` cannot be told from this file -- confirm before relying
on them.
"""

import datetime
import json
import logging
from typing import Any, Dict, List, Optional

from google.cloud import monitoring_v3
from google.protobuf.duration_pb2 import Duration
from google.protobuf.timestamp_pb2 import Timestamp
from services import client_instances

# Progress messages go through logging instead of print(): writing to stdout
# would interleave with the protocol stream when the server runs over stdio.
logger = logging.getLogger(__name__)

# TypedValue fields that can be returned to the caller unchanged.
_SCALAR_VALUE_FIELDS = ("double_value", "int64_value", "bool_value", "string_value")


def _dump(payload: Any) -> str:
    """Serialise *payload* as indented JSON."""
    return json.dumps(payload, indent=2)


def _resource_error(exc: Exception) -> str:
    """Build the error document returned by resources."""
    return _dump({"error": str(exc)})


def _tool_error(message: Any) -> str:
    """Build the error document returned by tools."""
    return _dump({"status": "error", "message": str(message)})


def _client_and_project(project_id: Optional[str]):
    """Return the monitoring client and the effective project ID."""
    client = client_instances.get_clients().monitoring
    return client, project_id or client_instances.get_project_id()


def _describe_metric(metric) -> Dict[str, Any]:
    """Convert a metric descriptor into a JSON-serialisable dict."""
    return {
        "name": metric.name,
        "type": metric.type,
        "display_name": metric.display_name,
        "description": metric.description,
        "kind": monitoring_v3.MetricDescriptor.MetricKind.Name(metric.metric_kind),
        "value_type": monitoring_v3.MetricDescriptor.ValueType.Name(
            metric.value_type
        ),
        "unit": metric.unit,
    }


def _describe_channel(channel) -> Dict[str, Any]:
    """Convert a notification channel into a JSON-serialisable dict."""
    status = channel.verification_status
    return {
        "name": channel.name,
        "type": channel.type,
        "display_name": channel.display_name,
        "description": channel.description,
        "verification_status": (
            monitoring_v3.NotificationChannel.VerificationStatus.Name(status)
            if status
            else None
        ),
        "enabled": channel.enabled,
        "labels": dict(channel.labels) if channel.labels else {},
    }


def _creation_time(policy) -> Optional[str]:
    """Return the policy creation time as an ISO string, or None if unset."""
    record = policy.creation_record
    if record and record.mutate_time:
        return record.mutate_time.ToDatetime().isoformat()
    return None


def _combiner_name(policy) -> Optional[str]:
    """Return the name of the policy's condition combiner, or None if unset."""
    if not policy.combiner:
        return None
    return monitoring_v3.AlertPolicy.ConditionCombinerType.Name(policy.combiner)


def _channel_ids(channels) -> List[str]:
    """Reduce full notification channel resource names to their trailing IDs."""
    return [chan.split("/")[-1] for chan in channels]


def _qualify_channels(project_id: str, channels: Optional[List[str]]) -> List[str]:
    """Expand bare channel IDs to full resource names under *project_id*."""
    prefix = f"projects/{project_id}/notificationChannels/"
    return [
        chan if chan.startswith(prefix) else f"{prefix}{chan}"
        for chan in channels or []
    ]


def _describe_threshold(threshold) -> Dict[str, Any]:
    """Convert a metric-threshold condition into a JSON-serialisable dict.

    Only the first aggregation of the condition is reported.
    """
    aggregation = None
    if threshold.aggregations:
        first = threshold.aggregations[0]
        aggregation = {
            "alignment_period": (
                f"{first.alignment_period.seconds}s"
                if first.alignment_period
                else None
            ),
            "per_series_aligner": (
                monitoring_v3.Aggregation.Aligner.Name(first.per_series_aligner)
                if first.per_series_aligner
                else None
            ),
            "cross_series_reducer": (
                monitoring_v3.Aggregation.Reducer.Name(first.cross_series_reducer)
                if first.cross_series_reducer
                else None
            ),
        }

    return {
        "filter": threshold.filter,
        "comparison": (
            monitoring_v3.ComparisonType.Name(threshold.comparison)
            if threshold.comparison
            else None
        ),
        "threshold_value": threshold.threshold_value,
        "duration": f"{threshold.duration.seconds}s" if threshold.duration else None,
        "aggregation": aggregation,
    }


def _point_value(typed_value) -> Any:
    """Extract a JSON-friendly value from a time series point's typed value."""
    for field in _SCALAR_VALUE_FIELDS:
        if typed_value.HasField(field):
            return getattr(typed_value, field)
    if typed_value.HasField("distribution_value"):
        # Simplified, as distributions are complex
        return "distribution"
    return None


def _describe_series(series) -> Dict[str, Any]:
    """Convert one time series (labels, resource, points) into a dict."""
    points = [
        {
            "time": point.interval.end_time.ToDatetime().isoformat(),
            "value": _point_value(point.value),
        }
        for point in series.points
    ]
    return {
        "metric": dict(series.metric.labels)
        if series.metric and series.metric.labels
        else {},
        "resource": {
            "type": series.resource.type,
            "labels": dict(series.resource.labels)
            if series.resource and series.resource.labels
            else {},
        }
        if series.resource
        else {},
        "points": points,
    }


def register(mcp_instance):
    """Register all Cloud Monitoring resources and tools with the MCP instance."""

    # Resources
    @mcp_instance.resource("gcp://monitoring/{project_id}/metrics")
    def list_metrics_resource(project_id: Optional[str] = None) -> str:
        """List all available metrics for a GCP project"""
        try:
            client, project_id = _client_and_project(project_id)
            metrics = client.list_metric_descriptors(name=f"projects/{project_id}")
            return _dump([_describe_metric(metric) for metric in metrics])
        except Exception as e:
            return _resource_error(e)

    @mcp_instance.resource("gcp://monitoring/{project_id}/alerts")
    def list_alerts_resource(project_id: Optional[str] = None) -> str:
        """List all alert policies for a GCP project"""
        try:
            client, project_id = _client_and_project(project_id)
            policies = client.list_alert_policies(name=f"projects/{project_id}")

            result = [
                {
                    "name": policy.name,
                    "display_name": policy.display_name,
                    "enabled": policy.enabled,
                    "conditions_count": len(policy.conditions),
                    "creation_time": _creation_time(policy),
                    "notification_channels": _channel_ids(
                        policy.notification_channels
                    ),
                }
                for policy in policies
            ]
            return _dump(result)
        except Exception as e:
            return _resource_error(e)

    @mcp_instance.resource("gcp://monitoring/{project_id}/alert/{alert_id}")
    def get_alert_resource(
        project_id: Optional[str] = None, alert_id: Optional[str] = None
    ) -> str:
        """Get details for a specific alert policy"""
        try:
            client, project_id = _client_and_project(project_id)
            policy = client.get_alert_policy(
                name=f"projects/{project_id}/alertPolicies/{alert_id}"
            )

            conditions = []
            for condition in policy.conditions:
                condition_data = {
                    "name": condition.name,
                    "display_name": condition.display_name,
                    "type": condition.condition_type_name
                    if hasattr(condition, "condition_type_name")
                    else None,
                }
                # Add condition-specific details
                if condition.HasField("condition_threshold"):
                    condition_data["threshold"] = _describe_threshold(
                        condition.condition_threshold
                    )
                conditions.append(condition_data)

            result = {
                "name": policy.name,
                "display_name": policy.display_name,
                "enabled": policy.enabled,
                "conditions": conditions,
                "combiner": _combiner_name(policy),
                # Repeated fields are not plain lists; copy so json can encode them.
                "notification_channels": list(policy.notification_channels),
                "creation_time": _creation_time(policy),
                "documentation": {
                    "content": policy.documentation.content,
                    "mime_type": policy.documentation.mime_type,
                }
                if policy.HasField("documentation")
                else None,
            }
            return _dump(result)
        except Exception as e:
            return _resource_error(e)

    @mcp_instance.resource("gcp://monitoring/{project_id}/notification_channels")
    def list_notification_channels_resource(project_id: Optional[str] = None) -> str:
        """List notification channels for a GCP project"""
        try:
            client, project_id = _client_and_project(project_id)
            channels = client.list_notification_channels(
                name=f"projects/{project_id}"
            )
            return _dump([_describe_channel(channel) for channel in channels])
        except Exception as e:
            return _resource_error(e)

    # Tools
    @mcp_instance.tool()
    def list_metrics(project_id: Optional[str] = None, filter_str: str = "") -> str:
        """
        List metrics in a GCP project with optional filtering

        Args:
            project_id: GCP project ID (defaults to configured project)
            filter_str: Optional filter string to narrow results (e.g., "metric.type = starts_with(\"compute.googleapis.com\")")
        """
        try:
            client, project_id = _client_and_project(project_id)
            request = monitoring_v3.ListMetricDescriptorsRequest(
                name=f"projects/{project_id}", filter=filter_str
            )

            logger.info("Listing metrics for project %s...", project_id)
            metrics = client.list_metric_descriptors(request=request)

            result = [_describe_metric(metric) for metric in metrics]
            return _dump(
                {"status": "success", "metrics": result, "count": len(result)}
            )
        except Exception as e:
            return _tool_error(e)

    @mcp_instance.tool()
    def fetch_metric_timeseries(
        metric_type: str,
        project_id: Optional[str] = None,
        filter_additions: str = "",
        hours: int = 1,
        alignment_period_seconds: int = 60,
    ) -> str:
        """
        Fetch time series data for a specific metric

        Args:
            metric_type: The metric type (e.g., "compute.googleapis.com/instance/cpu/utilization")
            project_id: GCP project ID (defaults to configured project)
            filter_additions: Additional filter criteria (e.g., "resource.labels.instance_id = \"my-instance\"")
            hours: Number of hours of data to fetch (default: 1)
            alignment_period_seconds: Data point alignment period in seconds (default: 60)
        """
        try:
            # Reject windows that cannot describe a valid query up front.
            if hours <= 0:
                return _tool_error("hours must be greater than 0")
            if alignment_period_seconds <= 0:
                return _tool_error("alignment_period_seconds must be greater than 0")

            client, project_id = _client_and_project(project_id)

            # Build the filter
            filter_str = f'metric.type = "{metric_type}"'
            if filter_additions:
                filter_str += f" AND {filter_additions}"

            # Use an aware UTC datetime: calling .timestamp() on the naive
            # result of utcnow() interprets it as local time and shifts the
            # window by the host's UTC offset.
            end_seconds = int(datetime.datetime.now(datetime.timezone.utc).timestamp())
            end_time = Timestamp(seconds=end_seconds)
            start_time = Timestamp(seconds=end_seconds - int(hours * 3600))

            interval = monitoring_v3.TimeInterval(
                end_time=end_time, start_time=start_time
            )
            aggregation = monitoring_v3.Aggregation(
                alignment_period=Duration(seconds=alignment_period_seconds),
                per_series_aligner=monitoring_v3.Aggregation.Aligner.ALIGN_MEAN,
            )
            request = monitoring_v3.ListTimeSeriesRequest(
                name=f"projects/{project_id}",
                filter=filter_str,
                interval=interval,
                view=monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
                aggregation=aggregation,
            )

            logger.info("Fetching time series data for %s...", metric_type)
            time_series = client.list_time_series(request=request)

            result = [_describe_series(series) for series in time_series]
            return _dump(
                {
                    "status": "success",
                    "metric_type": metric_type,
                    "time_range": {
                        "start": start_time.ToDatetime().isoformat(),
                        "end": end_time.ToDatetime().isoformat(),
                    },
                    "time_series": result,
                    "count": len(result),
                }
            )
        except Exception as e:
            return _tool_error(e)

    @mcp_instance.tool()
    def list_alert_policies(
        project_id: Optional[str] = None, filter_str: str = ""
    ) -> str:
        """
        List alert policies in a GCP project with optional filtering

        Args:
            project_id: GCP project ID (defaults to configured project)
            filter_str: Optional filter string (e.g., "display_name = \"High CPU Alert\"")
        """
        try:
            client, project_id = _client_and_project(project_id)
            request = monitoring_v3.ListAlertPoliciesRequest(
                name=f"projects/{project_id}", filter=filter_str
            )

            logger.info("Listing alert policies for project %s...", project_id)
            policies = client.list_alert_policies(request=request)

            result = [
                {
                    "name": policy.name,
                    "display_name": policy.display_name,
                    "enabled": policy.enabled,
                    "conditions_count": len(policy.conditions),
                    "combiner": _combiner_name(policy),
                    "notification_channels": _channel_ids(
                        policy.notification_channels
                    ),
                    "creation_time": _creation_time(policy),
                }
                for policy in policies
            ]
            return _dump(
                {"status": "success", "alert_policies": result, "count": len(result)}
            )
        except Exception as e:
            return _tool_error(e)

    @mcp_instance.tool()
    def create_metric_threshold_alert(
        display_name: str,
        metric_type: str,
        filter_str: str,
        threshold_value: float,
        project_id: Optional[str] = None,
        comparison: str = "COMPARISON_GT",
        duration_seconds: int = 300,
        alignment_period_seconds: int = 60,
        aligner: str = "ALIGN_MEAN",
        reducer: str = "REDUCE_MEAN",
        notification_channels: Optional[List[str]] = None,
        documentation: str = "",
        enabled: bool = True,
    ) -> str:
        """
        Create a metric threshold alert policy

        Args:
            display_name: Human-readable name for the alert
            metric_type: The metric to alert on (e.g., "compute.googleapis.com/instance/cpu/utilization")
            filter_str: Filter string to define which resources to monitor (may be empty to match on the metric type only)
            threshold_value: The threshold value to trigger the alert
            project_id: GCP project ID (defaults to configured project)
            comparison: Comparison type (COMPARISON_GT, COMPARISON_GE, COMPARISON_LT, COMPARISON_LE, COMPARISON_EQ, COMPARISON_NE)
            duration_seconds: Duration in seconds the condition must be met to trigger
            alignment_period_seconds: Period in seconds for data point alignment
            aligner: Per-series aligner (ALIGN_MEAN, ALIGN_MAX, ALIGN_MIN, etc.)
            reducer: Cross-series reducer (REDUCE_MEAN, REDUCE_MAX, REDUCE_MIN, etc.)
            notification_channels: List of notification channel IDs
            documentation: Documentation for the alert (markdown supported)
            enabled: Whether the alert should be enabled
        """
        try:
            client, project_id = _client_and_project(project_id)

            # Validate and convert enums
            try:
                comparison_enum = getattr(monitoring_v3.ComparisonType, comparison)
                aligner_enum = getattr(monitoring_v3.Aggregation.Aligner, aligner)
                reducer_enum = getattr(monitoring_v3.Aggregation.Reducer, reducer)
            except AttributeError as e:
                return _tool_error(
                    f"Invalid enum value: {str(e)}. Please check documentation for valid values."
                )

            # Only append the extra clause when there is one; an empty
            # filter_str would otherwise leave a dangling "AND".
            condition_filter = f'metric.type = "{metric_type}"'
            if filter_str:
                condition_filter += f" AND {filter_str}"

            aggregation = monitoring_v3.Aggregation(
                alignment_period=Duration(seconds=alignment_period_seconds),
                per_series_aligner=aligner_enum,
                cross_series_reducer=reducer_enum,
                group_by_fields=[],
            )
            condition_threshold = monitoring_v3.AlertPolicy.Condition.MetricThreshold(
                filter=condition_filter,
                comparison=comparison_enum,
                threshold_value=threshold_value,
                duration=Duration(seconds=duration_seconds),
                aggregations=[aggregation],
            )
            condition = monitoring_v3.AlertPolicy.Condition(
                display_name=f"Threshold condition for {display_name}",
                condition_threshold=condition_threshold,
            )
            alert_policy = monitoring_v3.AlertPolicy(
                display_name=display_name,
                conditions=[condition],
                combiner=monitoring_v3.AlertPolicy.ConditionCombinerType.OR,
                notification_channels=_qualify_channels(
                    project_id, notification_channels
                ),
                enabled=monitoring_v3.BoolValue(value=enabled),
            )

            # Add documentation if provided
            if documentation:
                alert_policy.documentation = monitoring_v3.AlertPolicy.Documentation(
                    content=documentation, mime_type="text/markdown"
                )

            request = monitoring_v3.CreateAlertPolicyRequest(
                name=f"projects/{project_id}", alert_policy=alert_policy
            )

            logger.info("Creating alert policy: %s...", display_name)
            response = client.create_alert_policy(request=request)

            return _dump(
                {
                    "status": "success",
                    "name": response.name,
                    "display_name": response.display_name,
                    "enabled": response.enabled,
                    "conditions_count": len(response.conditions),
                    # Copy to a list: a repeated field cannot be JSON-encoded,
                    # which would report failure after the policy was created.
                    "notification_channels": list(response.notification_channels),
                }
            )
        except Exception as e:
            return _tool_error(e)

    @mcp_instance.tool()
    def update_alert_policy(
        alert_id: str,
        project_id: Optional[str] = None,
        display_name: Optional[str] = None,
        notification_channels: Optional[List[str]] = None,
        enabled: Optional[bool] = None,
        documentation: Optional[str] = None,
    ) -> str:
        """
        Update an existing alert policy

        Args:
            alert_id: ID of the alert to update
            project_id: GCP project ID (defaults to configured project)
            display_name: New name for the alert
            notification_channels: List of notification channel IDs
            enabled: Whether the alert should be enabled
            documentation: Documentation for the alert (markdown supported)
        """
        try:
            client, project_id = _client_and_project(project_id)

            # Get the existing policy
            policy = client.get_alert_policy(
                name=f"projects/{project_id}/alertPolicies/{alert_id}"
            )

            # Apply each provided field and record it in the update mask so
            # only those fields are sent as changes.
            update_mask = []
            if display_name:
                policy.display_name = display_name
                update_mask.append("display_name")
            if notification_channels is not None:
                policy.notification_channels = _qualify_channels(
                    project_id, notification_channels
                )
                update_mask.append("notification_channels")
            if enabled is not None:
                policy.enabled = monitoring_v3.BoolValue(value=enabled)
                update_mask.append("enabled")
            if documentation is not None:
                policy.documentation = monitoring_v3.AlertPolicy.Documentation(
                    content=documentation, mime_type="text/markdown"
                )
                update_mask.append("documentation")

            request = monitoring_v3.UpdateAlertPolicyRequest(
                alert_policy=policy, update_mask={"paths": update_mask}
            )

            logger.info("Updating alert policy: %s...", policy.name)
            response = client.update_alert_policy(request=request)

            return _dump(
                {
                    "status": "success",
                    "name": response.name,
                    "display_name": response.display_name,
                    "enabled": response.enabled,
                    "conditions_count": len(response.conditions),
                    "notification_channels": _channel_ids(
                        response.notification_channels
                    ),
                }
            )
        except Exception as e:
            return _tool_error(e)

    @mcp_instance.tool()
    def delete_alert_policy(alert_id: str, project_id: Optional[str] = None) -> str:
        """
        Delete an alert policy

        Args:
            alert_id: ID of the alert to delete
            project_id: GCP project ID (defaults to configured project)
        """
        try:
            client, project_id = _client_and_project(project_id)

            logger.info("Deleting alert policy: %s...", alert_id)
            client.delete_alert_policy(
                name=f"projects/{project_id}/alertPolicies/{alert_id}"
            )

            return _dump(
                {
                    "status": "success",
                    "message": f"Alert policy {alert_id} successfully deleted",
                }
            )
        except Exception as e:
            return _tool_error(e)

    @mcp_instance.tool()
    def create_notification_channel(
        display_name: str,
        channel_type: str,
        labels: Dict[str, str],
        project_id: Optional[str] = None,
        description: str = "",
        enabled: bool = True,
    ) -> str:
        """
        Create a notification channel

        Args:
            display_name: Human-readable name for the channel
            channel_type: Type of channel (email, sms, slack, pagerduty, etc.)
            labels: Channel-specific configuration (e.g., {"email_address": "user@example.com"})
            project_id: GCP project ID (defaults to configured project)
            description: Optional description for the channel
            enabled: Whether the channel should be enabled
        """
        try:
            client, project_id = _client_and_project(project_id)

            notification_channel = monitoring_v3.NotificationChannel(
                type=channel_type,
                display_name=display_name,
                description=description,
                labels=labels,
                enabled=monitoring_v3.BoolValue(value=enabled),
            )
            request = monitoring_v3.CreateNotificationChannelRequest(
                name=f"projects/{project_id}", notification_channel=notification_channel
            )

            logger.info(
                "Creating notification channel: %s (%s)...", display_name, channel_type
            )
            response = client.create_notification_channel(request=request)

            return _dump({"status": "success", **_describe_channel(response)})
        except Exception as e:
            return _tool_error(e)

    # Prompts
    @mcp_instance.prompt()
    def create_alert_prompt() -> str:
        """Prompt for creating a new alert policy"""
        return """
        I need to create a new alert policy in Cloud Monitoring.

        Please help me with:
        1. Selecting the appropriate metric type for my alert
        2. Setting up sensible thresholds and durations
        3. Understanding the different comparison types
        4. Best practices for alert documentation
        5. Setting up notification channels

        I'd like to create an alert that triggers when:
        """

    @mcp_instance.prompt()
    def monitor_resources_prompt() -> str:
        """Prompt for guidance on monitoring GCP resources"""
        return """
        I need to set up monitoring for my GCP resources. Please help me understand:

        1. What are the most important metrics I should be monitoring for:
           - Compute Engine instances
           - Cloud SQL databases
           - Cloud Storage buckets
           - App Engine applications
           - Kubernetes Engine clusters

        2. What are recommended thresholds for alerts on these resources?

        3. How should I organize my monitoring to keep it manageable?

        4. What visualization options do I have in Cloud Monitoring?
        """