This is page 18 of 19. Use http://codebase.md/aws-samples/sample-cfm-tips-mcp?lines=true&page={x} to view the full context.
# Directory Structure
```
├── .gitignore
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── diagnose_cost_optimization_hub_v2.py
├── LICENSE
├── logging_config.py
├── mcp_runbooks.json
├── mcp_server_with_runbooks.py
├── playbooks
│ ├── __init__.py
│ ├── aws_lambda
│ │ ├── __init__.py
│ │ └── lambda_optimization.py
│ ├── cloudtrail
│ │ ├── __init__.py
│ │ └── cloudtrail_optimization.py
│ ├── cloudtrail_optimization.py
│ ├── cloudwatch
│ │ ├── __init__.py
│ │ ├── aggregation_queries.py
│ │ ├── alarms_and_dashboards_analyzer.py
│ │ ├── analysis_engine.py
│ │ ├── base_analyzer.py
│ │ ├── cloudwatch_optimization_analyzer.py
│ │ ├── cloudwatch_optimization_tool.py
│ │ ├── cloudwatch_optimization.py
│ │ ├── cost_controller.py
│ │ ├── general_spend_analyzer.py
│ │ ├── logs_optimization_analyzer.py
│ │ ├── metrics_optimization_analyzer.py
│ │ ├── optimization_orchestrator.py
│ │ └── result_processor.py
│ ├── comprehensive_optimization.py
│ ├── ebs
│ │ ├── __init__.py
│ │ └── ebs_optimization.py
│ ├── ebs_optimization.py
│ ├── ec2
│ │ ├── __init__.py
│ │ └── ec2_optimization.py
│ ├── ec2_optimization.py
│ ├── lambda_optimization.py
│ ├── rds
│ │ ├── __init__.py
│ │ └── rds_optimization.py
│ ├── rds_optimization.py
│ └── s3
│ ├── __init__.py
│ ├── analyzers
│ │ ├── __init__.py
│ │ ├── api_cost_analyzer.py
│ │ ├── archive_optimization_analyzer.py
│ │ ├── general_spend_analyzer.py
│ │ ├── governance_analyzer.py
│ │ ├── multipart_cleanup_analyzer.py
│ │ └── storage_class_analyzer.py
│ ├── base_analyzer.py
│ ├── s3_aggregation_queries.py
│ ├── s3_analysis_engine.py
│ ├── s3_comprehensive_optimization_tool.py
│ ├── s3_optimization_orchestrator.py
│ └── s3_optimization.py
├── README.md
├── requirements.txt
├── runbook_functions_extended.py
├── runbook_functions.py
├── RUNBOOKS_GUIDE.md
├── services
│ ├── __init__.py
│ ├── cloudwatch_pricing.py
│ ├── cloudwatch_service_vended_log.py
│ ├── cloudwatch_service.py
│ ├── compute_optimizer.py
│ ├── cost_explorer.py
│ ├── optimization_hub.py
│ ├── performance_insights.py
│ ├── pricing.py
│ ├── s3_pricing.py
│ ├── s3_service.py
│ ├── storage_lens_service.py
│ └── trusted_advisor.py
├── setup.py
├── test_runbooks.py
├── tests
│ ├── __init__.py
│ ├── conftest.py
│ ├── integration
│ │ ├── __init__.py
│ │ ├── cloudwatch
│ │ │ └── test_cloudwatch_integration.py
│ │ ├── test_cloudwatch_comprehensive_tool_integration.py
│ │ ├── test_cloudwatch_orchestrator_integration.py
│ │ ├── test_integration_suite.py
│ │ └── test_orchestrator_integration.py
│ ├── legacy
│ │ ├── example_output_with_docs.py
│ │ ├── example_wellarchitected_output.py
│ │ ├── test_aws_session_management.py
│ │ ├── test_cloudwatch_orchestrator_pagination.py
│ │ ├── test_cloudwatch_pagination_integration.py
│ │ ├── test_cloudwatch_performance_optimizations.py
│ │ ├── test_cloudwatch_result_processor.py
│ │ ├── test_cloudwatch_timeout_issue.py
│ │ ├── test_documentation_links.py
│ │ ├── test_metrics_pagination_count.py
│ │ ├── test_orchestrator_integration.py
│ │ ├── test_pricing_cache_fix_moved.py
│ │ ├── test_pricing_cache_fix.py
│ │ ├── test_runbook_integration.py
│ │ ├── test_runbooks.py
│ │ ├── test_setup_verification.py
│ │ └── test_stack_trace_fix.py
│ ├── performance
│ │ ├── __init__.py
│ │ ├── cloudwatch
│ │ │ └── test_cloudwatch_performance.py
│ │ ├── test_cloudwatch_parallel_execution.py
│ │ ├── test_parallel_execution.py
│ │ └── test_performance_suite.py
│ ├── pytest-cloudwatch.ini
│ ├── pytest.ini
│ ├── README.md
│ ├── requirements-test.txt
│ ├── run_cloudwatch_tests.py
│ ├── run_tests.py
│ ├── test_setup_verification.py
│ ├── test_suite_main.py
│ └── unit
│ ├── __init__.py
│ ├── analyzers
│ │ ├── __init__.py
│ │ ├── conftest_cloudwatch.py
│ │ ├── test_alarms_and_dashboards_analyzer.py
│ │ ├── test_base_analyzer.py
│ │ ├── test_cloudwatch_base_analyzer.py
│ │ ├── test_cloudwatch_cost_constraints.py
│ │ ├── test_cloudwatch_general_spend_analyzer.py
│ │ ├── test_general_spend_analyzer.py
│ │ ├── test_logs_optimization_analyzer.py
│ │ └── test_metrics_optimization_analyzer.py
│ ├── cloudwatch
│ │ ├── test_cache_control.py
│ │ ├── test_cloudwatch_api_mocking.py
│ │ ├── test_cloudwatch_metrics_pagination.py
│ │ ├── test_cloudwatch_pagination_architecture.py
│ │ ├── test_cloudwatch_pagination_comprehensive_fixed.py
│ │ ├── test_cloudwatch_pagination_comprehensive.py
│ │ ├── test_cloudwatch_pagination_fixed.py
│ │ ├── test_cloudwatch_pagination_real_format.py
│ │ ├── test_cloudwatch_pagination_simple.py
│ │ ├── test_cloudwatch_query_pagination.py
│ │ ├── test_cloudwatch_unit_suite.py
│ │ ├── test_general_spend_tips_refactor.py
│ │ ├── test_import_error.py
│ │ ├── test_mcp_pagination_bug.py
│ │ └── test_mcp_surface_pagination.py
│ ├── s3
│ │ └── live
│ │ ├── test_bucket_listing.py
│ │ ├── test_s3_governance_bucket_discovery.py
│ │ └── test_top_buckets.py
│ ├── services
│ │ ├── __init__.py
│ │ ├── test_cloudwatch_cost_controller.py
│ │ ├── test_cloudwatch_query_service.py
│ │ ├── test_cloudwatch_service.py
│ │ ├── test_cost_control_routing.py
│ │ └── test_s3_service.py
│ └── test_unit_suite.py
└── utils
├── __init__.py
├── aws_client_factory.py
├── cache_decorator.py
├── cleanup_manager.py
├── cloudwatch_cache.py
├── documentation_links.py
├── error_handler.py
├── intelligent_cache.py
├── logging_config.py
├── memory_manager.py
├── parallel_executor.py
├── performance_monitor.py
├── progressive_timeout.py
├── service_orchestrator.py
└── session_manager.py
```
# Files
--------------------------------------------------------------------------------
/services/cloudwatch_service.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | CloudWatch Service for CFM Tips MCP Server
3 |
4 | Clean architecture with specialized classes for each CloudWatch functionality:
5 | - CWGeneralSpendTips: General spending analysis and cost optimization
6 | - CWMetricsTips: Custom metrics optimization and analysis
7 | - CWLogsTips: Logs ingestion, storage, and retention optimization
8 | - CWAlarmsTips: Alarms configuration and cost optimization
9 | - CWDashboardTips: Dashboard management and optimization
10 |
11 | Internal components (not exposed outside this file):
12 | - CloudWatchDAO: Data access for CloudWatch APIs
13 | - AWSPricingDAO: Data access for AWS Pricing APIs
14 | - CloudWatchCache: Caching layer for performance
15 | """
16 |
17 | import logging
18 | import boto3
19 | import time
20 | import json
21 | from typing import Dict, List, Any, Optional
22 | from datetime import datetime, timedelta, timezone
23 | from botocore.exceptions import ClientError
24 | from dataclasses import dataclass, field
25 |
26 | from playbooks.cloudwatch.cost_controller import CostController, CostPreferences
27 | from utils.logging_config import log_cloudwatch_operation
28 |
29 | logger = logging.getLogger(__name__)
30 |
31 |
32 | @dataclass
33 | class CloudWatchOperationResult:
34 | """Result of a CloudWatch operation with cost tracking."""
35 | success: bool
36 | data: Any = None
37 | error_message: Optional[str] = None
38 | operation_name: str = ""
39 | cost_incurred: bool = False
40 | operation_type: str = "free"
41 | execution_time: float = 0.0
42 | fallback_used: bool = False
43 | primary_data_source: str = "cloudwatch_api"
44 | api_calls_made: List[str] = field(default_factory=list)
45 |
46 |
47 | @dataclass
48 | class CloudWatchServiceConfig:
49 | """Configuration for CloudWatch service operations."""
50 | region: Optional[str] = None
51 | max_retries: int = 3
52 | retry_delay: float = 1.0
53 | timeout_seconds: float = 30.0
54 | enable_cost_tracking: bool = True
55 | enable_fallback: bool = True
56 | cost_preferences: Optional[CostPreferences] = None
57 |
58 |
59 | class CloudWatchCache:
60 | """Internal caching layer for CloudWatch data. Not exposed outside this file."""
61 |
62 | def __init__(self, ttl_seconds: int = 300):
63 | self.ttl_seconds = ttl_seconds
64 | self._cache = {}
65 | self._timestamps = {}
66 |
67 | def get(self, key: str) -> Optional[Any]:
68 | """Get cached value if not expired."""
69 | if key not in self._cache:
70 | return None
71 |
72 | if time.time() - self._timestamps[key] > self.ttl_seconds:
73 | self._invalidate(key)
74 | return None
75 |
76 | return self._cache[key]
77 |
78 | def set(self, key: str, value: Any) -> None:
79 | """Set cached value with timestamp."""
80 | self._cache[key] = value
81 | self._timestamps[key] = time.time()
82 |
83 | def _invalidate(self, key: str) -> None:
84 | """Remove expired cache entry."""
85 | self._cache.pop(key, None)
86 | self._timestamps.pop(key, None)
87 |
88 | def clear(self) -> None:
89 | """Clear all cached data."""
90 | self._cache.clear()
91 | self._timestamps.clear()
92 |
93 | def get_stats(self) -> Dict[str, Any]:
94 | """Get cache statistics."""
95 | current_time = time.time()
96 | expired_count = sum(
97 | 1 for ts in self._timestamps.values()
98 | if current_time - ts > self.ttl_seconds
99 | )
100 |
101 | return {
102 | 'total_entries': len(self._cache),
103 | 'expired_entries': expired_count,
104 | 'valid_entries': len(self._cache) - expired_count,
105 | 'ttl_seconds': self.ttl_seconds
106 | }
107 |
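# A minimal usage sketch of the TTL cache above (illustrative; the DAO classes below
# construct it internally): entries are written with a timestamp and lazily invalidated
# on get() once ttl_seconds have elapsed.
#
#   cache = CloudWatchCache(ttl_seconds=60)
#   cache.set("alarms_page_1", {"total_count": 12})
#   cache.get("alarms_page_1")   # -> {'total_count': 12} until 60s pass, then None
#   cache.get_stats()            # -> total / expired / valid entry counts and the TTL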
108 |
109 | class AWSPricingDAO:
110 | """Data Access Object for AWS Pricing API. Not exposed outside this file."""
111 |
112 | def __init__(self, region: str = 'us-east-1'):
113 | self.region = region
114 | self._cache = CloudWatchCache(ttl_seconds=3600) # 1 hour cache for pricing
115 | self.pricing_client = boto3.client('pricing', region_name='us-east-1')
116 |
117 | # Region mapping for pricing API
118 | self._region_map = {
119 | 'us-east-1': 'US East (N. Virginia)',
120 | 'us-east-2': 'US East (Ohio)',
121 | 'us-west-1': 'US West (N. California)',
122 | 'us-west-2': 'US West (Oregon)',
123 | 'eu-central-1': 'Europe (Frankfurt)',
124 | 'eu-west-1': 'Europe (Ireland)',
125 | 'eu-west-2': 'Europe (London)',
126 | 'ap-southeast-1': 'Asia Pacific (Singapore)',
127 | 'ap-southeast-2': 'Asia Pacific (Sydney)',
128 | 'ap-northeast-1': 'Asia Pacific (Tokyo)',
129 | }
130 |
131 | # Free tier limits
132 | self._free_tier = {
133 | 'logs_ingestion_gb': 5.0,
134 | 'logs_storage_gb': 5.0,
135 | 'metrics_count': 10,
136 | 'api_requests': 1000000,
137 | 'alarms_count': 10,
138 | 'dashboards_count': 3,
139 | 'dashboard_metrics': 50
140 | }
141 |
142 | def get_pricing_data(self, component: str) -> Dict[str, Any]:
143 | """Get pricing data for CloudWatch components with caching."""
144 | cache_key = f"pricing_{component}_{self.region}"
145 | cached_result = self._cache.get(cache_key)
146 | if cached_result:
147 | return cached_result
148 |
149 | # Fallback pricing data (typical CloudWatch pricing)
150 | pricing_data = {
151 | 'logs': {
152 | 'ingestion_per_gb': 0.50,
153 | 'storage_per_gb_month': 0.03,
154 | 'insights_per_gb_scanned': 0.005,
155 | 'vended_logs_per_gb': 0.10,
156 | 'cross_region_delivery_per_gb': 0.02
157 | },
158 | 'metrics': {
159 | 'custom_metrics_per_metric': 0.30,
160 | 'detailed_monitoring_per_instance': 2.10,
161 | 'high_resolution_metrics_per_metric': 0.30,
162 | 'api_requests_per_1000': 0.01,
163 | 'get_metric_statistics_per_1000': 0.01,
164 | 'put_metric_data_per_1000': 0.01
165 | },
166 | 'alarms': {
167 | 'standard_alarms_per_alarm': 0.10,
168 | 'high_resolution_alarms_per_alarm': 0.30,
169 | 'composite_alarms_per_alarm': 0.50,
170 | 'alarm_actions_sns': 0.0,
171 | 'alarm_actions_autoscaling': 0.0,
172 | 'alarm_actions_ec2': 0.0
173 | },
174 | 'dashboards': {
175 | 'dashboard_per_month': 3.00,
176 | 'metrics_per_dashboard_free': 50,
177 | 'additional_metrics_per_metric': 0.0,
178 | 'dashboard_api_requests_per_1000': 0.01
179 | }
180 | }
181 |
182 | result = pricing_data.get(component, {})
183 | self._cache.set(cache_key, result)
184 | return result
185 |
186 | def get_free_tier_limits(self) -> Dict[str, Any]:
187 | """Get free tier limits for CloudWatch services."""
188 | return self._free_tier.copy()
189 |
190 | def calculate_cost(self, component: str, usage: Dict[str, Any]) -> Dict[str, Any]:
191 | """Calculate costs for CloudWatch components with free tier consideration."""
192 | pricing = self.get_pricing_data(component)
193 |
194 | if component == 'logs':
195 | return self._calculate_logs_cost(usage, pricing)
196 | elif component == 'metrics':
197 | return self._calculate_metrics_cost(usage, pricing)
198 | elif component == 'alarms':
199 | return self._calculate_alarms_cost(usage, pricing)
200 | elif component == 'dashboards':
201 | return self._calculate_dashboards_cost(usage, pricing)
202 | else:
203 | return {'status': 'error', 'message': f'Unknown component: {component}'}
204 |
205 | def _calculate_logs_cost(self, usage: Dict[str, Any], pricing: Dict[str, Any]) -> Dict[str, Any]:
206 | """Calculate CloudWatch Logs costs."""
207 | ingestion_gb = usage.get('ingestion_gb', 0)
208 | storage_gb = usage.get('storage_gb', 0)
209 | insights_gb_scanned = usage.get('insights_gb_scanned', 0)
210 |
211 | ingestion_billable = max(0, ingestion_gb - self._free_tier['logs_ingestion_gb'])
212 | storage_billable = max(0, storage_gb - self._free_tier['logs_storage_gb'])
213 |
214 | ingestion_cost = ingestion_billable * pricing['ingestion_per_gb']
215 | storage_cost = storage_billable * pricing['storage_per_gb_month']
216 | insights_cost = insights_gb_scanned * pricing['insights_per_gb_scanned']
217 |
218 | total_cost = ingestion_cost + storage_cost + insights_cost
219 |
220 | return {
221 | 'status': 'success',
222 | 'usage': usage,
223 | 'billable_usage': {
224 | 'ingestion_gb': ingestion_billable,
225 | 'storage_gb': storage_billable,
226 | 'insights_gb_scanned': insights_gb_scanned
227 | },
228 | 'cost_breakdown': {
229 | 'ingestion_cost': round(ingestion_cost, 4),
230 | 'storage_cost': round(storage_cost, 4),
231 | 'insights_cost': round(insights_cost, 4)
232 | },
233 | 'total_monthly_cost': round(total_cost, 4),
234 | 'total_annual_cost': round(total_cost * 12, 2)
235 | }
236 |
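# Worked example for _calculate_logs_cost using the fallback prices above (illustrative
# figures): 20 GB ingested and 100 GB stored in a month gives
#   ingestion: max(0, 20 - 5)  * $0.50 = $7.50
#   storage:   max(0, 100 - 5) * $0.03 = $2.85
# so total_monthly_cost is about $10.35 and total_annual_cost about $124.20 (no Insights scans).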
237 | def _calculate_metrics_cost(self, usage: Dict[str, Any], pricing: Dict[str, Any]) -> Dict[str, Any]:
238 | """Calculate CloudWatch Metrics costs."""
239 | custom_metrics_count = usage.get('custom_metrics_count', 0)
240 | api_requests_count = usage.get('api_requests_count', 0)
241 | detailed_monitoring_instances = usage.get('detailed_monitoring_instances', 0)
242 |
243 | metrics_billable = max(0, custom_metrics_count - self._free_tier['metrics_count'])
244 | requests_billable = max(0, api_requests_count - self._free_tier['api_requests'])
245 |
246 | metrics_cost = metrics_billable * pricing['custom_metrics_per_metric']
247 | requests_cost = (requests_billable / 1000) * pricing['api_requests_per_1000']
248 | detailed_monitoring_cost = detailed_monitoring_instances * pricing['detailed_monitoring_per_instance']
249 |
250 | total_cost = metrics_cost + requests_cost + detailed_monitoring_cost
251 |
252 | return {
253 | 'status': 'success',
254 | 'usage': usage,
255 | 'billable_usage': {
256 | 'custom_metrics_count': metrics_billable,
257 | 'api_requests_count': requests_billable,
258 | 'detailed_monitoring_instances': detailed_monitoring_instances
259 | },
260 | 'cost_breakdown': {
261 | 'metrics_cost': round(metrics_cost, 4),
262 | 'requests_cost': round(requests_cost, 4),
263 | 'detailed_monitoring_cost': round(detailed_monitoring_cost, 4)
264 | },
265 | 'total_monthly_cost': round(total_cost, 4),
266 | 'total_annual_cost': round(total_cost * 12, 2)
267 | }
268 |
269 | def _calculate_alarms_cost(self, usage: Dict[str, Any], pricing: Dict[str, Any]) -> Dict[str, Any]:
270 | """Calculate CloudWatch Alarms costs."""
271 | standard_alarms_count = usage.get('standard_alarms_count', 0)
272 | high_resolution_alarms_count = usage.get('high_resolution_alarms_count', 0)
273 | composite_alarms_count = usage.get('composite_alarms_count', 0)
274 |
275 | standard_billable = max(0, standard_alarms_count - self._free_tier['alarms_count'])
276 |
277 | standard_cost = standard_billable * pricing['standard_alarms_per_alarm']
278 | high_resolution_cost = high_resolution_alarms_count * pricing['high_resolution_alarms_per_alarm']
279 | composite_cost = composite_alarms_count * pricing['composite_alarms_per_alarm']
280 |
281 | total_cost = standard_cost + high_resolution_cost + composite_cost
282 |
283 | return {
284 | 'status': 'success',
285 | 'usage': usage,
286 | 'billable_usage': {
287 | 'standard_alarms_count': standard_billable,
288 | 'high_resolution_alarms_count': high_resolution_alarms_count,
289 | 'composite_alarms_count': composite_alarms_count
290 | },
291 | 'cost_breakdown': {
292 | 'standard_alarms_cost': round(standard_cost, 4),
293 | 'high_resolution_alarms_cost': round(high_resolution_cost, 4),
294 | 'composite_alarms_cost': round(composite_cost, 4)
295 | },
296 | 'total_monthly_cost': round(total_cost, 4),
297 | 'total_annual_cost': round(total_cost * 12, 2)
298 | }
299 |
300 | def _calculate_dashboards_cost(self, usage: Dict[str, Any], pricing: Dict[str, Any]) -> Dict[str, Any]:
301 | """Calculate CloudWatch Dashboards costs."""
302 | dashboards_count = usage.get('dashboards_count', 0)
303 |
304 | dashboards_billable = max(0, dashboards_count - self._free_tier['dashboards_count'])
305 | dashboards_cost = dashboards_billable * pricing['dashboard_per_month']
306 |
307 | return {
308 | 'status': 'success',
309 | 'usage': usage,
310 | 'billable_usage': {
311 | 'dashboards_count': dashboards_billable
312 | },
313 | 'cost_breakdown': {
314 | 'dashboards_cost': round(dashboards_cost, 4)
315 | },
316 | 'total_monthly_cost': round(dashboards_cost, 4),
317 | 'total_annual_cost': round(dashboards_cost * 12, 2)
318 | }
319 |
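# A rough call sketch for the pricing DAO above (assumed standalone use; in this file it is
# passed into the CW*Tips classes):
#
#   pricing_dao = AWSPricingDAO(region='us-east-1')
#   pricing_dao.calculate_cost('metrics', {'custom_metrics_count': 25,
#                                          'api_requests_count': 2_000_000})
#   # -> 15 billable metrics (25 minus 10 free) at $0.30 plus 1,000,000 billable requests
#   #    at $0.01 per 1,000, i.e. roughly $4.50 + $10.00 per month.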
320 |
321 | class CloudWatchDAO:
322 | """Data Access Object for CloudWatch operations. Not exposed outside this file."""
323 |
324 | def __init__(self, region: Optional[str] = None, cost_controller: Optional[CostController] = None):
325 | self.region = region
326 | self.cost_controller = cost_controller or CostController()
327 | self._cache = CloudWatchCache()
328 |
329 | # Initialize AWS clients
330 | self.cloudwatch_client = boto3.client('cloudwatch', region_name=self.region)
331 | self.logs_client = boto3.client('logs', region_name=self.region)
332 |
333 | logger.debug(f"CloudWatch DAO initialized for region: {self.region}")
334 |
335 | async def list_metrics(self, namespace: Optional[str] = None,
336 | metric_name: Optional[str] = None,
337 | dimensions: Optional[List[Dict[str, str]]] = None) -> Dict[str, Any]:
338 | """List CloudWatch metrics with caching."""
339 | cache_key = f"metrics_{namespace}_{metric_name}_{hash(str(dimensions))}"
340 | cached_result = self._cache.get(cache_key)
341 | if cached_result:
342 | return cached_result
343 |
344 | params = {}
345 | if namespace:
346 | params['Namespace'] = namespace
347 | if metric_name:
348 | params['MetricName'] = metric_name
349 | if dimensions:
350 | params['Dimensions'] = dimensions
351 |
352 | metrics = []
353 | paginator = self.cloudwatch_client.get_paginator('list_metrics')
354 |
355 | for page in paginator.paginate(**params):
356 | metrics.extend(page.get('Metrics', []))
357 |
358 | result = {
359 | 'metrics': metrics,
360 | 'total_count': len(metrics),
361 | 'namespace': namespace,
362 | 'filtered': bool(namespace or metric_name or dimensions)
363 | }
364 |
365 | self._cache.set(cache_key, result)
366 | return result
367 |
368 | async def describe_alarms(self, alarm_names: Optional[List[str]] = None,
369 | alarm_name_prefix: Optional[str] = None,
370 | state_value: Optional[str] = None) -> Dict[str, Any]:
371 | """Describe CloudWatch alarms with caching."""
372 | cache_key = f"alarms_{hash(str(alarm_names))}_{alarm_name_prefix}_{state_value}"
373 | cached_result = self._cache.get(cache_key)
374 | if cached_result:
375 | return cached_result
376 |
377 | params = {}
378 | if alarm_names:
379 | params['AlarmNames'] = alarm_names
380 | if alarm_name_prefix:
381 | params['AlarmNamePrefix'] = alarm_name_prefix
382 | if state_value:
383 | params['StateValue'] = state_value
384 |
385 | alarms = []
386 | paginator = self.cloudwatch_client.get_paginator('describe_alarms')
387 |
388 | for page in paginator.paginate(**params):
389 | alarms.extend(page.get('MetricAlarms', []))
390 | alarms.extend(page.get('CompositeAlarms', []))
391 |
392 | result = {
393 | 'alarms': alarms,
394 | 'total_count': len(alarms),
395 | 'filtered': bool(alarm_names or alarm_name_prefix or state_value)
396 | }
397 |
398 | self._cache.set(cache_key, result)
399 | return result
400 |
401 | async def list_dashboards(self, dashboard_name_prefix: Optional[str] = None) -> Dict[str, Any]:
402 | """List CloudWatch dashboards with caching."""
403 | cache_key = f"dashboards_{dashboard_name_prefix}"
404 | cached_result = self._cache.get(cache_key)
405 | if cached_result:
406 | return cached_result
407 |
408 | params = {}
409 | if dashboard_name_prefix:
410 | params['DashboardNamePrefix'] = dashboard_name_prefix
411 |
412 | dashboards = []
413 | paginator = self.cloudwatch_client.get_paginator('list_dashboards')
414 |
415 | for page in paginator.paginate(**params):
416 | dashboards.extend(page.get('DashboardEntries', []))
417 |
418 | result = {
419 | 'dashboards': dashboards,
420 | 'total_count': len(dashboards),
421 | 'filtered': bool(dashboard_name_prefix)
422 | }
423 |
424 | self._cache.set(cache_key, result)
425 | return result
426 |
427 | async def describe_log_groups(self, log_group_name_prefix: Optional[str] = None,
428 | log_group_names: Optional[List[str]] = None) -> Dict[str, Any]:
429 | """Describe CloudWatch log groups with caching."""
430 | cache_key = f"log_groups_{log_group_name_prefix}_{hash(str(log_group_names))}"
431 | cached_result = self._cache.get(cache_key)
432 | if cached_result:
433 | return cached_result
434 |
435 | params = {}
436 | if log_group_name_prefix:
437 | params['logGroupNamePrefix'] = log_group_name_prefix
438 | if log_group_names:
439 | params['logGroupNames'] = log_group_names
440 |
441 | log_groups = []
442 | paginator = self.logs_client.get_paginator('describe_log_groups')
443 |
444 | for page in paginator.paginate(**params):
445 | log_groups.extend(page.get('logGroups', []))
446 |
447 | result = {
448 | 'log_groups': log_groups,
449 | 'total_count': len(log_groups),
450 | 'filtered': bool(log_group_name_prefix or log_group_names)
451 | }
452 |
453 | self._cache.set(cache_key, result)
454 | return result
455 |
456 | async def get_dashboard(self, dashboard_name: str) -> Dict[str, Any]:
457 | """Get dashboard configuration with caching."""
458 | cache_key = f"dashboard_config_{dashboard_name}"
459 | cached_result = self._cache.get(cache_key)
460 | if cached_result:
461 | return cached_result
462 |
463 | response = self.cloudwatch_client.get_dashboard(DashboardName=dashboard_name)
464 |
465 | result = {
466 | 'dashboard_name': dashboard_name,
467 | 'dashboard_body': response.get('DashboardBody', '{}'),
468 | 'dashboard_arn': response.get('DashboardArn')
469 | }
470 |
471 | self._cache.set(cache_key, result)
472 | return result
473 |
474 | async def get_metric_statistics(self, namespace: str, metric_name: str,
475 | dimensions: List[Dict[str, str]],
476 | start_time: datetime, end_time: datetime,
477 | period: int = 3600,
478 | statistics: List[str] = None) -> Dict[str, Any]:
479 | """Get metric statistics (paid operation)."""
480 | stats_list = statistics or ['Average', 'Sum', 'Maximum']
481 |
482 | response = self.cloudwatch_client.get_metric_statistics(
483 | Namespace=namespace,
484 | MetricName=metric_name,
485 | Dimensions=dimensions,
486 | StartTime=start_time,
487 | EndTime=end_time,
488 | Period=period,
489 | Statistics=stats_list
490 | )
491 |
492 | datapoints = response.get('Datapoints', [])
493 | datapoints.sort(key=lambda x: x['Timestamp'])
494 |
495 | return {
496 | 'namespace': namespace,
497 | 'metric_name': metric_name,
498 | 'dimensions': dimensions,
499 | 'datapoints': datapoints,
500 | 'total_datapoints': len(datapoints),
501 | 'period': period,
502 | 'statistics': stats_list,
503 | 'start_time': start_time.isoformat(),
504 | 'end_time': end_time.isoformat()
505 | }
506 |
507 | async def get_metrics_usage_batch(self, metrics: List[Dict[str, Any]],
508 | lookback_days: int = 30,
509 | batch_size: int = 500) -> None:
510 | """
511 | Get usage data for multiple metrics in batches (paid operation).
512 |
513 | Uses GetMetricData API which supports up to 500 metrics per request.
514 | Results are cached to avoid repeated API calls.
515 | Updates metrics in place with datapoint_count and usage_estimation_method.
516 |
517 | Args:
518 | metrics: List of metric dictionaries to analyze (modified in place)
519 | lookback_days: Number of days to look back for usage data
520 | batch_size: Number of metrics per API call (max 500)
521 | """
522 | if not metrics:
523 | return
524 |
525 | end_time = datetime.now(timezone.utc)
526 | start_time = end_time - timedelta(days=lookback_days)
527 |
528 | # Process metrics in batches of up to 500
529 | for batch_start in range(0, len(metrics), batch_size):
530 | batch = metrics[batch_start:batch_start + batch_size]
531 |
532 | # Check cache first for this batch
533 | cache_key = f"metrics_usage_{hash(str([(m['namespace'], m['metric_name'], str(m['dimensions'])) for m in batch]))}_{lookback_days}"
534 | cached_result = self._cache.get(cache_key)
535 |
536 | if cached_result:
537 | logger.debug(f"Using cached usage data for batch {batch_start//batch_size + 1}")
538 | # Apply cached results
539 | for i, metric in enumerate(batch):
540 | if i < len(cached_result):
541 | metric.update(cached_result[i])
542 | continue
543 |
544 | # Build metric queries for this batch
545 | metric_queries = []
546 | for idx, metric in enumerate(batch):
547 | metric_queries.append({
548 | 'Id': f'm{idx}',
549 | 'MetricStat': {
550 | 'Metric': {
551 | 'Namespace': metric['namespace'],
552 | 'MetricName': metric['metric_name'],
553 | 'Dimensions': metric['dimensions']
554 | },
555 | 'Period': 3600, # 1 hour
556 | 'Stat': 'SampleCount'
557 | }
558 | })
559 |
560 | # Single batched API call for up to 500 metrics
561 | try:
562 | logger.debug(f"Fetching usage data for batch {batch_start//batch_size + 1} ({len(batch)} metrics)")
563 |
564 | response = self.cloudwatch_client.get_metric_data(
565 | MetricDataQueries=metric_queries,
566 | StartTime=start_time,
567 | EndTime=end_time
568 | )
569 |
570 | # Process results and update metrics in place
571 | batch_results = []
572 | for idx, result in enumerate(response.get('MetricDataResults', [])):
573 | if idx < len(batch):
574 | datapoint_count = len(result.get('Values', []))
575 |
576 | # Update metric in place
577 | batch[idx]['datapoint_count'] = datapoint_count
578 | batch[idx]['usage_period_days'] = lookback_days
579 | batch[idx]['usage_estimation_method'] = 'exact_paid'
580 |
581 | # Store for cache
582 | batch_results.append({
583 | 'datapoint_count': datapoint_count,
584 | 'usage_period_days': lookback_days,
585 | 'usage_estimation_method': 'exact_paid'
586 | })
587 |
588 | # Cache the results
589 | self._cache.set(cache_key, batch_results)
590 |
591 | except Exception as e:
592 | logger.error(f"Failed to get usage data for batch {batch_start//batch_size + 1}: {str(e)}")
593 | # Mark all metrics in batch as failed
594 | for metric in batch:
595 | metric['datapoint_count'] = 0
596 | metric['usage_period_days'] = lookback_days
597 | metric['usage_estimation_method'] = 'failed'
598 |
599 | def clear_cache(self) -> None:
600 | """Clear all cached data."""
601 | self._cache.clear()
602 |
603 | def get_cache_stats(self) -> Dict[str, Any]:
604 | """Get cache statistics."""
605 | return self._cache.get_stats()
606 |
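# A rough call sketch for the DAO above (requires AWS credentials; the list/describe calls
# are free CloudWatch APIs, while get_metric_statistics and get_metrics_usage_batch are paid):
#
#   import asyncio
#   dao = CloudWatchDAO(region='us-east-1')
#   groups = asyncio.run(dao.describe_log_groups(log_group_name_prefix='/aws/lambda/'))
#   groups['total_count']   # number of matching log groups, cached for 300 seconds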
607 |
608 | class CWGeneralSpendTips:
609 | """
610 | CloudWatch general spending analysis with 4 public methods.
611 |
612 | Public methods:
613 | - getLogs(): Returns log groups ordered by spend (descending), paginated
614 | - getMetrics(): Returns custom metrics ordered by dimension count (descending), paginated
615 | - getDashboards(): Returns dashboards ordered by complexity score (descending), paginated
616 | - getAlarms(): Returns alarms with cost information, paginated
617 | """
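# Rough call sketch (illustrative; getLogs stays on free estimation by default and only
# touches paid metric APIs when can_spend_for_estimate=True; cost_preferences is assumed
# to be an already configured CostPreferences instance):
#
#   tips = CWGeneralSpendTips(dao, pricing_dao, cost_preferences)
#   page1 = await tips.getLogs(page=1)                 # inside an async context
#   page1['summary']['total_estimated_monthly_cost']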
618 |
619 | def __init__(self, dao: CloudWatchDAO, pricing_dao: AWSPricingDAO, cost_preferences: CostPreferences):
620 | self.dao = dao
621 | self.pricing_dao = pricing_dao
622 | self.cost_preferences = cost_preferences
623 | self._page_size = 10 # Items per page
624 |
625 | async def getLogs(self, page: int = 1, log_group_name_prefix: Optional[str] = None,
626 | can_spend_for_estimate: bool = False,
627 | estimate_ingestion_from_metadata: bool = True,
628 | lookback_days: int = 30) -> Dict[str, Any]:
629 | """
630 | Get log groups ordered by estimated spend (descending), paginated.
631 |
632 | Args:
633 | page: Page number (1-based)
634 | log_group_name_prefix: Optional filter for log group names
635 | can_spend_for_estimate: If True, uses CloudWatch Metrics API (paid) to get accurate
636 | ingestion data from IncomingBytes metric. If False (default),
637 | uses free estimation methods. Enabling this provides accurate
638 | cost estimates but incurs minimal CloudWatch API charges.
639 | estimate_ingestion_from_metadata: If True (default) and can_spend_for_estimate=False,
640 | estimates ingestion using free metadata (retention
641 | policy and log group age). If False, only storage
642 | costs are calculated. This is always free.
643 | lookback_days: Number of days to analyze for ingestion (only used if can_spend_for_estimate=True)
644 |
645 | Returns:
646 | Dict with log_groups, pagination, summary, and pricing_info
647 | """
648 | try:
649 | # Get log groups from DAO
650 | log_groups_data = await self.dao.describe_log_groups(
651 | log_group_name_prefix=log_group_name_prefix
652 | )
653 | log_groups = log_groups_data['log_groups']
654 |
655 | # Get pricing data
656 | pricing = self.pricing_dao.get_pricing_data('logs')
657 | free_tier = self.pricing_dao.get_free_tier_limits()
658 |
659 | # Calculate spend for each log group
660 | log_groups_with_spend = []
661 | total_storage_gb = 0
662 | total_ingestion_gb = 0
663 |
664 | for lg in log_groups:
665 | stored_bytes = lg.get('storedBytes', 0)
666 | stored_gb = stored_bytes / (1024**3)
667 | retention_days = lg.get('retentionInDays', 0)
668 | log_group_name = lg.get('logGroupName')
669 |
670 | # Calculate storage cost (always accurate)
671 | storage_cost = stored_gb * pricing['storage_per_gb_month']
672 |
673 | # Calculate ingestion cost
674 | if can_spend_for_estimate:
675 | # Option 1: Use CloudWatch Metrics API for accurate data (PAID)
676 | try:
677 | end_time = datetime.now(timezone.utc)
678 | start_time = end_time - timedelta(days=lookback_days)
679 |
680 | ingestion_data = await self.dao.get_metric_statistics(
681 | namespace='AWS/Logs',
682 | metric_name='IncomingBytes',
683 | dimensions=[{'Name': 'LogGroupName', 'Value': log_group_name}],
684 | start_time=start_time,
685 | end_time=end_time,
686 | period=86400, # Daily aggregation
687 | statistics=['Sum']
688 | )
689 |
690 | # Calculate total ingestion in GB
691 | total_bytes = sum(dp['Sum'] for dp in ingestion_data['datapoints'])
692 | ingestion_gb = total_bytes / (1024**3)
693 |
694 | # Normalize to monthly rate
695 | monthly_ingestion_gb = (ingestion_gb / lookback_days) * 30
696 | ingestion_cost = monthly_ingestion_gb * pricing['ingestion_per_gb']
697 | estimation_method = 'accurate_paid'
698 | confidence = 'high'
699 |
700 | except Exception as e:
701 | logger.warning(f"Failed to get ingestion metrics for {log_group_name}: {str(e)}")
702 | # Fallback to free estimation if metrics fail
703 | if estimate_ingestion_from_metadata:
704 | result = self._estimate_ingestion_from_metadata(lg, stored_gb)
705 | monthly_ingestion_gb = result['monthly_ingestion_gb']
706 | ingestion_cost = monthly_ingestion_gb * pricing['ingestion_per_gb']
707 | estimation_method = result['estimation_method']
708 | confidence = result['confidence']
709 | else:
710 | monthly_ingestion_gb = 0
711 | ingestion_cost = 0
712 | estimation_method = 'storage_only'
713 | confidence = 'high'
714 |
715 | elif estimate_ingestion_from_metadata:
716 | # Option 2: Estimate from metadata (FREE - uses retention/age)
717 | result = self._estimate_ingestion_from_metadata(lg, stored_gb)
718 | monthly_ingestion_gb = result['monthly_ingestion_gb']
719 | ingestion_cost = monthly_ingestion_gb * pricing['ingestion_per_gb']
720 | estimation_method = result['estimation_method']
721 | confidence = result['confidence']
722 |
723 | else:
724 | # Option 3: Storage only (FREE - most conservative)
725 | monthly_ingestion_gb = 0
726 | ingestion_cost = 0
727 | estimation_method = 'storage_only'
728 | confidence = 'high'
729 |
730 | total_cost = storage_cost + ingestion_cost
731 | total_storage_gb += stored_gb
732 | total_ingestion_gb += monthly_ingestion_gb
733 |
734 | log_groups_with_spend.append({
735 | 'log_group_name': log_group_name,
736 | 'stored_gb': round(stored_gb, 4),
737 | 'stored_bytes': stored_bytes,
738 | 'retention_days': retention_days if retention_days else 'Never Expire',
739 | 'creation_time': lg.get('creationTime'),
740 | 'cost_breakdown': {
741 | 'storage_cost': round(storage_cost, 4),
742 | 'ingestion_cost': round(ingestion_cost, 4),
743 | 'ingestion_gb_monthly': round(monthly_ingestion_gb, 4) if monthly_ingestion_gb > 0 else None
744 | },
745 | 'estimated_monthly_cost': round(total_cost, 4),
746 | 'estimated_annual_cost': round(total_cost * 12, 2),
747 | 'estimation_method': estimation_method,
748 | 'estimation_confidence': confidence
749 | })
750 |
751 | # Sort by spend descending
752 | log_groups_with_spend.sort(key=lambda x: x['estimated_monthly_cost'], reverse=True)
753 |
754 | # Paginate
755 | total_items = len(log_groups_with_spend)
756 | total_pages = (total_items + self._page_size - 1) // self._page_size
757 | start_idx = (page - 1) * self._page_size
758 | end_idx = start_idx + self._page_size
759 | paginated_logs = log_groups_with_spend[start_idx:end_idx]
760 |
761 | # Calculate totals
762 | total_monthly_cost = sum(lg['estimated_monthly_cost'] for lg in log_groups_with_spend)
763 |
764 | # Free tier analysis
765 | storage_billable = max(0, total_storage_gb - free_tier['logs_storage_gb'])
766 | ingestion_billable = max(0, total_ingestion_gb - free_tier['logs_ingestion_gb']) if can_spend_for_estimate else 0
767 |
768 | return {
769 | 'status': 'success',
770 | 'log_groups': paginated_logs,
771 | 'pagination': {
772 | 'current_page': page,
773 | 'page_size': self._page_size,
774 | 'total_items': total_items,
775 | 'total_pages': total_pages,
776 | 'has_next': page < total_pages,
777 | 'has_previous': page > 1
778 | },
779 | 'summary': {
780 | 'total_log_groups': total_items,
781 | 'total_storage_gb': round(total_storage_gb, 4),
782 | 'total_ingestion_gb_monthly': round(total_ingestion_gb, 4) if can_spend_for_estimate else None,
783 | 'free_tier_limit_gb': free_tier['logs_storage_gb'],
784 | 'free_tier_remaining_gb': round(max(0, free_tier['logs_storage_gb'] - total_storage_gb), 4),
785 | 'billable_storage_gb': round(storage_billable, 4),
786 | 'billable_ingestion_gb': round(ingestion_billable, 4) if can_spend_for_estimate else None,
787 | 'total_estimated_monthly_cost': round(total_monthly_cost, 4),
788 | 'total_estimated_annual_cost': round(total_monthly_cost * 12, 2),
789 | 'estimation_method': 'accurate' if can_spend_for_estimate else 'storage_only'
790 | },
791 | 'pricing_info': pricing
792 | }
793 |
794 | except Exception as e:
795 | logger.error(f"Error getting logs by spend: {str(e)}")
796 | return {
797 | 'status': 'error',
798 | 'message': str(e),
799 | 'log_groups': [],
800 | 'pagination': {'current_page': page, 'page_size': self._page_size, 'total_items': 0, 'total_pages': 0}
801 | }
802 |
803 | def _estimate_ingestion_from_metadata(self, log_group: Dict[str, Any], stored_gb: float) -> Dict[str, Any]:
804 | """
805 | Estimate monthly ingestion using only free metadata (retention policy and age).
806 |
807 | This is a FREE operation that uses only data from describe_log_groups.
808 |
809 | Estimation methods (in order of preference):
810 | 1. Retention-based: If retention policy is set, assumes steady-state where
811 | storage = retention_days × daily_ingestion
812 | 2. Age-based: Uses log group age to calculate average daily ingestion rate
813 | 3. No estimate: If insufficient data
814 |
815 | Args:
816 | log_group: Log group metadata from describe_log_groups
817 | stored_gb: Current storage in GB
818 |
819 | Returns:
820 | Dict with monthly_ingestion_gb, estimation_method, and confidence level
821 | """
822 | retention_days = log_group.get('retentionInDays')
823 | creation_time = log_group.get('creationTime')
824 |
825 | # Method 1: Retention-based estimation (most reliable for logs with retention)
826 | if retention_days and retention_days > 0:
827 | # Steady-state assumption: storage = retention_days × daily_ingestion
828 | # Therefore: daily_ingestion = storage / retention_days
829 | daily_ingestion_gb = stored_gb / retention_days
830 | monthly_ingestion_gb = daily_ingestion_gb * 30
831 |
832 | return {
833 | 'monthly_ingestion_gb': monthly_ingestion_gb,
834 | 'estimation_method': 'retention_based_free',
835 | 'confidence': 'medium'
836 | }
837 |
838 | # Method 2: Age-based estimation (for logs without retention)
839 | if creation_time:
840 | age_days = (datetime.now(timezone.utc) -
841 | datetime.fromtimestamp(creation_time / 1000, tz=timezone.utc)).days
842 |
843 | if age_days >= 30: # Need at least 30 days for reasonable estimate
844 | # Average daily ingestion = total storage / age
845 | daily_ingestion_gb = stored_gb / age_days
846 | monthly_ingestion_gb = daily_ingestion_gb * 30
847 |
848 | return {
849 | 'monthly_ingestion_gb': monthly_ingestion_gb,
850 | 'estimation_method': 'age_based_free',
851 | 'confidence': 'low'
852 | }
853 | elif age_days > 0:
854 | # Too new for reliable estimate, but provide rough estimate
855 | daily_ingestion_gb = stored_gb / age_days
856 | monthly_ingestion_gb = daily_ingestion_gb * 30
857 |
858 | return {
859 | 'monthly_ingestion_gb': monthly_ingestion_gb,
860 | 'estimation_method': 'age_based_free_unreliable',
861 | 'confidence': 'very_low'
862 | }
863 |
864 | # Method 3: No estimate possible
865 | return {
866 | 'monthly_ingestion_gb': 0,
867 | 'estimation_method': 'storage_only',
868 | 'confidence': 'high' # High confidence in storage cost, no ingestion estimate
869 | }
870 |
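# Worked example for the retention-based estimate above: a log group holding 30 GB with a
# 14-day retention policy is assumed to be at steady state, so daily ingestion is
# 30 / 14 = ~2.14 GB and the estimated monthly ingestion is ~64.3 GB (confidence: medium).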
871 | async def getMetrics(self, page: int = 1, namespace_filter: Optional[str] = None) -> Dict[str, Any]:
872 | """
873 | Get custom metrics ordered by dimension count (descending), paginated.
874 |
875 | Args:
876 | page: Page number (1-based)
877 | namespace_filter: Optional filter for namespace
878 |
879 | Returns:
880 | Dict with custom_metrics, pagination, summary, and pricing_info
881 | """
882 | try:
883 | # Get metrics from DAO
884 | metrics_data = await self.dao.list_metrics(namespace=namespace_filter)
885 | all_metrics = metrics_data['metrics']
886 |
887 | # Filter to custom metrics only (exclude AWS/* namespaces)
888 | custom_metrics = [m for m in all_metrics if not m.get('Namespace', '').startswith('AWS/')]
889 |
890 | # Get pricing data
891 | pricing = self.pricing_dao.get_pricing_data('metrics')
892 | free_tier = self.pricing_dao.get_free_tier_limits()
893 |
894 | # Calculate dimension count and cost for each metric
895 | metrics_with_info = []
896 | for metric in custom_metrics:
897 | dimensions = metric.get('Dimensions', [])
898 | dimension_count = len(dimensions)
899 |
900 | # Cost per metric
901 | metric_cost = pricing['custom_metrics_per_metric']
902 |
903 | metrics_with_info.append({
904 | 'namespace': metric.get('Namespace'),
905 | 'metric_name': metric.get('MetricName'),
906 | 'dimensions': dimensions,
907 | 'dimension_count': dimension_count,
908 | 'estimated_monthly_cost': round(metric_cost, 4),
909 | 'estimated_annual_cost': round(metric_cost * 12, 2)
910 | })
911 |
912 | # Sort by dimension count descending
913 | metrics_with_info.sort(key=lambda x: x['dimension_count'], reverse=True)
914 |
915 | # Paginate
916 | total_items = len(metrics_with_info)
917 | total_pages = (total_items + self._page_size - 1) // self._page_size
918 | start_idx = (page - 1) * self._page_size
919 | end_idx = start_idx + self._page_size
920 | paginated_metrics = metrics_with_info[start_idx:end_idx]
921 |
922 | # Calculate totals
923 | total_monthly_cost = total_items * pricing['custom_metrics_per_metric']
924 | billable_metrics = max(0, total_items - free_tier['metrics_count'])
925 | billable_cost = billable_metrics * pricing['custom_metrics_per_metric']
926 |
927 | return {
928 | 'status': 'success',
929 | 'custom_metrics': paginated_metrics,
930 | 'pagination': {
931 | 'current_page': page,
932 | 'page_size': self._page_size,
933 | 'total_items': total_items,
934 | 'total_pages': total_pages,
935 | 'has_next': page < total_pages,
936 | 'has_previous': page > 1
937 | },
938 | 'summary': {
939 | 'total_custom_metrics': total_items,
940 | 'free_tier_limit': free_tier['metrics_count'],
941 | 'free_tier_remaining': max(0, free_tier['metrics_count'] - total_items),
942 | 'billable_metrics': billable_metrics,
943 | 'total_estimated_monthly_cost': round(billable_cost, 4),
944 | 'total_estimated_annual_cost': round(billable_cost * 12, 2)
945 | },
946 | 'pricing_info': pricing
947 | }
948 |
949 | except Exception as e:
950 | logger.error(f"Error getting metrics by dimension count: {str(e)}")
951 | return {
952 | 'status': 'error',
953 | 'message': str(e),
954 | 'custom_metrics': [],
955 | 'pagination': {'current_page': page, 'page_size': self._page_size, 'total_items': 0, 'total_pages': 0}
956 | }
957 |
958 | async def getDashboards(self, page: int = 1, dashboard_name_prefix: Optional[str] = None) -> Dict[str, Any]:
959 | """
960 | Get dashboards ordered by complexity score (descending), paginated.
961 |
962 | Complexity score is calculated based on:
963 | - Widget count (each widget adds to complexity)
964 | - Custom metrics count (higher weight as they cost more)
965 | - Total metrics count (operational complexity)
966 |
967 | Args:
968 | page: Page number (1-based)
969 | dashboard_name_prefix: Optional filter for dashboard names
970 |
971 | Returns:
972 | Dict with dashboards, pagination, summary, and pricing_info
973 | """
974 | try:
975 | # Get dashboards from DAO
976 | dashboards_data = await self.dao.list_dashboards(dashboard_name_prefix=dashboard_name_prefix)
977 | dashboards = dashboards_data['dashboards']
978 |
979 | # Get pricing data
980 | dashboard_pricing = self.pricing_dao.get_pricing_data('dashboards')
981 | metrics_pricing = self.pricing_dao.get_pricing_data('metrics')
982 | free_tier = self.pricing_dao.get_free_tier_limits()
983 |
984 | # Analyze each dashboard
985 | dashboards_with_info = []
986 | for dashboard in dashboards:
987 | dashboard_name = dashboard.get('DashboardName')
988 |
989 | # Get dashboard configuration to analyze complexity
990 | custom_metrics_count = 0
991 | total_metrics_count = 0
992 | widget_count = 0
993 |
994 | try:
995 | dashboard_config = await self.dao.get_dashboard(dashboard_name)
996 | dashboard_body = json.loads(dashboard_config.get('dashboard_body', '{}'))
997 |
998 | # Analyze widgets and metrics
999 | widgets = dashboard_body.get('widgets', [])
1000 | widget_count = len(widgets)
1001 |
1002 | for widget in widgets:
1003 | properties = widget.get('properties', {})
1004 | metrics = properties.get('metrics', [])
1005 |
1006 | # Count all metrics and custom metrics
1007 | for metric in metrics:
1008 | if isinstance(metric, list) and len(metric) > 0:
1009 | total_metrics_count += 1
1010 | namespace = metric[0] if isinstance(metric[0], str) else ''
1011 | if namespace and not namespace.startswith('AWS/'):
1012 | custom_metrics_count += 1
1013 |
1014 | except Exception as e:
1015 | logger.warning(f"Could not analyze dashboard {dashboard_name}: {str(e)}")
1016 |
1017 | # Calculate complexity score
1018 | # Formula: (custom_metrics * 3) + (total_metrics * 1) + (widgets * 2)
1019 | # Custom metrics weighted higher due to cost impact
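# e.g. 5 custom metrics, 20 total metrics and 8 widgets score (5*3) + (20*1) + (8*2) = 51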
1020 | complexity_score = (custom_metrics_count * 3) + (total_metrics_count * 1) + (widget_count * 2)
1021 |
1022 | # Calculate costs
1023 | dashboard_cost = dashboard_pricing['dashboard_per_month']
1024 | custom_metrics_cost = custom_metrics_count * metrics_pricing['custom_metrics_per_metric']
1025 | total_cost = dashboard_cost + custom_metrics_cost
1026 |
1027 | dashboards_with_info.append({
1028 | 'dashboard_name': dashboard_name,
1029 | 'dashboard_arn': dashboard.get('DashboardArn'),
1030 | 'last_modified': dashboard.get('LastModified'),
1031 | 'size': dashboard.get('Size'),
1032 | 'widget_count': widget_count,
1033 | 'total_metrics_count': total_metrics_count,
1034 | 'custom_metrics_count': custom_metrics_count,
1035 | 'complexity_score': complexity_score,
1036 | 'dashboard_cost': round(dashboard_cost, 4),
1037 | 'custom_metrics_cost': round(custom_metrics_cost, 4),
1038 | 'total_estimated_monthly_cost': round(total_cost, 4),
1039 | 'estimated_annual_cost': round(total_cost * 12, 2)
1040 | })
1041 |
1042 | # Sort by complexity score descending (combines cost and operational complexity)
1043 | dashboards_with_info.sort(key=lambda x: x['complexity_score'], reverse=True)
1044 |
1045 | # Paginate
1046 | total_items = len(dashboards_with_info)
1047 | total_pages = (total_items + self._page_size - 1) // self._page_size
1048 | start_idx = (page - 1) * self._page_size
1049 | end_idx = start_idx + self._page_size
1050 | paginated_dashboards = dashboards_with_info[start_idx:end_idx]
1051 |
1052 | # Calculate totals
1053 | billable_dashboards = max(0, total_items - free_tier['dashboards_count'])
1054 | total_dashboard_cost = billable_dashboards * dashboard_pricing['dashboard_per_month']
1055 | total_metrics_cost = sum(d['custom_metrics_cost'] for d in dashboards_with_info)
1056 | total_monthly_cost = total_dashboard_cost + total_metrics_cost
1057 |
1058 | return {
1059 | 'status': 'success',
1060 | 'dashboards': paginated_dashboards,
1061 | 'pagination': {
1062 | 'current_page': page,
1063 | 'page_size': self._page_size,
1064 | 'total_items': total_items,
1065 | 'total_pages': total_pages,
1066 | 'has_next': page < total_pages,
1067 | 'has_previous': page > 1
1068 | },
1069 | 'summary': {
1070 | 'total_dashboards': total_items,
1071 | 'free_tier_limit': free_tier['dashboards_count'],
1072 | 'free_tier_remaining': max(0, free_tier['dashboards_count'] - total_items),
1073 | 'billable_dashboards': billable_dashboards,
1074 | 'total_dashboard_cost': round(total_dashboard_cost, 4),
1075 | 'total_custom_metrics_cost': round(total_metrics_cost, 4),
1076 | 'total_estimated_monthly_cost': round(total_monthly_cost, 4),
1077 | 'total_estimated_annual_cost': round(total_monthly_cost * 12, 2)
1078 | },
1079 | 'pricing_info': {
1080 | 'dashboard': dashboard_pricing,
1081 | 'metrics': metrics_pricing
1082 | }
1083 | }
1084 |
1085 | except Exception as e:
1086 | logger.error(f"Error getting dashboards by custom metrics count: {str(e)}")
1087 | return {
1088 | 'status': 'error',
1089 | 'message': str(e),
1090 | 'dashboards': [],
1091 | 'pagination': {'current_page': page, 'page_size': self._page_size, 'total_items': 0, 'total_pages': 0}
1092 | }
1093 |
1094 | async def getAlarms(self, page: int = 1, alarm_name_prefix: Optional[str] = None,
1095 | state_value: Optional[str] = None) -> Dict[str, Any]:
1096 | """
1097 | Get alarms with cost information, paginated.
1098 |
1099 | Args:
1100 | page: Page number (1-based)
1101 | alarm_name_prefix: Optional filter for alarm names
1102 | state_value: Optional filter for alarm state (OK, ALARM, INSUFFICIENT_DATA)
1103 |
1104 | Returns:
1105 | Dict with alarms, pagination, summary, and pricing_info
1106 | """
1107 | try:
1108 | # Get alarms from DAO
1109 | alarms_data = await self.dao.describe_alarms(
1110 | alarm_name_prefix=alarm_name_prefix,
1111 | state_value=state_value
1112 | )
1113 | alarms = alarms_data['alarms']
1114 |
1115 | # Get pricing data
1116 | pricing = self.pricing_dao.get_pricing_data('alarms')
1117 | free_tier = self.pricing_dao.get_free_tier_limits()
1118 |
1119 | # Analyze each alarm
1120 | alarms_with_info = []
1121 | standard_count = 0
1122 | high_resolution_count = 0
1123 | composite_count = 0
1124 |
1125 | for alarm in alarms:
1126 | # Determine alarm type
1127 | if 'MetricName' in alarm:
1128 | period = alarm.get('Period', 300)
1129 | if period < 300:
1130 | alarm_type = 'high_resolution'
1131 | alarm_cost = pricing['high_resolution_alarms_per_alarm']
1132 | high_resolution_count += 1
1133 | else:
1134 | alarm_type = 'standard'
1135 | alarm_cost = pricing['standard_alarms_per_alarm']
1136 | standard_count += 1
1137 | else:
1138 | alarm_type = 'composite'
1139 | alarm_cost = pricing['composite_alarms_per_alarm']
1140 | composite_count += 1
1141 |
1142 | # Check if alarm has actions
1143 | has_actions = bool(
1144 | alarm.get('AlarmActions') or
1145 | alarm.get('OKActions') or
1146 | alarm.get('InsufficientDataActions')
1147 | )
1148 |
1149 | alarms_with_info.append({
1150 | 'alarm_name': alarm.get('AlarmName'),
1151 | 'alarm_arn': alarm.get('AlarmArn'),
1152 | 'alarm_type': alarm_type,
1153 | 'state_value': alarm.get('StateValue'),
1154 | 'state_reason': alarm.get('StateReason'),
1155 | 'metric_name': alarm.get('MetricName'),
1156 | 'namespace': alarm.get('Namespace'),
1157 | 'period': alarm.get('Period'),
1158 | 'has_actions': has_actions,
1159 | 'actions_enabled': alarm.get('ActionsEnabled', False),
1160 | 'estimated_monthly_cost': round(alarm_cost, 4),
1161 | 'estimated_annual_cost': round(alarm_cost * 12, 2)
1162 | })
1163 |
1164 | # Paginate
1165 | total_items = len(alarms_with_info)
1166 | total_pages = (total_items + self._page_size - 1) // self._page_size
1167 | start_idx = (page - 1) * self._page_size
1168 | end_idx = start_idx + self._page_size
1169 | paginated_alarms = alarms_with_info[start_idx:end_idx]
1170 |
1171 | # Calculate totals
1172 | billable_standard = max(0, standard_count - free_tier['alarms_count'])
1173 | total_monthly_cost = (
1174 | billable_standard * pricing['standard_alarms_per_alarm'] +
1175 | high_resolution_count * pricing['high_resolution_alarms_per_alarm'] +
1176 | composite_count * pricing['composite_alarms_per_alarm']
1177 | )
1178 |
1179 | return {
1180 | 'status': 'success',
1181 | 'alarms': paginated_alarms,
1182 | 'pagination': {
1183 | 'current_page': page,
1184 | 'page_size': self._page_size,
1185 | 'total_items': total_items,
1186 | 'total_pages': total_pages,
1187 | 'has_next': page < total_pages,
1188 | 'has_previous': page > 1
1189 | },
1190 | 'summary': {
1191 | 'total_alarms': total_items,
1192 | 'standard_alarms': standard_count,
1193 | 'high_resolution_alarms': high_resolution_count,
1194 | 'composite_alarms': composite_count,
1195 | 'free_tier_limit': free_tier['alarms_count'],
1196 | 'free_tier_remaining': max(0, free_tier['alarms_count'] - standard_count),
1197 | 'billable_standard_alarms': billable_standard,
1198 | 'total_estimated_monthly_cost': round(total_monthly_cost, 4),
1199 | 'total_estimated_annual_cost': round(total_monthly_cost * 12, 2)
1200 | },
1201 | 'pricing_info': pricing
1202 | }
1203 |
1204 | except Exception as e:
1205 | logger.error(f"Error getting alarms: {str(e)}")
1206 | return {
1207 | 'status': 'error',
1208 | 'message': str(e),
1209 | 'alarms': [],
1210 | 'pagination': {'current_page': page, 'page_size': self._page_size, 'total_items': 0, 'total_pages': 0}
1211 | }
1212 |
1213 |
1214 | class CWMetricsTips:
1215 | """CloudWatch custom metrics optimization and analysis."""
1216 |
1217 | def __init__(self, dao: CloudWatchDAO, pricing_dao: AWSPricingDAO, cost_preferences: CostPreferences):
1218 | self.dao = dao
1219 | self.pricing_dao = pricing_dao
1220 | self.cost_preferences = cost_preferences
1221 | self._page_size = 10 # Items per page
1222 |
1223 | async def listInstancesWithDetailedMonitoring(self, page: int = 1) -> Dict[str, Any]:
1224 | """
1225 | Get paginated list of EC2 instances with detailed monitoring enabled.
1226 |
1227 | Detailed monitoring costs $2.10/month per instance vs basic monitoring (free).
1228 |
1229 | Args:
1230 | page: Page number (1-based)
1231 |
1232 | Returns:
1233 | Dict with instances, pagination, summary, and cost_analysis
1234 | """
1235 | try:
1236 | # Initialize EC2 client
1237 | ec2_client = boto3.client('ec2', region_name=self.dao.region)
1238 |
1239 | # Get all instances
1240 | instances_with_detailed = []
1241 | paginator = ec2_client.get_paginator('describe_instances')
1242 |
1243 | for page_response in paginator.paginate():
1244 | for reservation in page_response.get('Reservations', []):
1245 | for instance in reservation.get('Instances', []):
1246 | # Check if detailed monitoring is enabled
1247 | monitoring_state = instance.get('Monitoring', {}).get('State', 'disabled')
1248 |
1249 | if monitoring_state == 'enabled':
1250 | instance_id = instance.get('InstanceId')
1251 | instance_type = instance.get('InstanceType')
1252 | state = instance.get('State', {}).get('Name')
1253 |
1254 | # Get instance name from tags
1255 | instance_name = None
1256 | for tag in instance.get('Tags', []):
1257 | if tag.get('Key') == 'Name':
1258 | instance_name = tag.get('Value')
1259 | break
1260 |
1261 | instances_with_detailed.append({
1262 | 'instance_id': instance_id,
1263 | 'instance_name': instance_name,
1264 | 'instance_type': instance_type,
1265 | 'state': state,
1266 | 'monitoring_state': monitoring_state,
1267 | 'launch_time': instance.get('LaunchTime').isoformat() if instance.get('LaunchTime') else None
1268 | })
1269 |
1270 | # Get pricing
1271 | pricing = self.pricing_dao.get_pricing_data('metrics')
1272 | detailed_monitoring_cost = pricing['detailed_monitoring_per_instance']
1273 |
1274 | # Add cost information to each instance
1275 | for instance in instances_with_detailed:
1276 | instance['monthly_cost'] = round(detailed_monitoring_cost, 2)
1277 | instance['annual_cost'] = round(detailed_monitoring_cost * 12, 2)
1278 |
1279 | # Paginate
1280 | total_items = len(instances_with_detailed)
1281 | total_pages = (total_items + self._page_size - 1) // self._page_size
1282 | start_idx = (page - 1) * self._page_size
1283 | end_idx = start_idx + self._page_size
1284 | paginated_instances = instances_with_detailed[start_idx:end_idx]
1285 |
1286 | # Calculate totals
1287 | total_monthly_cost = total_items * detailed_monitoring_cost
1288 |
1289 | return {
1290 | 'status': 'success',
1291 | 'instances': paginated_instances,
1292 | 'pagination': {
1293 | 'current_page': page,
1294 | 'page_size': self._page_size,
1295 | 'total_items': total_items,
1296 | 'total_pages': total_pages,
1297 | 'has_next': page < total_pages,
1298 | 'has_previous': page > 1
1299 | },
1300 | 'summary': {
1301 | 'total_instances_with_detailed_monitoring': total_items,
1302 | 'cost_per_instance_monthly': round(detailed_monitoring_cost, 2),
1303 | 'total_monthly_cost': round(total_monthly_cost, 2),
1304 | 'total_annual_cost': round(total_monthly_cost * 12, 2)
1305 | },
1306 | 'optimization_tip': {
1307 | 'message': 'Consider disabling detailed monitoring for instances that do not require 1-minute metrics',
1308 | 'potential_savings_monthly': round(total_monthly_cost, 2),
1309 | 'potential_savings_annual': round(total_monthly_cost * 12, 2)
1310 | }
1311 | }
1312 |
1313 | except Exception as e:
1314 | logger.error(f"Error listing instances with detailed monitoring: {str(e)}")
1315 | return {
1316 | 'status': 'error',
1317 | 'message': str(e),
1318 | 'instances': [],
1319 | 'pagination': {'current_page': page, 'page_size': self._page_size, 'total_items': 0, 'total_pages': 0}
1320 | }
1321 |
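# Worked example using the fallback price above (illustrative): 25 instances with detailed
# monitoring cost roughly 25 * $2.10 = $52.50 per month ($630.00 per year), which is also
# the potential saving reported when 1-minute metrics are not required.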
1322 | async def listCustomMetrics(self, page: int = 1, namespace_filter: Optional[str] = None,
1323 | can_spend_for_exact_usage_estimate: bool = False,
1324 | lookback_days: int = 30) -> Dict[str, Any]:
1325 | """
1326 | Get paginated list of custom metrics sorted by dimension count (descending).
1327 |
1328 | Calls list_metrics twice:
1329 | 1. Without RecentlyActive filter to get all metrics
1330 | 2. With RecentlyActive='PT3H' to flag recently active metrics
1331 |
1332 | If can_spend_for_exact_usage_estimate=True, calls GetMetricData (batched) for the
1333 | current page only to identify actual usage.
1334 |
1335 | Args:
1336 | page: Page number (1-based)
1337 | namespace_filter: Optional filter for namespace
1338 | can_spend_for_exact_usage_estimate: If True, uses GetMetricData API (paid) to get
1339 | exact usage for current page only. If False (default),
1340 | only shows dimension count and recently active flag.
1341 | lookback_days: Number of days to analyze for usage (only used if can_spend_for_exact_usage_estimate=True)
1342 |
1343 | Returns:
1344 | Dict with custom_metrics, pagination, summary, and pricing_info
1345 | """
1346 | try:
1347 | # Call 1: Get all metrics (FREE)
1348 | all_metrics_params = {}
1349 | if namespace_filter:
1350 | all_metrics_params['Namespace'] = namespace_filter
1351 |
1352 | all_metrics = []
1353 | paginator = self.dao.cloudwatch_client.get_paginator('list_metrics')
1354 |
1355 | for page_response in paginator.paginate(**all_metrics_params):
1356 | all_metrics.extend(page_response.get('Metrics', []))
1357 |
1358 | # Filter to custom metrics only (exclude AWS/* namespaces)
1359 | custom_metrics = [m for m in all_metrics if not m.get('Namespace', '').startswith('AWS/')]
1360 |
1361 | # Call 2: Get recently active metrics (PT3H = Past 3 Hours) (FREE)
1362 | recently_active_params = {'RecentlyActive': 'PT3H'}
1363 | if namespace_filter:
1364 | recently_active_params['Namespace'] = namespace_filter
1365 |
1366 | recently_active_metrics = []
1367 | for page_response in paginator.paginate(**recently_active_params):
1368 | recently_active_metrics.extend(page_response.get('Metrics', []))
1369 |
1370 | # Create set of recently active metric identifiers for fast lookup
1371 | recently_active_set = set()
1372 | for metric in recently_active_metrics:
1373 | metric_id = self._get_metric_identifier(metric)
1374 | recently_active_set.add(metric_id)
1375 |
1376 | # Get pricing data
1377 | pricing = self.pricing_dao.get_pricing_data('metrics')
1378 | metric_cost = pricing['custom_metrics_per_metric']
1379 |
1380 | # Process each custom metric (FREE)
1381 | metrics_with_info = []
1382 | for metric in custom_metrics:
1383 | namespace = metric.get('Namespace')
1384 | metric_name = metric.get('MetricName')
1385 | dimensions = metric.get('Dimensions', [])
1386 | dimension_count = len(dimensions)
1387 |
1388 | # Check if recently active
1389 | metric_id = self._get_metric_identifier(metric)
1390 | is_recently_active = metric_id in recently_active_set
1391 |
1392 | metric_info = {
1393 | 'namespace': namespace,
1394 | 'metric_name': metric_name,
1395 | 'dimensions': dimensions,
1396 | 'dimension_count': dimension_count,
1397 | 'recently_active': is_recently_active,
1398 | 'estimated_monthly_cost': round(metric_cost, 4),
1399 | 'estimated_annual_cost': round(metric_cost * 12, 2),
1400 | 'usage_estimation_method': 'dimension_count_free'
1401 | }
1402 |
1403 | metrics_with_info.append(metric_info)
1404 |
1405 | # Sort by dimension count (FREE)
1406 | metrics_with_info.sort(key=lambda x: x['dimension_count'], reverse=True)
1407 |
1408 | # Paginate (FREE)
1409 | total_items = len(metrics_with_info)
1410 | total_pages = (total_items + self._page_size - 1) // self._page_size
1411 | start_idx = (page - 1) * self._page_size
1412 | end_idx = start_idx + self._page_size
1413 | paginated_metrics = metrics_with_info[start_idx:end_idx]
1414 |
1415 | # PAID OPERATION: Analyze current page only (if requested)
1416 | if can_spend_for_exact_usage_estimate and paginated_metrics:
1417 | # Use DAO method for batched analysis with caching
1418 | await self.dao.get_metrics_usage_batch(
1419 | paginated_metrics,
1420 | lookback_days=lookback_days
1421 | )
1422 |
1423 | # Note: DAO updates metrics in place with datapoint_count and usage_estimation_method
1424 |
1425 | # Calculate totals
1426 | free_tier = self.pricing_dao.get_free_tier_limits()
1427 | billable_metrics = max(0, total_items - free_tier['metrics_count'])
1428 | total_monthly_cost = billable_metrics * metric_cost
1429 | recently_active_count = sum(1 for m in metrics_with_info if m['recently_active'])
1430 |
1431 | return {
1432 | 'status': 'success',
1433 | 'custom_metrics': paginated_metrics,
1434 | 'pagination': {
1435 | 'current_page': page,
1436 | 'page_size': self._page_size,
1437 | 'total_items': total_items,
1438 | 'total_pages': total_pages,
1439 | 'has_next': page < total_pages,
1440 | 'has_previous': page > 1
1441 | },
1442 | 'summary': {
1443 | 'total_custom_metrics': total_items,
1444 | 'recently_active_metrics': recently_active_count,
1445 | 'inactive_metrics': total_items - recently_active_count,
1446 | 'free_tier_limit': free_tier['metrics_count'],
1447 | 'free_tier_remaining': max(0, free_tier['metrics_count'] - total_items),
1448 | 'billable_metrics': billable_metrics,
1449 | 'total_estimated_monthly_cost': round(total_monthly_cost, 4),
1450 | 'total_estimated_annual_cost': round(total_monthly_cost * 12, 2),
1451 | 'usage_estimation_method': 'exact_paid_page_only' if can_spend_for_exact_usage_estimate else 'dimension_count_free',
1452 | 'metrics_analyzed_for_usage': len(paginated_metrics) if can_spend_for_exact_usage_estimate else 0
1453 | },
1454 | 'pricing_info': pricing,
1455 | 'optimization_tip': {
1456 | 'message': f'Found {total_items - recently_active_count} metrics not active in past 3 hours. Consider removing unused metrics.',
1457 | 'potential_savings_monthly': round((total_items - recently_active_count) * metric_cost, 2),
1458 | 'potential_savings_annual': round((total_items - recently_active_count) * metric_cost * 12, 2)
1459 | }
1460 | }
1461 |
1462 | except Exception as e:
1463 | logger.error(f"Error listing custom metrics: {str(e)}")
1464 | return {
1465 | 'status': 'error',
1466 | 'message': str(e),
1467 | 'custom_metrics': [],
1468 | 'pagination': {'current_page': page, 'page_size': self._page_size, 'total_items': 0, 'total_pages': 0}
1469 | }
1470 |
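    # Usage sketch (illustrative; assumes this class is instantiated elsewhere as `metrics_tips`):
    #
    #   # Free pass: dimension counts plus the RecentlyActive flag only
    #   result = await metrics_tips.listCustomMetrics(page=1, namespace_filter='MyApp')
    #
    #   # Paid pass: GetMetricData is issued for the current page only
    #   result = await metrics_tips.listCustomMetrics(
    #       page=1, can_spend_for_exact_usage_estimate=True, lookback_days=14)
    #   print(result['summary']['total_estimated_monthly_cost'])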
1471 | def _get_metric_identifier(self, metric: Dict[str, Any]) -> str:
1472 | """
1473 | Create a unique identifier for a metric based on namespace, name, and dimensions.
1474 |
1475 | Args:
1476 | metric: Metric dictionary from list_metrics
1477 |
1478 | Returns:
1479 | String identifier for the metric
1480 | """
1481 | namespace = metric.get('Namespace', '')
1482 | metric_name = metric.get('MetricName', '')
1483 | dimensions = metric.get('Dimensions', [])
1484 |
1485 | # Sort dimensions for consistent comparison
1486 | sorted_dims = sorted(dimensions, key=lambda d: d.get('Name', ''))
1487 | dim_str = '|'.join(f"{d.get('Name')}={d.get('Value')}" for d in sorted_dims)
1488 |
1489 | return f"{namespace}::{metric_name}::{dim_str}"
1490 |
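    # Example (illustrative): for {'Namespace': 'MyApp', 'MetricName': 'Latency',
    # 'Dimensions': [{'Name': 'Stage', 'Value': 'prod'}, {'Name': 'Api', 'Value': 'orders'}]}
    # the identifier is "MyApp::Latency::Api=orders|Stage=prod" (dimensions sorted by name).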
1491 | async def analyze_metrics_usage(self, namespace_filter: Optional[str] = None) -> Dict[str, Any]:
1492 | """Analyze custom metrics usage and optimization opportunities."""
1493 | try:
1494 | metrics_data = await self.dao.list_metrics(namespace=namespace_filter)
1495 |
1496 | # Categorize metrics
1497 | aws_metrics = [m for m in metrics_data['metrics']
1498 | if m.get('Namespace', '').startswith('AWS/')]
1499 | custom_metrics = [m for m in metrics_data['metrics']
1500 | if not m.get('Namespace', '').startswith('AWS/')]
1501 |
1502 | # Analyze by namespace
1503 | custom_by_namespace = {}
1504 | for metric in custom_metrics:
1505 | namespace = metric.get('Namespace', 'Unknown')
1506 | if namespace not in custom_by_namespace:
1507 | custom_by_namespace[namespace] = []
1508 | custom_by_namespace[namespace].append(metric)
1509 |
1510 | # Calculate costs
1511 | metrics_cost = self.pricing_dao.calculate_cost('metrics', {
1512 | 'custom_metrics_count': len(custom_metrics),
1513 | 'api_requests_count': 100000,
1514 | 'detailed_monitoring_instances': 0
1515 | })
1516 |
1517 | return {
1518 | 'status': 'success',
1519 | 'metrics_summary': {
1520 | 'total_metrics': len(metrics_data['metrics']),
1521 | 'aws_metrics': len(aws_metrics),
1522 | 'custom_metrics': len(custom_metrics),
1523 | 'custom_by_namespace': {ns: len(metrics) for ns, metrics in custom_by_namespace.items()}
1524 | },
1525 | 'cost_analysis': metrics_cost,
1526 | 'detailed_metrics': {
1527 | 'custom_metrics': custom_metrics[:50],
1528 | 'aws_metrics_sample': aws_metrics[:20]
1529 | }
1530 | }
1531 |
1532 | except Exception as e:
1533 | logger.error(f"Error analyzing metrics usage: {str(e)}")
1534 | return {'status': 'error', 'message': str(e)}
1535 |
1536 |
1537 | class CWLogsTips:
1538 | """CloudWatch Logs optimization and analysis."""
1539 |
1540 | def __init__(self, dao: CloudWatchDAO, pricing_dao: AWSPricingDAO, cost_preferences: CostPreferences):
1541 | self.dao = dao
1542 | self.pricing_dao = pricing_dao
1543 | self.cost_preferences = cost_preferences
1544 | self._page_size = 10 # Items per page
1545 |
1546 | # Import and initialize VendedLogsDAO
1547 | from services.cloudwatch_service_vended_log import VendedLogsDAO
1548 | self._vended_logs_dao = VendedLogsDAO(region=dao.region)
1549 |
1550 | async def analyze_logs_usage(self, log_group_names: Optional[List[str]] = None) -> Dict[str, Any]:
1551 | """Analyze CloudWatch Logs usage and optimization opportunities."""
1552 | try:
1553 | log_groups_data = await self.dao.describe_log_groups(log_group_names=log_group_names)
1554 | log_groups = log_groups_data['log_groups']
1555 |
1556 | # Analyze retention
1557 | without_retention = []
1558 | long_retention = []
1559 |
1560 | for lg in log_groups:
1561 | retention_days = lg.get('retentionInDays')
1562 | if not retention_days:
1563 | without_retention.append(lg.get('logGroupName'))
1564 | elif retention_days > 365:
1565 | long_retention.append({
1566 | 'name': lg.get('logGroupName'),
1567 | 'retention_days': retention_days
1568 | })
1569 |
1570 | # Calculate costs
1571 | total_stored_gb = sum(lg.get('storedBytes', 0) for lg in log_groups) / (1024**3)
1572 | logs_cost = self.pricing_dao.calculate_cost('logs', {
1573 | 'ingestion_gb': total_stored_gb * 0.1,
1574 | 'storage_gb': total_stored_gb,
1575 | 'insights_gb_scanned': 0
1576 | })
1577 |
1578 | return {
1579 | 'status': 'success',
1580 | 'log_groups_summary': {
1581 | 'total_log_groups': len(log_groups),
1582 | 'total_stored_gb': total_stored_gb,
1583 | 'without_retention_policy': len(without_retention),
1584 | 'long_retention_groups': len(long_retention)
1585 | },
1586 | 'cost_analysis': logs_cost,
1587 | 'log_groups_details': log_groups[:50]
1588 | }
1589 |
1590 | except Exception as e:
1591 | logger.error(f"Error analyzing logs usage: {str(e)}")
1592 | return {'status': 'error', 'message': str(e)}
1593 |
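    # Worked example (illustrative): a log group reporting storedBytes=53687091200 holds
    # 53687091200 / 1024**3 = 50.0 GB; with the 10% heuristic above, ingestion_gb is estimated
    # at 5.0 GB/month. Both are rough inputs handed to pricing_dao.calculate_cost('logs', ...).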
1594 | async def listLogsWithoutRetention(self, page: int = 1,
1595 | log_group_name_prefix: Optional[str] = None) -> Dict[str, Any]:
1596 | """
1597 | Get paginated list of log groups without retention policy, sorted by storage size descending.
1598 |
1599 | Log groups without retention policies store logs indefinitely, leading to unbounded costs.
1600 | This method identifies these log groups to help set appropriate retention policies.
1601 |
1602 | Args:
1603 | page: Page number (1-based)
1604 | log_group_name_prefix: Optional filter for log group names
1605 |
1606 | Returns:
1607 | Dict with log_groups, pagination, summary, and optimization recommendations
1608 | """
1609 | try:
1610 | # Get all log groups
1611 | log_groups_data = await self.dao.describe_log_groups(
1612 | log_group_name_prefix=log_group_name_prefix
1613 | )
1614 | log_groups = log_groups_data['log_groups']
1615 |
1616 |             # Filter to log groups without retention
1617 |             logs_without_retention = []
1618 |             pricing = self.pricing_dao.get_pricing_data('logs')  # fetch pricing once, outside the loop
1619 |             for lg in log_groups:
1620 |                 retention_days = lg.get('retentionInDays')
1621 |                 if not retention_days:  # None or 0 means never expire
1622 |                     stored_bytes = lg.get('storedBytes', 0)
1623 |                     stored_gb = stored_bytes / (1024**3)
1624 | 
1625 |                     # Monthly storage cost at the standard rate
1626 | storage_cost = stored_gb * pricing['storage_per_gb_month']
1627 |
1628 | logs_without_retention.append({
1629 | 'log_group_name': lg.get('logGroupName'),
1630 | 'stored_gb': round(stored_gb, 4),
1631 | 'stored_bytes': stored_bytes,
1632 | 'creation_time': lg.get('creationTime'),
1633 | 'retention_days': 'Never Expire',
1634 | 'monthly_storage_cost': round(storage_cost, 4),
1635 | 'annual_storage_cost': round(storage_cost * 12, 2),
1636 | 'log_group_class': lg.get('logGroupClass', 'STANDARD')
1637 | })
1638 |
1639 | # Sort by storage size descending
1640 | logs_without_retention.sort(key=lambda x: x['stored_gb'], reverse=True)
1641 |
1642 | # Paginate
1643 | total_items = len(logs_without_retention)
1644 | total_pages = (total_items + self._page_size - 1) // self._page_size
1645 | start_idx = (page - 1) * self._page_size
1646 | end_idx = start_idx + self._page_size
1647 | paginated_logs = logs_without_retention[start_idx:end_idx]
1648 |
1649 | # Calculate totals
1650 | total_storage_gb = sum(lg['stored_gb'] for lg in logs_without_retention)
1651 | total_monthly_cost = sum(lg['monthly_storage_cost'] for lg in logs_without_retention)
1652 |
1653 | return {
1654 | 'status': 'success',
1655 | 'log_groups': paginated_logs,
1656 | 'pagination': {
1657 | 'current_page': page,
1658 | 'page_size': self._page_size,
1659 | 'total_items': total_items,
1660 | 'total_pages': total_pages,
1661 | 'has_next': page < total_pages,
1662 | 'has_previous': page > 1
1663 | },
1664 | 'summary': {
1665 | 'total_log_groups_without_retention': total_items,
1666 | 'total_storage_gb': round(total_storage_gb, 4),
1667 | 'total_monthly_cost': round(total_monthly_cost, 4),
1668 | 'total_annual_cost': round(total_monthly_cost * 12, 2)
1669 | },
1670 | 'optimization_recommendations': {
1671 | 'message': 'Set retention policies to automatically delete old logs and control costs',
1672 | 'recommended_retention_days': [7, 14, 30, 60, 90, 120, 180, 365, 400, 545, 731, 1827, 3653],
1673 | 'common_retention_policies': {
1674 | 'development_logs': '7-14 days',
1675 | 'application_logs': '30-90 days',
1676 | 'audit_logs': '365-3653 days (1-10 years)',
1677 | 'compliance_logs': '2557-3653 days (7-10 years)'
1678 | }
1679 | }
1680 | }
1681 |
1682 | except Exception as e:
1683 | logger.error(f"Error listing logs without retention: {str(e)}")
1684 | return {
1685 | 'status': 'error',
1686 | 'message': str(e),
1687 | 'log_groups': [],
1688 | 'pagination': {'current_page': page, 'page_size': self._page_size, 'total_items': 0, 'total_pages': 0}
1689 | }
1690 |
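    # Remediation sketch (assumes a boto3 CloudWatch Logs client; the log group name is hypothetical):
    #
    #   import boto3
    #   logs = boto3.client('logs')
    #   logs.put_retention_policy(logGroupName='/aws/lambda/my-function', retentionInDays=30)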
1691 | async def listVendedLogTargets(self, page: int = 1,
1692 | log_group_name_prefix: Optional[str] = None) -> Dict[str, Any]:
1693 | """
1694 | Get paginated list of log groups that can be vended directly to S3 to reduce costs.
1695 |
1696 | Vended logs bypass CloudWatch Logs ingestion and storage, significantly reducing costs
1697 | for high-volume service logs like VPC Flow Logs, ELB Access Logs, etc.
1698 |
1699 | This is a complex retrieval task delegated to VendedLogsDAO.
1700 |
1701 | Args:
1702 | page: Page number (1-based)
1703 | log_group_name_prefix: Optional filter for log group names
1704 |
1705 | Returns:
1706 | Dict with vended_log_opportunities, pagination, summary, and implementation guidance
1707 | """
1708 | try:
1709 | # Get all log groups
1710 | log_groups_data = await self.dao.describe_log_groups(
1711 | log_group_name_prefix=log_group_name_prefix
1712 | )
1713 | log_groups = log_groups_data['log_groups']
1714 |
1715 | # Get pricing data
1716 | pricing = self.pricing_dao.get_pricing_data('logs')
1717 |
1718 | # Analyze vended log opportunities using VendedLogsDAO
1719 | opportunities = await self._vended_logs_dao.analyze_vended_log_opportunities(
1720 | log_groups, pricing
1721 | )
1722 |
1723 | # Sort by monthly savings descending (primary sort key for best ROI)
1724 | # This ensures highest-value opportunities appear first
1725 | opportunities.sort(key=lambda x: x['savings']['monthly_savings'], reverse=True)
1726 |
1727 | # Paginate
1728 | total_items = len(opportunities)
1729 | total_pages = (total_items + self._page_size - 1) // self._page_size
1730 | start_idx = (page - 1) * self._page_size
1731 | end_idx = start_idx + self._page_size
1732 | paginated_opportunities = opportunities[start_idx:end_idx]
1733 |
1734 | # Calculate totals
1735 | total_monthly_savings = sum(opp['savings']['monthly_savings'] for opp in opportunities)
1736 | total_annual_savings = sum(opp['savings']['annual_savings'] for opp in opportunities)
1737 | total_current_cost = sum(opp['current_costs']['total_monthly'] for opp in opportunities)
1738 | total_vended_cost = sum(opp['vended_costs']['total_monthly'] for opp in opportunities)
1739 |
1740 | # Group by service type
1741 | by_service = {}
1742 | for opp in opportunities:
1743 | service = opp['service']
1744 | if service not in by_service:
1745 | by_service[service] = {
1746 | 'count': 0,
1747 | 'monthly_savings': 0,
1748 | 'annual_savings': 0
1749 | }
1750 | by_service[service]['count'] += 1
1751 | by_service[service]['monthly_savings'] += opp['savings']['monthly_savings']
1752 | by_service[service]['annual_savings'] += opp['savings']['annual_savings']
1753 |
1754 | return {
1755 | 'status': 'success',
1756 | 'vended_log_opportunities': paginated_opportunities,
1757 | 'pagination': {
1758 | 'current_page': page,
1759 | 'page_size': self._page_size,
1760 | 'total_items': total_items,
1761 | 'total_pages': total_pages,
1762 | 'has_next': page < total_pages,
1763 | 'has_previous': page > 1
1764 | },
1765 | 'summary': {
1766 | 'total_vended_log_opportunities': total_items,
1767 | 'total_current_monthly_cost': round(total_current_cost, 4),
1768 | 'total_vended_monthly_cost': round(total_vended_cost, 4),
1769 | 'total_monthly_savings': round(total_monthly_savings, 4),
1770 | 'total_annual_savings': round(total_annual_savings, 2),
1771 | 'average_cost_reduction_percentage': round(
1772 | (total_monthly_savings / total_current_cost * 100) if total_current_cost > 0 else 0, 1
1773 | ),
1774 | 'opportunities_by_service': by_service
1775 | },
1776 | 'implementation_guidance': {
1777 | 'overview': 'Vended logs bypass CloudWatch Logs and go directly to S3, reducing costs by 60-95%',
1778 | 'benefits': [
1779 | 'Significant cost reduction (60-95% savings)',
1780 | 'No CloudWatch Logs ingestion charges',
1781 | 'Lower storage costs with S3',
1782 | 'Can use S3 lifecycle policies for further cost optimization',
1783 | 'Maintain compliance and audit requirements'
1784 | ],
1785 | 'considerations': [
1786 | 'Logs will no longer be searchable in CloudWatch Logs Insights',
1787 | 'Need to use S3 Select, Athena, or other tools for log analysis',
1788 | 'May require changes to existing log processing workflows',
1789 | 'Some services require recreation of logging configuration'
1790 | ],
1791 | 'next_steps': [
1792 | '1. Review vended log opportunities sorted by savings',
1793 | '2. Check implementation complexity for each service',
1794 | '3. Follow documentation links for specific setup instructions',
1795 | '4. Test with non-critical log groups first',
1796 | '5. Update monitoring and alerting to use S3-based logs'
1797 | ]
1798 | }
1799 | }
1800 |
1801 | except Exception as e:
1802 | logger.error(f"Error listing vended log targets: {str(e)}")
1803 | return {
1804 | 'status': 'error',
1805 | 'message': str(e),
1806 | 'vended_log_opportunities': [],
1807 | 'pagination': {'current_page': page, 'page_size': self._page_size, 'total_items': 0, 'total_pages': 0}
1808 | }
1809 |
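    # Example sketch (assumes a boto3 EC2 client; the VPC ID and bucket are hypothetical): sending
    # VPC Flow Logs straight to S3 instead of CloudWatch Logs is one common vended-log setup.
    #
    #   import boto3
    #   ec2 = boto3.client('ec2')
    #   ec2.create_flow_logs(
    #       ResourceIds=['vpc-0123456789abcdef0'], ResourceType='VPC', TrafficType='ALL',
    #       LogDestinationType='s3', LogDestination='arn:aws:s3:::my-flow-logs-bucket')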
1810 | async def listInfrequentAccessTargets(self, page: int = 1,
1811 | log_group_name_prefix: Optional[str] = None) -> Dict[str, Any]:
1812 | """
1813 | Get paginated list of log groups sorted by retention * storage bytes descending,
1814 | excluding those already using INFREQUENT_ACCESS log group class.
1815 |
1816 | INFREQUENT_ACCESS class reduces storage costs by 50% for logs with retention >= 30 days.
1817 | This method identifies the best candidates for cost savings.
1818 |
1819 | Args:
1820 | page: Page number (1-based)
1821 | log_group_name_prefix: Optional filter for log group names
1822 |
1823 | Returns:
1824 | Dict with log_groups, pagination, summary, and optimization recommendations
1825 | """
1826 | try:
1827 | # Get all log groups
1828 | log_groups_data = await self.dao.describe_log_groups(
1829 | log_group_name_prefix=log_group_name_prefix
1830 | )
1831 | log_groups = log_groups_data['log_groups']
1832 |
1833 | # Get pricing
1834 | pricing = self.pricing_dao.get_pricing_data('logs')
1835 | standard_storage_cost = pricing['storage_per_gb_month']
1836 | infrequent_storage_cost = standard_storage_cost * 0.5 # 50% discount
1837 |
1838 | # Filter and analyze candidates
1839 | infrequent_access_candidates = []
1840 | for lg in log_groups:
1841 | log_group_class = lg.get('logGroupClass', 'STANDARD')
1842 | retention_days = lg.get('retentionInDays', 0)
1843 | stored_bytes = lg.get('storedBytes', 0)
1844 |
1845 | # Skip if already using INFREQUENT_ACCESS
1846 | if log_group_class == 'INFREQUENT_ACCESS':
1847 | continue
1848 |
1849 |                 # Only consider log groups with retention >= 30 days
1850 |                 # (heuristic: IA class pays off mainly for logs retained 30+ days)
1851 | if retention_days and retention_days >= 30:
1852 | stored_gb = stored_bytes / (1024**3)
1853 |
1854 | # Calculate priority score: retention * storage
1855 | # Higher score = more data stored for longer = more savings potential
1856 | priority_score = retention_days * stored_bytes
1857 |
1858 | # Calculate current and potential costs
1859 | current_monthly_cost = stored_gb * standard_storage_cost
1860 | infrequent_monthly_cost = stored_gb * infrequent_storage_cost
1861 | monthly_savings = current_monthly_cost - infrequent_monthly_cost
1862 |
1863 | infrequent_access_candidates.append({
1864 | 'log_group_name': lg.get('logGroupName'),
1865 | 'stored_gb': round(stored_gb, 4),
1866 | 'stored_bytes': stored_bytes,
1867 | 'retention_days': retention_days,
1868 | 'current_log_group_class': log_group_class,
1869 | 'priority_score': priority_score,
1870 | 'current_monthly_cost': round(current_monthly_cost, 4),
1871 | 'infrequent_access_monthly_cost': round(infrequent_monthly_cost, 4),
1872 | 'monthly_savings': round(monthly_savings, 4),
1873 | 'annual_savings': round(monthly_savings * 12, 2),
1874 | 'savings_percentage': 50.0
1875 | })
1876 |
1877 | # Sort by priority score descending (retention * storage bytes)
1878 | infrequent_access_candidates.sort(key=lambda x: x['priority_score'], reverse=True)
1879 |
1880 | # Paginate
1881 | total_items = len(infrequent_access_candidates)
1882 | total_pages = (total_items + self._page_size - 1) // self._page_size
1883 | start_idx = (page - 1) * self._page_size
1884 | end_idx = start_idx + self._page_size
1885 | paginated_candidates = infrequent_access_candidates[start_idx:end_idx]
1886 |
1887 | # Calculate totals
1888 | total_monthly_savings = sum(c['monthly_savings'] for c in infrequent_access_candidates)
1889 | total_annual_savings = sum(c['annual_savings'] for c in infrequent_access_candidates)
1890 | total_storage_gb = sum(c['stored_gb'] for c in infrequent_access_candidates)
1891 |
1892 | return {
1893 | 'status': 'success',
1894 | 'log_groups': paginated_candidates,
1895 | 'pagination': {
1896 | 'current_page': page,
1897 | 'page_size': self._page_size,
1898 | 'total_items': total_items,
1899 | 'total_pages': total_pages,
1900 | 'has_next': page < total_pages,
1901 | 'has_previous': page > 1
1902 | },
1903 | 'summary': {
1904 | 'total_infrequent_access_candidates': total_items,
1905 | 'total_storage_gb': round(total_storage_gb, 4),
1906 | 'total_monthly_savings_potential': round(total_monthly_savings, 4),
1907 | 'total_annual_savings_potential': round(total_annual_savings, 2),
1908 | 'average_savings_percentage': 50.0
1909 | },
1910 | 'optimization_recommendations': {
1911 | 'message': 'Switch to INFREQUENT_ACCESS class for 50% storage cost reduction',
1912 | 'requirements': [
1913 |                         'Candidates listed here are limited to log groups with a retention policy of 30 days or more',
1914 | 'Best for logs that are rarely queried',
1915 | 'Higher query costs (CloudWatch Logs Insights charges apply)',
1916 | 'Ideal for compliance/audit logs with long retention'
1917 | ],
1918 |                     'implementation': {
1919 |                         'method': 'Log group class is fixed at creation; create a new INFREQUENT_ACCESS log group and point log delivery at it',
1920 |                         'cli_command': 'aws logs create-log-group --log-group-name <name> --log-group-class INFREQUENT_ACCESS',
1921 |                         'reversible': 'Delivery can be switched back to a STANDARD group, but the class of an existing log group cannot be changed',
1922 |                         'downtime': 'None - the existing log group keeps serving its data while new data flows to the new group'
1923 |                     },
1924 | 'cost_tradeoffs': {
1925 | 'storage_savings': '50% reduction in storage costs',
1926 | 'query_costs': 'CloudWatch Logs Insights queries cost $0.005/GB scanned',
1927 | 'recommendation': 'Best for logs queried less than 10 times per month'
1928 | }
1929 | }
1930 | }
1931 |
1932 | except Exception as e:
1933 | logger.error(f"Error listing infrequent access targets: {str(e)}")
1934 | return {
1935 | 'status': 'error',
1936 | 'message': str(e),
1937 | 'log_groups': [],
1938 | 'pagination': {'current_page': page, 'page_size': self._page_size, 'total_items': 0, 'total_pages': 0}
1939 | }
1940 |
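    # Implementation sketch (assumes a boto3 version that supports logGroupClass; names are
    # hypothetical): the class is chosen at creation, so a candidate is migrated by creating a new
    # INFREQUENT_ACCESS log group and pointing log delivery at it.
    #
    #   import boto3
    #   logs = boto3.client('logs')
    #   logs.create_log_group(logGroupName='/myapp/audit-ia', logGroupClass='INFREQUENT_ACCESS')
    #   logs.put_retention_policy(logGroupName='/myapp/audit-ia', retentionInDays=365)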
1941 | async def listLogAnomalies(self, page: int = 1,
1942 | log_group_name_prefix: Optional[str] = None) -> Dict[str, Any]:
1943 | """
1944 | Get paginated list of log anomalies using ListLogAnomalyDetectors and ListAnomalies.
1945 |
1946 | Identifies log groups with anomaly detection enabled and lists detected anomalies
1947 | to help identify unusual patterns, security issues, or operational problems.
1948 |
1949 | Args:
1950 | page: Page number (1-based)
1951 | log_group_name_prefix: Optional filter for log group names
1952 |
1953 | Returns:
1954 | Dict with anomalies, pagination, summary, and analysis
1955 | """
1956 | try:
1957 | # List all anomaly detectors
1958 | anomaly_detectors = []
1959 | paginator = self.dao.logs_client.get_paginator('list_log_anomaly_detectors')
1960 |
1961 | for page_response in paginator.paginate():
1962 | anomaly_detectors.extend(page_response.get('anomalyDetectors', []))
1963 |
1964 | # Filter by prefix if provided
1965 | if log_group_name_prefix:
1966 | anomaly_detectors = [
1967 | ad for ad in anomaly_detectors
1968 | if ad.get('logGroupArnList', []) and
1969 | any(log_group_name_prefix in arn for arn in ad.get('logGroupArnList', []))
1970 | ]
1971 |
1972 | # Get anomalies for each detector
1973 | all_anomalies = []
1974 | for detector in anomaly_detectors:
1975 | detector_arn = detector.get('anomalyDetectorArn')
1976 | log_group_arns = detector.get('logGroupArnList', [])
1977 |
1978 | try:
1979 | # List anomalies for this detector
1980 | anomalies_response = self.dao.logs_client.list_anomalies(
1981 | anomalyDetectorArn=detector_arn
1982 | )
1983 |
1984 | anomalies = anomalies_response.get('anomalies', [])
1985 |
1986 | for anomaly in anomalies:
1987 | # Extract log group name from ARN
1988 | log_group_names = []
1989 | for arn in log_group_arns:
1990 |                             # ARN format: arn:aws:logs:region:account:log-group:<name>[:*]; names cannot contain ':'
1991 |                             parts = arn.split(':')
1992 |                             if len(parts) >= 7:
1993 |                                 log_group_names.append(parts[6])
1994 |
1995 | all_anomalies.append({
1996 | 'anomaly_id': anomaly.get('anomalyId'),
1997 | 'detector_arn': detector_arn,
1998 | 'log_group_names': log_group_names,
1999 | 'first_seen': anomaly.get('firstSeen'),
2000 | 'last_seen': anomaly.get('lastSeen'),
2001 | 'description': anomaly.get('description'),
2002 | 'pattern': anomaly.get('patternString'),
2003 | 'priority': anomaly.get('priority'),
2004 | 'state': anomaly.get('state'),
2005 | 'is_pattern_level_suppression': anomaly.get('isPatternLevelSuppression', False),
2006 | 'detector_evaluation_frequency': detector.get('evaluationFrequency', 'UNKNOWN'),
2007 | 'detector_status': detector.get('anomalyDetectorStatus')
2008 | })
2009 |
2010 | except Exception as e:
2011 | logger.warning(f"Could not list anomalies for detector {detector_arn}: {str(e)}")
2012 | continue
2013 |
2014 | # Sort by last_seen descending (most recent first)
2015 | all_anomalies.sort(key=lambda x: x.get('last_seen', 0), reverse=True)
2016 |
2017 | # Paginate
2018 | total_items = len(all_anomalies)
2019 | total_pages = (total_items + self._page_size - 1) // self._page_size
2020 | start_idx = (page - 1) * self._page_size
2021 | end_idx = start_idx + self._page_size
2022 | paginated_anomalies = all_anomalies[start_idx:end_idx]
2023 |
2024 | # Analyze anomaly patterns
2025 | by_priority = {'HIGH': 0, 'MEDIUM': 0, 'LOW': 0, 'UNKNOWN': 0}
2026 | by_state = {}
2027 | for anomaly in all_anomalies:
2028 | priority = anomaly.get('priority', 'UNKNOWN')
2029 | state = anomaly.get('state', 'UNKNOWN')
2030 | by_priority[priority] = by_priority.get(priority, 0) + 1
2031 | by_state[state] = by_state.get(state, 0) + 1
2032 |
2033 | return {
2034 | 'status': 'success',
2035 | 'anomalies': paginated_anomalies,
2036 | 'pagination': {
2037 | 'current_page': page,
2038 | 'page_size': self._page_size,
2039 | 'total_items': total_items,
2040 | 'total_pages': total_pages,
2041 | 'has_next': page < total_pages,
2042 | 'has_previous': page > 1
2043 | },
2044 | 'summary': {
2045 | 'total_anomalies': total_items,
2046 | 'total_detectors': len(anomaly_detectors),
2047 | 'anomalies_by_priority': by_priority,
2048 | 'anomalies_by_state': by_state
2049 | },
2050 | 'analysis': {
2051 | 'message': 'Log anomalies indicate unusual patterns in your logs',
2052 | 'high_priority_count': by_priority.get('HIGH', 0),
2053 | 'recommendations': [
2054 | 'Investigate high-priority anomalies first',
2055 | 'Review anomaly patterns for security issues',
2056 | 'Consider creating alarms for recurring anomalies',
2057 | 'Update application code to address root causes'
2058 | ]
2059 | }
2060 | }
2061 |
2062 | except Exception as e:
2063 | logger.error(f"Error listing log anomalies: {str(e)}")
2064 | return {
2065 | 'status': 'error',
2066 | 'message': str(e),
2067 | 'anomalies': [],
2068 | 'pagination': {'current_page': page, 'page_size': self._page_size, 'total_items': 0, 'total_pages': 0}
2069 | }
2070 |
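    # Parsing example (illustrative): 'arn:aws:logs:us-east-1:111122223333:log-group:/myapp/api:*'
    # splits into 8 colon-separated parts, and parts[6] is the log group name '/myapp/api'.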
2071 | async def listIneffectiveLogAnomalies(self, page: int = 1,
2072 | log_group_name_prefix: Optional[str] = None) -> Dict[str, Any]:
2073 | """
2074 | Get paginated list of ineffective log anomaly detectors that have found no anomalies
2075 | and could be disabled to reduce costs.
2076 |
2077 | Only includes detectors that have been active for at least half of their anomalyVisibilityTime
2078 | to prevent flagging newly created detectors.
2079 |
2080 | Args:
2081 | page: Page number (1-based)
2082 | log_group_name_prefix: Optional filter for log group names
2083 |
2084 | Returns:
2085 | Dict with ineffective_detectors, pagination, summary, and cost savings
2086 | """
2087 | try:
2088 | # List all anomaly detectors
2089 | anomaly_detectors = []
2090 | paginator = self.dao.logs_client.get_paginator('list_log_anomaly_detectors')
2091 |
2092 | for page_response in paginator.paginate():
2093 | anomaly_detectors.extend(page_response.get('anomalyDetectors', []))
2094 |
2095 | # Filter by prefix if provided
2096 | if log_group_name_prefix:
2097 | anomaly_detectors = [
2098 | ad for ad in anomaly_detectors
2099 | if ad.get('logGroupArnList', []) and
2100 | any(log_group_name_prefix in arn for arn in ad.get('logGroupArnList', []))
2101 | ]
2102 |
2103 | # Analyze each detector for effectiveness
2104 | ineffective_detectors = []
2105 | current_time = datetime.now(timezone.utc)
2106 |
2107 | for detector in anomaly_detectors:
2108 | detector_arn = detector.get('anomalyDetectorArn')
2109 | log_group_arns = detector.get('logGroupArnList', [])
2110 | creation_time = detector.get('creationTimeStamp')
2111 |                 anomaly_visibility_time = detector.get('anomalyVisibilityTime', 7)  # fallback in days if the API returns no value
2112 |
2113 | # Calculate detector age
2114 | if creation_time:
2115 | if isinstance(creation_time, (int, float)):
2116 | creation_datetime = datetime.fromtimestamp(creation_time / 1000, tz=timezone.utc)
2117 | else:
2118 | creation_datetime = creation_time
2119 |
2120 | detector_age_days = (current_time - creation_datetime).days
2121 |
2122 | # Only consider detectors active for at least half of anomalyVisibilityTime
2123 | min_age_days = anomaly_visibility_time / 2
2124 |
2125 | if detector_age_days < min_age_days:
2126 | continue # Skip new detectors
2127 |
2128 | try:
2129 | # Check if detector has found any anomalies
2130 | anomalies_response = self.dao.logs_client.list_anomalies(
2131 | anomalyDetectorArn=detector_arn
2132 | )
2133 |
2134 | anomaly_count = len(anomalies_response.get('anomalies', []))
2135 |
2136 | # If no anomalies found, this detector is ineffective
2137 | if anomaly_count == 0:
2138 | # Extract log group names from ARNs
2139 | log_group_names = []
2140 | for arn in log_group_arns:
2141 | parts = arn.split(':')
2142 | if len(parts) >= 7:
2143 |                                         log_group_names.append(parts[6])  # log group names cannot contain ':'
2144 |
2145 | # Estimate cost (anomaly detection is included in CloudWatch Logs costs,
2146 | # but disabling unused detectors reduces processing overhead)
2147 | # Approximate cost: $0.01 per GB of logs analyzed
2148 | estimated_monthly_cost = 0.50 # Conservative estimate per detector
2149 |
2150 | ineffective_detectors.append({
2151 | 'detector_arn': detector_arn,
2152 | 'log_group_names': log_group_names,
2153 | 'log_group_count': len(log_group_names),
2154 | 'creation_time': creation_time,
2155 | 'detector_age_days': detector_age_days,
2156 | 'anomaly_visibility_time_days': anomaly_visibility_time,
2157 | 'anomalies_found': 0,
2158 | 'evaluation_frequency': detector.get('evaluationFrequency', 'UNKNOWN'),
2159 | 'detector_status': detector.get('anomalyDetectorStatus'),
2160 | 'estimated_monthly_cost': round(estimated_monthly_cost, 2),
2161 | 'estimated_annual_cost': round(estimated_monthly_cost * 12, 2),
2162 | 'recommendation': 'Consider disabling this detector as it has not found any anomalies'
2163 | })
2164 |
2165 | except Exception as e:
2166 | logger.warning(f"Could not analyze detector {detector_arn}: {str(e)}")
2167 | continue
2168 |
2169 | # Sort by detector age descending (oldest ineffective detectors first)
2170 | ineffective_detectors.sort(key=lambda x: x['detector_age_days'], reverse=True)
2171 |
2172 | # Paginate
2173 | total_items = len(ineffective_detectors)
2174 | total_pages = (total_items + self._page_size - 1) // self._page_size
2175 | start_idx = (page - 1) * self._page_size
2176 | end_idx = start_idx + self._page_size
2177 | paginated_detectors = ineffective_detectors[start_idx:end_idx]
2178 |
2179 | # Calculate totals
2180 | total_monthly_cost = sum(d['estimated_monthly_cost'] for d in ineffective_detectors)
2181 | total_annual_cost = sum(d['estimated_annual_cost'] for d in ineffective_detectors)
2182 |
2183 | return {
2184 | 'status': 'success',
2185 | 'ineffective_detectors': paginated_detectors,
2186 | 'pagination': {
2187 | 'current_page': page,
2188 | 'page_size': self._page_size,
2189 | 'total_items': total_items,
2190 | 'total_pages': total_pages,
2191 | 'has_next': page < total_pages,
2192 | 'has_previous': page > 1
2193 | },
2194 | 'summary': {
2195 | 'total_ineffective_detectors': total_items,
2196 | 'total_detectors_analyzed': len(anomaly_detectors),
2197 | 'ineffective_percentage': round(
2198 | (total_items / len(anomaly_detectors) * 100) if len(anomaly_detectors) > 0 else 0, 1
2199 | ),
2200 | 'estimated_monthly_cost': round(total_monthly_cost, 2),
2201 | 'estimated_annual_cost': round(total_annual_cost, 2)
2202 | },
2203 | 'optimization_recommendations': {
2204 | 'message': 'Disable ineffective anomaly detectors to reduce processing costs',
2205 |                     'criteria': 'Detectors included have been active for at least half of their anomalyVisibilityTime and found no anomalies',
2206 | 'actions': [
2207 | 'Review detector configuration and log patterns',
2208 | 'Consider if anomaly detection is needed for these log groups',
2209 | 'Disable detectors that are not providing value',
2210 | 'Monitor for a period before permanent deletion'
2211 | ],
2212 | 'cost_savings': {
2213 | 'monthly': round(total_monthly_cost, 2),
2214 | 'annual': round(total_annual_cost, 2)
2215 | }
2216 | }
2217 | }
2218 |
2219 | except Exception as e:
2220 | logger.error(f"Error listing ineffective log anomaly detectors: {str(e)}")
2221 | return {
2222 | 'status': 'error',
2223 | 'message': str(e),
2224 | 'ineffective_detectors': [],
2225 | 'pagination': {'current_page': page, 'page_size': self._page_size, 'total_items': 0, 'total_pages': 0}
2226 | }
2227 |
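    # Remediation sketch (assumes a boto3 CloudWatch Logs client and an existing detector ARN;
    # the variable name is hypothetical):
    #
    #   import boto3
    #   logs = boto3.client('logs')
    #   logs.update_log_anomaly_detector(anomalyDetectorArn=detector_arn, enabled=False)  # pause first
    #   logs.delete_log_anomaly_detector(anomalyDetectorArn=detector_arn)  # delete once confirmed unused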
2228 |
2229 | class CWAlarmsTips:
2230 | """CloudWatch Alarms optimization and analysis."""
2231 |
2232 | def __init__(self, dao: CloudWatchDAO, pricing_dao: AWSPricingDAO, cost_preferences: CostPreferences):
2233 | self.dao = dao
2234 | self.pricing_dao = pricing_dao
2235 | self.cost_preferences = cost_preferences
2236 | self._page_size = 10 # Items per page
2237 |
2238 | async def listAlarm(self, page: int = 1, alarm_name_prefix: Optional[str] = None,
2239 | can_use_expensive_cost_to_estimate: bool = False,
2240 | lookback_days: int = 30) -> Dict[str, Any]:
2241 | """
2242 | List all alarms ordered by estimated cost (descending), paginated.
2243 |
2244 | Args:
2245 | page: Page number (1-based)
2246 | alarm_name_prefix: Optional filter for alarm names
2247 | can_use_expensive_cost_to_estimate: If True, uses CloudWatch Metrics API (paid)
2248 | to get accurate alarm evaluation counts. If False
2249 | (default), estimates cost by dimension count.
2250 | lookback_days: Number of days to analyze for evaluation counts (only used if
2251 | can_use_expensive_cost_to_estimate=True)
2252 |
2253 | Returns:
2254 | Dict with alarms, pagination, summary, and pricing_info
2255 | """
2256 | try:
2257 | # Get all alarms from DAO
2258 | alarms_data = await self.dao.describe_alarms(alarm_name_prefix=alarm_name_prefix)
2259 | alarms = alarms_data['alarms']
2260 |
2261 | # Get pricing data
2262 | pricing = self.pricing_dao.get_pricing_data('alarms')
2263 |
2264 | # Calculate cost for each alarm
2265 | alarms_with_cost = []
2266 | for alarm in alarms:
2267 | alarm_info = {
2268 | 'alarm_name': alarm.get('AlarmName'),
2269 | 'alarm_arn': alarm.get('AlarmArn'),
2270 | 'state_value': alarm.get('StateValue'),
2271 | 'state_reason': alarm.get('StateReason'),
2272 | 'actions_enabled': alarm.get('ActionsEnabled', False),
2273 | 'alarm_actions': alarm.get('AlarmActions', []),
2274 | 'ok_actions': alarm.get('OKActions', []),
2275 | 'insufficient_data_actions': alarm.get('InsufficientDataActions', [])
2276 | }
2277 |
2278 | # Determine alarm type and calculate cost
2279 | if 'MetricName' in alarm:
2280 | # Metric alarm
2281 | dimensions = alarm.get('Dimensions', [])
2282 | dimension_count = len(dimensions)
2283 | period = alarm.get('Period', 300)
2284 |
2285 | alarm_info['alarm_type'] = 'metric'
2286 | alarm_info['metric_name'] = alarm.get('MetricName')
2287 | alarm_info['namespace'] = alarm.get('Namespace')
2288 | alarm_info['dimensions'] = dimensions
2289 | alarm_info['dimension_count'] = dimension_count
2290 | alarm_info['period'] = period
2291 |
2292 | # Determine if high resolution
2293 | is_high_resolution = period < 300
2294 | alarm_info['is_high_resolution'] = is_high_resolution
2295 |
2296 | if is_high_resolution:
2297 | alarm_info['monthly_cost'] = pricing['high_resolution_alarms_per_alarm']
2298 | alarm_info['cost_type'] = 'high_resolution'
2299 | else:
2300 | alarm_info['monthly_cost'] = pricing['standard_alarms_per_alarm']
2301 | alarm_info['cost_type'] = 'standard'
2302 |
2303 | # Estimate cost based on dimensions (more dimensions = more complexity)
2304 | alarm_info['estimated_cost_score'] = dimension_count * alarm_info['monthly_cost']
2305 | alarm_info['cost_estimation_method'] = 'dimension_based'
2306 |
2307 | elif 'AlarmRule' in alarm:
2308 | # Composite alarm
2309 | alarm_info['alarm_type'] = 'composite'
2310 | alarm_info['alarm_rule'] = alarm.get('AlarmRule')
2311 | alarm_info['dimension_count'] = 0
2312 | alarm_info['monthly_cost'] = pricing['composite_alarms_per_alarm']
2313 | alarm_info['cost_type'] = 'composite'
2314 | alarm_info['estimated_cost_score'] = alarm_info['monthly_cost']
2315 | alarm_info['cost_estimation_method'] = 'fixed_composite'
2316 | else:
2317 | # Unknown type
2318 | alarm_info['alarm_type'] = 'unknown'
2319 | alarm_info['dimension_count'] = 0
2320 | alarm_info['monthly_cost'] = 0
2321 | alarm_info['cost_type'] = 'unknown'
2322 | alarm_info['estimated_cost_score'] = 0
2323 | alarm_info['cost_estimation_method'] = 'unknown'
2324 |
2325 | # If expensive cost estimation is enabled, get actual evaluation counts
2326 | if can_use_expensive_cost_to_estimate and alarm_info['alarm_type'] == 'metric':
2327 | try:
2328 | end_time = datetime.now(timezone.utc)
2329 | start_time = end_time - timedelta(days=lookback_days)
2330 |
2331 | # Get alarm evaluation metrics
2332 | eval_data = await self.dao.get_metric_statistics(
2333 | namespace='AWS/CloudWatch',
2334 | metric_name='AlarmEvaluations',
2335 | dimensions=[{'Name': 'AlarmName', 'Value': alarm_info['alarm_name']}],
2336 | start_time=start_time,
2337 | end_time=end_time,
2338 | period=86400, # Daily
2339 | statistics=['Sum']
2340 | )
2341 |
2342 | total_evaluations = sum(dp['Sum'] for dp in eval_data['datapoints'])
2343 | alarm_info['evaluation_count'] = int(total_evaluations)
2344 | alarm_info['cost_estimation_method'] = 'actual_evaluations'
2345 |
2346 | # Actual cost is still the fixed monthly rate, but we have usage data
2347 | alarm_info['actual_evaluations_per_day'] = total_evaluations / lookback_days if lookback_days > 0 else 0
2348 |
2349 | except Exception as e:
2350 | logger.warning(f"Failed to get evaluation metrics for {alarm_info['alarm_name']}: {str(e)}")
2351 | alarm_info['evaluation_count'] = None
2352 | alarm_info['cost_estimation_method'] = 'dimension_based_fallback'
2353 |
2354 | alarm_info['annual_cost'] = alarm_info['monthly_cost'] * 12
2355 | alarms_with_cost.append(alarm_info)
2356 |
2357 | # Sort by estimated cost score (descending)
2358 | alarms_with_cost.sort(key=lambda x: x['estimated_cost_score'], reverse=True)
2359 |
2360 | # Calculate pagination
2361 | total_alarms = len(alarms_with_cost)
2362 | total_pages = (total_alarms + self._page_size - 1) // self._page_size
2363 | page = max(1, min(page, total_pages)) if total_pages > 0 else 1
2364 |
2365 | start_idx = (page - 1) * self._page_size
2366 | end_idx = start_idx + self._page_size
2367 | paginated_alarms = alarms_with_cost[start_idx:end_idx]
2368 |
2369 | # Calculate summary
2370 | total_monthly_cost = sum(a['monthly_cost'] for a in alarms_with_cost)
2371 | standard_count = sum(1 for a in alarms_with_cost if a.get('cost_type') == 'standard')
2372 | high_res_count = sum(1 for a in alarms_with_cost if a.get('cost_type') == 'high_resolution')
2373 | composite_count = sum(1 for a in alarms_with_cost if a.get('cost_type') == 'composite')
2374 |
2375 | free_tier = self.pricing_dao.get_free_tier_limits()
2376 | billable_standard = max(0, standard_count - free_tier['alarms_count'])
2377 |
2378 | return {
2379 | 'status': 'success',
2380 | 'alarms': paginated_alarms,
2381 | 'pagination': {
2382 | 'current_page': page,
2383 | 'page_size': self._page_size,
2384 | 'total_items': total_alarms,
2385 | 'total_pages': total_pages,
2386 | 'has_next': page < total_pages,
2387 | 'has_previous': page > 1
2388 | },
2389 | 'summary': {
2390 | 'total_alarms': total_alarms,
2391 | 'standard_alarms': standard_count,
2392 | 'high_resolution_alarms': high_res_count,
2393 | 'composite_alarms': composite_count,
2394 | 'billable_standard_alarms': billable_standard,
2395 | 'total_monthly_cost': round(total_monthly_cost, 2),
2396 | 'total_annual_cost': round(total_monthly_cost * 12, 2),
2397 | 'cost_estimation_method': 'actual_evaluations' if can_use_expensive_cost_to_estimate else 'dimension_based'
2398 | },
2399 | 'pricing_info': {
2400 | 'standard_alarm_cost': pricing['standard_alarms_per_alarm'],
2401 | 'high_resolution_alarm_cost': pricing['high_resolution_alarms_per_alarm'],
2402 | 'composite_alarm_cost': pricing['composite_alarms_per_alarm'],
2403 | 'free_tier_alarms': free_tier['alarms_count']
2404 | },
2405 | 'filters_applied': {
2406 | 'alarm_name_prefix': alarm_name_prefix
2407 | }
2408 | }
2409 |
2410 | except Exception as e:
2411 | logger.error(f"Error listing alarms: {str(e)}")
2412 | return {'status': 'error', 'message': str(e)}
2413 |
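    # Scoring example (illustrative rates; actual prices come from pricing_dao): with the
    # dimension-based heuristic above, a standard metric alarm with 3 dimensions at $0.10/month
    # scores 3 * 0.10 = 0.30, and a high-resolution alarm with 1 dimension at $0.30/month also
    # scores 0.30; alarms with equal scores keep their order from describe_alarms.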
2414 | async def listInvalidAlarm(self, page: int = 1, alarm_name_prefix: Optional[str] = None) -> Dict[str, Any]:
2415 | """
2416 | List alarms with INSUFFICIENT_DATA state, ordered by dimension count (descending), paginated.
2417 |
2418 | Alarms in INSUFFICIENT_DATA state may indicate:
2419 | - Misconfigured metrics or dimensions
2420 | - Resources that no longer exist
2421 | - Metrics that are not being published
2422 | - Potential cost waste on non-functional alarms
2423 |
2424 | Args:
2425 | page: Page number (1-based)
2426 | alarm_name_prefix: Optional filter for alarm names
2427 |
2428 | Returns:
2429 | Dict with invalid alarms, pagination, summary, and optimization recommendations
2430 | """
2431 | try:
2432 | # Get alarms with INSUFFICIENT_DATA state
2433 | alarms_data = await self.dao.describe_alarms(
2434 | alarm_name_prefix=alarm_name_prefix,
2435 | state_value='INSUFFICIENT_DATA'
2436 | )
2437 | alarms = alarms_data['alarms']
2438 |
2439 | # Get pricing data
2440 | pricing = self.pricing_dao.get_pricing_data('alarms')
2441 |
2442 | # Process each alarm
2443 | invalid_alarms = []
2444 | for alarm in alarms:
2445 | alarm_info = {
2446 | 'alarm_name': alarm.get('AlarmName'),
2447 | 'alarm_arn': alarm.get('AlarmArn'),
2448 | 'state_value': alarm.get('StateValue'),
2449 | 'state_reason': alarm.get('StateReason'),
2450 | 'state_updated_timestamp': alarm.get('StateUpdatedTimestamp'),
2451 | 'actions_enabled': alarm.get('ActionsEnabled', False),
2452 | 'alarm_actions': alarm.get('AlarmActions', []),
2453 | 'alarm_description': alarm.get('AlarmDescription', '')
2454 | }
2455 |
2456 | # Calculate how long it's been in INSUFFICIENT_DATA state
2457 | state_updated = alarm.get('StateUpdatedTimestamp')
2458 | if state_updated:
2459 | if isinstance(state_updated, str):
2460 | state_updated = datetime.fromisoformat(state_updated.replace('Z', '+00:00'))
2461 | days_in_state = (datetime.now(timezone.utc) - state_updated).days
2462 | alarm_info['days_in_insufficient_data_state'] = days_in_state
2463 | else:
2464 | alarm_info['days_in_insufficient_data_state'] = None
2465 |
2466 | # Determine alarm type and cost
2467 | if 'MetricName' in alarm:
2468 | # Metric alarm
2469 | dimensions = alarm.get('Dimensions', [])
2470 | dimension_count = len(dimensions)
2471 | period = alarm.get('Period', 300)
2472 |
2473 | alarm_info['alarm_type'] = 'metric'
2474 | alarm_info['metric_name'] = alarm.get('MetricName')
2475 | alarm_info['namespace'] = alarm.get('Namespace')
2476 | alarm_info['dimensions'] = dimensions
2477 | alarm_info['dimension_count'] = dimension_count
2478 | alarm_info['period'] = period
2479 |
2480 | # Determine if high resolution
2481 | is_high_resolution = period < 300
2482 | alarm_info['is_high_resolution'] = is_high_resolution
2483 |
2484 | if is_high_resolution:
2485 | alarm_info['monthly_cost'] = pricing['high_resolution_alarms_per_alarm']
2486 | alarm_info['cost_type'] = 'high_resolution'
2487 | else:
2488 | alarm_info['monthly_cost'] = pricing['standard_alarms_per_alarm']
2489 | alarm_info['cost_type'] = 'standard'
2490 |
2491 | elif 'AlarmRule' in alarm:
2492 | # Composite alarm
2493 | alarm_info['alarm_type'] = 'composite'
2494 | alarm_info['alarm_rule'] = alarm.get('AlarmRule')
2495 | alarm_info['dimension_count'] = 0
2496 | alarm_info['monthly_cost'] = pricing['composite_alarms_per_alarm']
2497 | alarm_info['cost_type'] = 'composite'
2498 | else:
2499 | # Unknown type
2500 | alarm_info['alarm_type'] = 'unknown'
2501 | alarm_info['dimension_count'] = 0
2502 | alarm_info['monthly_cost'] = 0
2503 | alarm_info['cost_type'] = 'unknown'
2504 |
2505 | alarm_info['annual_cost'] = alarm_info['monthly_cost'] * 12
2506 |
2507 | # Add optimization recommendation
2508 | if alarm_info.get('days_in_insufficient_data_state', 0) > 7:
2509 | alarm_info['recommendation'] = 'Consider deleting - in INSUFFICIENT_DATA state for over 7 days'
2510 | alarm_info['recommendation_priority'] = 'high'
2511 | elif alarm_info.get('days_in_insufficient_data_state', 0) > 3:
2512 | alarm_info['recommendation'] = 'Review alarm configuration - prolonged INSUFFICIENT_DATA state'
2513 | alarm_info['recommendation_priority'] = 'medium'
2514 | else:
2515 | alarm_info['recommendation'] = 'Monitor - recently entered INSUFFICIENT_DATA state'
2516 | alarm_info['recommendation_priority'] = 'low'
2517 |
2518 | invalid_alarms.append(alarm_info)
2519 |
2520 | # Sort by dimension count (descending) - more complex alarms first
2521 | invalid_alarms.sort(key=lambda x: x.get('dimension_count', 0), reverse=True)
2522 |
2523 | # Calculate pagination
2524 | total_invalid = len(invalid_alarms)
2525 | total_pages = (total_invalid + self._page_size - 1) // self._page_size
2526 | page = max(1, min(page, total_pages)) if total_pages > 0 else 1
2527 |
2528 | start_idx = (page - 1) * self._page_size
2529 | end_idx = start_idx + self._page_size
2530 | paginated_alarms = invalid_alarms[start_idx:end_idx]
2531 |
2532 | # Calculate summary
2533 | total_wasted_monthly_cost = sum(a['monthly_cost'] for a in invalid_alarms)
2534 | high_priority_count = sum(1 for a in invalid_alarms if a.get('recommendation_priority') == 'high')
2535 |
2536 | standard_count = sum(1 for a in invalid_alarms if a.get('cost_type') == 'standard')
2537 | high_res_count = sum(1 for a in invalid_alarms if a.get('cost_type') == 'high_resolution')
2538 | composite_count = sum(1 for a in invalid_alarms if a.get('cost_type') == 'composite')
2539 |
2540 | return {
2541 | 'status': 'success',
2542 | 'invalid_alarms': paginated_alarms,
2543 | 'pagination': {
2544 | 'current_page': page,
2545 | 'page_size': self._page_size,
2546 | 'total_items': total_invalid,
2547 | 'total_pages': total_pages,
2548 | 'has_next': page < total_pages,
2549 | 'has_previous': page > 1
2550 | },
2551 | 'summary': {
2552 | 'total_invalid_alarms': total_invalid,
2553 | 'standard_alarms': standard_count,
2554 | 'high_resolution_alarms': high_res_count,
2555 | 'composite_alarms': composite_count,
2556 | 'high_priority_recommendations': high_priority_count,
2557 | 'potential_monthly_savings': round(total_wasted_monthly_cost, 2),
2558 | 'potential_annual_savings': round(total_wasted_monthly_cost * 12, 2)
2559 | },
2560 | 'optimization_insight': {
2561 | 'message': f'Found {total_invalid} alarms in INSUFFICIENT_DATA state, potentially wasting ${round(total_wasted_monthly_cost, 2)}/month',
2562 | 'action': 'Review and delete or fix alarms that have been in INSUFFICIENT_DATA state for extended periods'
2563 | },
2564 | 'filters_applied': {
2565 | 'alarm_name_prefix': alarm_name_prefix,
2566 | 'state_filter': 'INSUFFICIENT_DATA'
2567 | }
2568 | }
2569 |
2570 | except Exception as e:
2571 | logger.error(f"Error listing invalid alarms: {str(e)}")
2572 | return {'status': 'error', 'message': str(e)}
2573 |
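    # Cleanup sketch (assumes a boto3 CloudWatch client; alarm names are hypothetical):
    #
    #   import boto3
    #   cw = boto3.client('cloudwatch')
    #   cw.delete_alarms(AlarmNames=['orphaned-alarm-1', 'orphaned-alarm-2'])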
2574 | async def analyze_alarms_usage(self, alarm_names: Optional[List[str]] = None) -> Dict[str, Any]:
2575 | """Analyze CloudWatch Alarms usage and optimization opportunities."""
2576 | try:
2577 | alarms_data = await self.dao.describe_alarms(alarm_names=alarm_names)
2578 | alarms = alarms_data['alarms']
2579 |
2580 | # Categorize alarms
2581 | standard_alarms = 0
2582 | high_resolution_alarms = 0
2583 | composite_alarms = 0
2584 | alarms_without_actions = []
2585 |
2586 | for alarm in alarms:
2587 | if 'MetricName' in alarm:
2588 | if alarm.get('Period', 300) < 300:
2589 | high_resolution_alarms += 1
2590 | else:
2591 | standard_alarms += 1
2592 | else:
2593 | composite_alarms += 1
2594 |
2595 | # Check for actions
2596 | if not (alarm.get('AlarmActions') or alarm.get('OKActions') or
2597 | alarm.get('InsufficientDataActions')):
2598 | alarms_without_actions.append(alarm.get('AlarmName'))
2599 |
2600 | # Calculate costs
2601 | alarms_cost = self.pricing_dao.calculate_cost('alarms', {
2602 | 'standard_alarms_count': standard_alarms,
2603 | 'high_resolution_alarms_count': high_resolution_alarms,
2604 | 'composite_alarms_count': composite_alarms
2605 | })
2606 |
2607 | return {
2608 | 'status': 'success',
2609 | 'alarms_summary': {
2610 | 'total_alarms': len(alarms),
2611 | 'standard_alarms': standard_alarms,
2612 | 'high_resolution_alarms': high_resolution_alarms,
2613 | 'composite_alarms': composite_alarms,
2614 | 'alarms_without_actions': len(alarms_without_actions)
2615 | },
2616 | 'cost_analysis': alarms_cost,
2617 | 'alarms_details': alarms[:50]
2618 | }
2619 |
2620 | except Exception as e:
2621 | logger.error(f"Error analyzing alarms usage: {str(e)}")
2622 | return {'status': 'error', 'message': str(e)}
2623 |
2624 |
2625 | class CWDashboardTips:
2626 | """CloudWatch Dashboards optimization and analysis."""
2627 |
2628 | def __init__(self, dao: CloudWatchDAO, pricing_dao: AWSPricingDAO, cost_preferences: CostPreferences):
2629 | self.dao = dao
2630 | self.pricing_dao = pricing_dao
2631 | self.cost_preferences = cost_preferences
2632 | self._page_size = 10 # Items per page
2633 |
2634 | async def listDashboard(self, page: int = 1, dashboard_name_prefix: Optional[str] = None) -> Dict[str, Any]:
2635 | """
2636 | List dashboards ordered by total metric dimensions referenced (descending), paginated.
2637 |
2638 | Dashboards with more metric dimensions are typically more complex and may:
2639 | - Be more expensive to maintain
2640 | - Have performance issues
2641 | - Be harder to understand and maintain
2642 |
2643 | Args:
2644 | page: Page number (1-based)
2645 | dashboard_name_prefix: Optional filter for dashboard names
2646 |
2647 | Returns:
2648 | Dict with dashboards, pagination, summary, and pricing_info
2649 | """
2650 | try:
2651 | # Get all dashboards from DAO
2652 | dashboards_data = await self.dao.list_dashboards(dashboard_name_prefix=dashboard_name_prefix)
2653 | dashboards = dashboards_data['dashboards']
2654 |
2655 | # Get pricing data
2656 | pricing = self.pricing_dao.get_pricing_data('dashboards')
2657 | free_tier = self.pricing_dao.get_free_tier_limits()
2658 |
2659 | # Process each dashboard
2660 | dashboards_with_metrics = []
2661 | now = datetime.now(timezone.utc)
2662 |
2663 | for dashboard in dashboards:
2664 | dashboard_name = dashboard.get('DashboardName')
2665 |
2666 | dashboard_info = {
2667 | 'dashboard_name': dashboard_name,
2668 | 'dashboard_arn': dashboard.get('DashboardArn'),
2669 | 'last_modified': dashboard.get('LastModified'),
2670 | 'size': dashboard.get('Size', 0)
2671 | }
2672 |
2673 | # Calculate days since last modified
2674 | last_modified = dashboard.get('LastModified')
2675 | if last_modified:
2676 | if isinstance(last_modified, str):
2677 | last_modified = datetime.fromisoformat(last_modified.replace('Z', '+00:00'))
2678 | days_since_modified = (now - last_modified).days
2679 | dashboard_info['days_since_modified'] = days_since_modified
2680 | dashboard_info['is_stale'] = days_since_modified > 90
2681 | else:
2682 | dashboard_info['days_since_modified'] = None
2683 | dashboard_info['is_stale'] = False
2684 |
2685 | # Get dashboard body to analyze metrics
2686 | try:
2687 | dashboard_config = await self.dao.get_dashboard(dashboard_name)
2688 | dashboard_body = dashboard_config.get('dashboard_body', '{}')
2689 |
2690 | # Parse dashboard body
2691 | if isinstance(dashboard_body, str):
2692 | dashboard_body = json.loads(dashboard_body)
2693 |
2694 | # Count metrics and dimensions
2695 | total_metrics = 0
2696 | total_dimensions = 0
2697 | unique_namespaces = set()
2698 | unique_metric_names = set()
2699 |
2700 | widgets = dashboard_body.get('widgets', [])
2701 | for widget in widgets:
2702 | properties = widget.get('properties', {})
2703 | metrics = properties.get('metrics', [])
2704 |
2705 | for metric in metrics:
2706 | if isinstance(metric, list) and len(metric) >= 2:
2707 | # Metric format: [namespace, metric_name, dim1_name, dim1_value, dim2_name, dim2_value, ...]
2708 | namespace = metric[0]
2709 | metric_name = metric[1]
2710 |
2711 | unique_namespaces.add(namespace)
2712 | unique_metric_names.add(f"{namespace}/{metric_name}")
2713 | total_metrics += 1
2714 |
2715 | # Count dimensions (pairs after namespace and metric_name)
2716 | dimension_count = (len(metric) - 2) // 2
2717 | total_dimensions += dimension_count
2718 |
2719 | dashboard_info['total_metrics'] = total_metrics
2720 | dashboard_info['total_dimensions'] = total_dimensions
2721 | dashboard_info['unique_namespaces'] = len(unique_namespaces)
2722 | dashboard_info['unique_metric_names'] = len(unique_metric_names)
2723 | dashboard_info['widget_count'] = len(widgets)
2724 | dashboard_info['avg_dimensions_per_metric'] = round(total_dimensions / total_metrics, 2) if total_metrics > 0 else 0
2725 |
2726 | except Exception as e:
2727 | logger.warning(f"Failed to parse dashboard {dashboard_name}: {str(e)}")
2728 | dashboard_info['total_metrics'] = 0
2729 | dashboard_info['total_dimensions'] = 0
2730 | dashboard_info['unique_namespaces'] = 0
2731 | dashboard_info['unique_metric_names'] = 0
2732 | dashboard_info['widget_count'] = 0
2733 | dashboard_info['avg_dimensions_per_metric'] = 0
2734 | dashboard_info['parse_error'] = str(e)
2735 |
2736 | # Calculate cost (fixed per dashboard)
2737 | dashboard_info['monthly_cost'] = pricing['dashboard_per_month']
2738 | dashboard_info['annual_cost'] = pricing['dashboard_per_month'] * 12
2739 |
2740 | dashboards_with_metrics.append(dashboard_info)
2741 |
2742 | # Sort by total dimensions (descending)
2743 | dashboards_with_metrics.sort(key=lambda x: x['total_dimensions'], reverse=True)
2744 |
2745 | # Calculate pagination
2746 | total_dashboards = len(dashboards_with_metrics)
2747 | total_pages = (total_dashboards + self._page_size - 1) // self._page_size
2748 | page = max(1, min(page, total_pages)) if total_pages > 0 else 1
2749 |
2750 | start_idx = (page - 1) * self._page_size
2751 | end_idx = start_idx + self._page_size
2752 | paginated_dashboards = dashboards_with_metrics[start_idx:end_idx]
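     | # Worked example (illustrative): with 25 dashboards and _page_size = 10,
     | # total_pages = (25 + 10 - 1) // 10 = 3; page 2 slices items [10:20].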
2753 |
2754 | # Calculate summary
2755 | billable_dashboards = max(0, total_dashboards - free_tier['dashboards_count'])
2756 | total_monthly_cost = billable_dashboards * pricing['dashboard_per_month']
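     | # Worked example (illustrative): with 7 dashboards, a free tier of 3, and a
     | # rate of $3.00 per dashboard-month (the actual rate comes from the pricing
     | # DAO), billable_dashboards = 4 and total_monthly_cost = $12.00.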
2757 |
2758 | total_metrics_all = sum(d['total_metrics'] for d in dashboards_with_metrics)
2759 | total_dimensions_all = sum(d['total_dimensions'] for d in dashboards_with_metrics)
2760 | stale_count = sum(1 for d in dashboards_with_metrics if d['is_stale'])
2761 |
2762 | return {
2763 | 'status': 'success',
2764 | 'dashboards': paginated_dashboards,
2765 | 'pagination': {
2766 | 'current_page': page,
2767 | 'page_size': self._page_size,
2768 | 'total_items': total_dashboards,
2769 | 'total_pages': total_pages,
2770 | 'has_next': page < total_pages,
2771 | 'has_previous': page > 1
2772 | },
2773 | 'summary': {
2774 | 'total_dashboards': total_dashboards,
2775 | 'billable_dashboards': billable_dashboards,
2776 | 'free_tier_dashboards': min(total_dashboards, free_tier['dashboards_count']),
2777 | 'total_metrics': total_metrics_all,
2778 | 'total_dimensions': total_dimensions_all,
2779 | 'avg_dimensions_per_dashboard': round(total_dimensions_all / total_dashboards, 2) if total_dashboards > 0 else 0,
2780 | 'stale_dashboards': stale_count,
2781 | 'total_monthly_cost': round(total_monthly_cost, 2),
2782 | 'total_annual_cost': round(total_monthly_cost * 12, 2)
2783 | },
2784 | 'pricing_info': {
2785 | 'dashboard_cost': pricing['dashboard_per_month'],
2786 | 'free_tier_dashboards': free_tier['dashboards_count'],
2787 | 'free_tier_metrics_per_dashboard': free_tier['dashboard_metrics']
2788 | },
2789 | 'optimization_insights': {
2790 | 'high_complexity_dashboards': sum(1 for d in dashboards_with_metrics if d['total_dimensions'] > 50),
2791 | 'stale_dashboards': stale_count,
2792 | 'recommendation': 'Review dashboards with high dimension counts for optimization opportunities' if total_dimensions_all > 100 else 'Dashboard complexity is reasonable'
2793 | },
2794 | 'filters_applied': {
2795 | 'dashboard_name_prefix': dashboard_name_prefix
2796 | }
2797 | }
2798 |
2799 | except Exception as e:
2800 | logger.error(f"Error listing dashboards: {str(e)}")
2801 | return {'status': 'error', 'message': str(e)}
2802 |
2803 | async def analyze_dashboards_usage(self, dashboard_names: Optional[List[str]] = None) -> Dict[str, Any]:
2804 | """Analyze CloudWatch Dashboards usage and optimization opportunities."""
2805 | try:
2806 | dashboards_data = await self.dao.list_dashboards()
2807 | dashboards = dashboards_data['dashboards']
2808 |
2809 | if dashboard_names:
2810 | dashboards = [d for d in dashboards if d.get('DashboardName') in dashboard_names]
2811 |
2812 | # Analyze stale dashboards
2813 | stale_dashboards = []
2814 | now = datetime.now(timezone.utc)
2815 |
2816 | for dashboard in dashboards:
2817 | last_modified = dashboard.get('LastModified')
2818 | if last_modified:
2819 | if isinstance(last_modified, str):
2820 | last_modified = datetime.fromisoformat(last_modified.replace('Z', '+00:00'))
2821 |
2822 | days_since_modified = (now - last_modified).days
2823 | if days_since_modified > 90:
2824 | stale_dashboards.append({
2825 | 'name': dashboard.get('DashboardName'),
2826 | 'days_since_modified': days_since_modified
2827 | })
2828 |
2829 | # Calculate costs
2830 | dashboards_cost = self.pricing_dao.calculate_cost('dashboards', {
2831 | 'dashboards_count': len(dashboards)
2832 | })
2833 |
2834 | return {
2835 | 'status': 'success',
2836 | 'dashboards_summary': {
2837 | 'total_dashboards': len(dashboards),
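     | # The literal 3 below reflects the CloudWatch free tier allowance of 3 dashboards.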
2838 | 'exceeds_free_tier': len(dashboards) > 3,
2839 | 'billable_dashboards': max(0, len(dashboards) - 3),
2840 | 'stale_dashboards': len(stale_dashboards)
2841 | },
2842 | 'cost_analysis': dashboards_cost,
2843 | 'dashboards_details': dashboards[:20]
2844 | }
2845 |
2846 | except Exception as e:
2847 | logger.error(f"Error analyzing dashboards usage: {str(e)}")
2848 | return {'status': 'error', 'message': str(e)}
2849 |
2850 |
2851 | class CloudWatchService:
2852 | """
2853 | Main CloudWatch service that manages specialized tip classes.
2854 |
2855 | This service acts as a factory and lifecycle manager for:
2856 | - CWGeneralSpendTips: General spending analysis
2857 | - CWMetricsTips: Metrics optimization
2858 | - CWLogsTips: Logs optimization
2859 | - CWAlarmsTips: Alarms optimization
2860 | - CWDashboardTips: Dashboard optimization
2861 |
2862 | Usage:
2863 | service = CloudWatchService(region='us-east-1')
2864 | general_tips = service.getGeneralSpendService()
2865 | result = await general_tips.analyze_overall_spending()
2866 | """
2867 |
2868 | def __init__(self, config: Optional[CloudWatchServiceConfig] = None, region: Optional[str] = None):
2869 | """Initialize CloudWatch service with configuration."""
2870 | # Handle backward compatibility
2871 | if region is not None and config is None:
2872 | config = CloudWatchServiceConfig(region=region)
2873 |
2874 | self.config = config or CloudWatchServiceConfig()
2875 | self.region = self.config.region
2876 |
2877 | # Initialize cost controller
2878 | self.cost_controller = CostController()
2879 | self.cost_preferences = self.config.cost_preferences or CostPreferences()
2880 |
2881 | # Initialize DAOs (shared by all tip classes)
2882 | self._dao = CloudWatchDAO(region=self.region, cost_controller=self.cost_controller)
2883 | self._pricing_dao = AWSPricingDAO(region=self.region or 'us-east-1')
2884 |
2885 | # Initialize tip classes (lazy loading)
2886 | self._general_spend_tips = None
2887 | self._metrics_tips = None
2888 | self._logs_tips = None
2889 | self._alarms_tips = None
2890 | self._dashboard_tips = None
2891 |
2892 | log_cloudwatch_operation(logger, "service_initialization",
2893 | region=self.region or 'default',
2894 | cost_preferences=str(self.cost_preferences))
2895 |
2896 | def getGeneralSpendService(self) -> CWGeneralSpendTips:
2897 | """Get general spending analysis service."""
2898 | if self._general_spend_tips is None:
2899 | self._general_spend_tips = CWGeneralSpendTips(
2900 | self._dao, self._pricing_dao, self.cost_preferences
2901 | )
2902 | return self._general_spend_tips
2903 |
2904 | def getMetricsService(self) -> CWMetricsTips:
2905 | """Get metrics optimization service."""
2906 | if self._metrics_tips is None:
2907 | self._metrics_tips = CWMetricsTips(
2908 | self._dao, self._pricing_dao, self.cost_preferences
2909 | )
2910 | return self._metrics_tips
2911 |
2912 | def getLogsService(self) -> CWLogsTips:
2913 | """Get logs optimization service."""
2914 | if self._logs_tips is None:
2915 | self._logs_tips = CWLogsTips(
2916 | self._dao, self._pricing_dao, self.cost_preferences
2917 | )
2918 | return self._logs_tips
2919 |
2920 | def getAlarmsService(self) -> CWAlarmsTips:
2921 | """Get alarms optimization service."""
2922 | if self._alarms_tips is None:
2923 | self._alarms_tips = CWAlarmsTips(
2924 | self._dao, self._pricing_dao, self.cost_preferences
2925 | )
2926 | return self._alarms_tips
2927 |
2928 | def getDashboardsService(self) -> CWDashboardTips:
2929 | """Get dashboards optimization service."""
2930 | if self._dashboard_tips is None:
2931 | self._dashboard_tips = CWDashboardTips(
2932 | self._dao, self._pricing_dao, self.cost_preferences
2933 | )
2934 | return self._dashboard_tips
2935 |
2936 | @property
2937 | def pricing(self):
2938 | """Backward compatibility property for pricing DAO."""
2939 | return self._pricing_dao
2940 |
2941 | def update_cost_preferences(self, preferences: CostPreferences):
2942 | """Update cost control preferences for all services."""
2943 | self.cost_preferences = preferences
2944 |
2945 | # Update preferences in existing tip classes
2946 | if self._general_spend_tips:
2947 | self._general_spend_tips.cost_preferences = preferences
2948 | if self._metrics_tips:
2949 | self._metrics_tips.cost_preferences = preferences
2950 | if self._logs_tips:
2951 | self._logs_tips.cost_preferences = preferences
2952 | if self._alarms_tips:
2953 | self._alarms_tips.cost_preferences = preferences
2954 | if self._dashboard_tips:
2955 | self._dashboard_tips.cost_preferences = preferences
2956 |
2957 | log_cloudwatch_operation(logger, "cost_preferences_update",
2958 | preferences=str(preferences))
2959 |
2960 | def get_service_statistics(self) -> Dict[str, Any]:
2961 | """Get service statistics including cache performance."""
2962 | return {
2963 | 'service_info': {
2964 | 'region': self.region,
2965 | 'cost_preferences': self.cost_preferences.__dict__,
2966 | 'initialized_services': {
2967 | 'general_spend': self._general_spend_tips is not None,
2968 | 'metrics': self._metrics_tips is not None,
2969 | 'logs': self._logs_tips is not None,
2970 | 'alarms': self._alarms_tips is not None,
2971 | 'dashboards': self._dashboard_tips is not None
2972 | }
2973 | },
2974 | 'cache_statistics': self._dao.get_cache_stats(),
2975 | 'cost_control_status': {
2976 | 'cost_controller_active': True,
2977 | 'preferences_validated': True,
2978 | 'transparency_enabled': self.config.enable_cost_tracking
2979 | }
2980 | }
2981 |
2982 | def clear_cache(self) -> None:
2983 | """Clear all cached data."""
2984 | self._dao.clear_cache()
2985 | log_cloudwatch_operation(logger, "cache_cleared")
2986 |
2987 |
2988 | # Convenience function for backward compatibility
2989 |
2990 | async def create_cloudwatch_service(region: Optional[str] = None,
2991 | cost_preferences: Optional[CostPreferences] = None) -> CloudWatchService:
2992 | """Create a configured CloudWatchService instance."""
2993 | config = CloudWatchServiceConfig(region=region, cost_preferences=cost_preferences)
2994 | service = CloudWatchService(config)
2995 |
2996 | log_cloudwatch_operation(logger, "service_created", region=region or 'default')
2997 | return service
```
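A minimal usage sketch for the `CloudWatchService` factory defined above. It assumes the module is importable as `services.cloudwatch_service`, that `CloudWatchServiceConfig` and `CostPreferences` are exported from the same module, and that default AWS credentials and region are configured; the class, method, and result-field names are taken from the file itself.

```python
# Illustrative sketch only: the import path and the availability of AWS
# credentials are assumptions; class, method, and result-field names are
# taken from the service code above.
import asyncio

from services.cloudwatch_service import (
    CloudWatchService,
    CloudWatchServiceConfig,
    CostPreferences,
)


async def main() -> None:
    # Build the service from an explicit config (region plus cost preferences).
    config = CloudWatchServiceConfig(
        region="us-east-1",
        cost_preferences=CostPreferences(),
    )
    service = CloudWatchService(config)

    # Tip classes are created lazily on first access and share the same DAOs.
    dashboards = service.getDashboardsService()
    result = await dashboards.analyze_dashboards_usage()

    if result.get("status") == "success":
        summary = result["dashboards_summary"]
        print(
            f"Dashboards: {summary['total_dashboards']} "
            f"(billable: {summary['billable_dashboards']}, "
            f"stale: {summary['stale_dashboards']})"
        )
    else:
        print(f"Analysis failed: {result.get('message')}")

    # Inspect which tip classes were instantiated and the shared cache stats.
    stats = service.get_service_statistics()
    print(stats["service_info"]["initialized_services"])


if __name__ == "__main__":
    asyncio.run(main())
```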