This is page 17 of 19. Use http://codebase.md/aws-samples/sample-cfm-tips-mcp?lines=true&page={x} to view the full context.
# Directory Structure
```
├── .gitignore
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── diagnose_cost_optimization_hub_v2.py
├── LICENSE
├── logging_config.py
├── mcp_runbooks.json
├── mcp_server_with_runbooks.py
├── playbooks
│ ├── __init__.py
│ ├── aws_lambda
│ │ ├── __init__.py
│ │ └── lambda_optimization.py
│ ├── cloudtrail
│ │ ├── __init__.py
│ │ └── cloudtrail_optimization.py
│ ├── cloudtrail_optimization.py
│ ├── cloudwatch
│ │ ├── __init__.py
│ │ ├── aggregation_queries.py
│ │ ├── alarms_and_dashboards_analyzer.py
│ │ ├── analysis_engine.py
│ │ ├── base_analyzer.py
│ │ ├── cloudwatch_optimization_analyzer.py
│ │ ├── cloudwatch_optimization_tool.py
│ │ ├── cloudwatch_optimization.py
│ │ ├── cost_controller.py
│ │ ├── general_spend_analyzer.py
│ │ ├── logs_optimization_analyzer.py
│ │ ├── metrics_optimization_analyzer.py
│ │ ├── optimization_orchestrator.py
│ │ └── result_processor.py
│ ├── comprehensive_optimization.py
│ ├── ebs
│ │ ├── __init__.py
│ │ └── ebs_optimization.py
│ ├── ebs_optimization.py
│ ├── ec2
│ │ ├── __init__.py
│ │ └── ec2_optimization.py
│ ├── ec2_optimization.py
│ ├── lambda_optimization.py
│ ├── rds
│ │ ├── __init__.py
│ │ └── rds_optimization.py
│ ├── rds_optimization.py
│ └── s3
│ ├── __init__.py
│ ├── analyzers
│ │ ├── __init__.py
│ │ ├── api_cost_analyzer.py
│ │ ├── archive_optimization_analyzer.py
│ │ ├── general_spend_analyzer.py
│ │ ├── governance_analyzer.py
│ │ ├── multipart_cleanup_analyzer.py
│ │ └── storage_class_analyzer.py
│ ├── base_analyzer.py
│ ├── s3_aggregation_queries.py
│ ├── s3_analysis_engine.py
│ ├── s3_comprehensive_optimization_tool.py
│ ├── s3_optimization_orchestrator.py
│ └── s3_optimization.py
├── README.md
├── requirements.txt
├── runbook_functions_extended.py
├── runbook_functions.py
├── RUNBOOKS_GUIDE.md
├── services
│ ├── __init__.py
│ ├── cloudwatch_pricing.py
│ ├── cloudwatch_service_vended_log.py
│ ├── cloudwatch_service.py
│ ├── compute_optimizer.py
│ ├── cost_explorer.py
│ ├── optimization_hub.py
│ ├── performance_insights.py
│ ├── pricing.py
│ ├── s3_pricing.py
│ ├── s3_service.py
│ ├── storage_lens_service.py
│ └── trusted_advisor.py
├── setup.py
├── test_runbooks.py
├── tests
│ ├── __init__.py
│ ├── conftest.py
│ ├── integration
│ │ ├── __init__.py
│ │ ├── cloudwatch
│ │ │ └── test_cloudwatch_integration.py
│ │ ├── test_cloudwatch_comprehensive_tool_integration.py
│ │ ├── test_cloudwatch_orchestrator_integration.py
│ │ ├── test_integration_suite.py
│ │ └── test_orchestrator_integration.py
│ ├── legacy
│ │ ├── example_output_with_docs.py
│ │ ├── example_wellarchitected_output.py
│ │ ├── test_aws_session_management.py
│ │ ├── test_cloudwatch_orchestrator_pagination.py
│ │ ├── test_cloudwatch_pagination_integration.py
│ │ ├── test_cloudwatch_performance_optimizations.py
│ │ ├── test_cloudwatch_result_processor.py
│ │ ├── test_cloudwatch_timeout_issue.py
│ │ ├── test_documentation_links.py
│ │ ├── test_metrics_pagination_count.py
│ │ ├── test_orchestrator_integration.py
│ │ ├── test_pricing_cache_fix_moved.py
│ │ ├── test_pricing_cache_fix.py
│ │ ├── test_runbook_integration.py
│ │ ├── test_runbooks.py
│ │ ├── test_setup_verification.py
│ │ └── test_stack_trace_fix.py
│ ├── performance
│ │ ├── __init__.py
│ │ ├── cloudwatch
│ │ │ └── test_cloudwatch_performance.py
│ │ ├── test_cloudwatch_parallel_execution.py
│ │ ├── test_parallel_execution.py
│ │ └── test_performance_suite.py
│ ├── pytest-cloudwatch.ini
│ ├── pytest.ini
│ ├── README.md
│ ├── requirements-test.txt
│ ├── run_cloudwatch_tests.py
│ ├── run_tests.py
│ ├── test_setup_verification.py
│ ├── test_suite_main.py
│ └── unit
│ ├── __init__.py
│ ├── analyzers
│ │ ├── __init__.py
│ │ ├── conftest_cloudwatch.py
│ │ ├── test_alarms_and_dashboards_analyzer.py
│ │ ├── test_base_analyzer.py
│ │ ├── test_cloudwatch_base_analyzer.py
│ │ ├── test_cloudwatch_cost_constraints.py
│ │ ├── test_cloudwatch_general_spend_analyzer.py
│ │ ├── test_general_spend_analyzer.py
│ │ ├── test_logs_optimization_analyzer.py
│ │ └── test_metrics_optimization_analyzer.py
│ ├── cloudwatch
│ │ ├── test_cache_control.py
│ │ ├── test_cloudwatch_api_mocking.py
│ │ ├── test_cloudwatch_metrics_pagination.py
│ │ ├── test_cloudwatch_pagination_architecture.py
│ │ ├── test_cloudwatch_pagination_comprehensive_fixed.py
│ │ ├── test_cloudwatch_pagination_comprehensive.py
│ │ ├── test_cloudwatch_pagination_fixed.py
│ │ ├── test_cloudwatch_pagination_real_format.py
│ │ ├── test_cloudwatch_pagination_simple.py
│ │ ├── test_cloudwatch_query_pagination.py
│ │ ├── test_cloudwatch_unit_suite.py
│ │ ├── test_general_spend_tips_refactor.py
│ │ ├── test_import_error.py
│ │ ├── test_mcp_pagination_bug.py
│ │ └── test_mcp_surface_pagination.py
│ ├── s3
│ │ └── live
│ │ ├── test_bucket_listing.py
│ │ ├── test_s3_governance_bucket_discovery.py
│ │ └── test_top_buckets.py
│ ├── services
│ │ ├── __init__.py
│ │ ├── test_cloudwatch_cost_controller.py
│ │ ├── test_cloudwatch_query_service.py
│ │ ├── test_cloudwatch_service.py
│ │ ├── test_cost_control_routing.py
│ │ └── test_s3_service.py
│ └── test_unit_suite.py
└── utils
├── __init__.py
├── aws_client_factory.py
├── cache_decorator.py
├── cleanup_manager.py
├── cloudwatch_cache.py
├── documentation_links.py
├── error_handler.py
├── intelligent_cache.py
├── logging_config.py
├── memory_manager.py
├── parallel_executor.py
├── performance_monitor.py
├── progressive_timeout.py
├── service_orchestrator.py
└── session_manager.py
```
# Files
--------------------------------------------------------------------------------
/playbooks/cloudwatch/cloudwatch_optimization_tool.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Comprehensive CloudWatch Optimization Tool
3 |
4 | Unified tool that executes all 4 CloudWatch optimization functionalities in parallel
5 | with intelligent analysis orchestration, cost-aware priority-based execution,
6 | and comprehensive reporting.
7 | """
8 |
9 | import logging
10 | import asyncio
11 | from typing import Dict, List, Any, Optional
12 | from datetime import datetime
13 |
14 | from playbooks.cloudwatch.optimization_orchestrator import CloudWatchOptimizationOrchestrator
15 | from playbooks.cloudwatch.cost_controller import CostController, CostPreferences, CostEstimate
16 | from utils.logging_config import log_cloudwatch_operation
17 |
18 | logger = logging.getLogger(__name__)
19 |
20 |
21 | class CloudWatchOptimizationTool:
22 | """
23 | Comprehensive CloudWatch optimization tool that executes all functionalities.
24 |
25 | This tool provides:
26 | - Unified execution of all 4 CloudWatch optimization functionalities in parallel
27 | - Intelligent analysis orchestration with cost-aware priority-based execution
28 | - Comprehensive reporting that aggregates results from all analyzers
29 | - Executive summary with top recommendations and functionality coverage metrics
30 | - Configurable analysis scope (specific functionalities, resource filtering, time ranges, cost preferences)
31 | - Cost estimation and user consent workflows for paid features
32 | """
33 |
34 |     def __init__(self, region: Optional[str] = None, session_id: Optional[str] = None):
35 | """Initialize the comprehensive CloudWatch optimization tool."""
36 | self.region = region
37 | self.session_id = session_id
38 | self.logger = logging.getLogger(__name__)
39 |
40 | # Initialize orchestrator
41 | self.orchestrator = CloudWatchOptimizationOrchestrator(
42 | region=region,
43 | session_id=session_id
44 | )
45 |
46 | # Initialize cost controller
47 | self.cost_controller = CostController()
48 |
49 | # Tool configuration
50 | self.tool_version = "1.0.0"
51 | self.supported_functionalities = [
52 | 'general_spend',
53 | 'metrics_optimization',
54 | 'logs_optimization',
55 | 'alarms_and_dashboards'
56 | ]
57 |
58 | log_cloudwatch_operation(self.logger, "optimization_tool_initialized",
59 | region=region, session_id=session_id,
60 | supported_functionalities=len(self.supported_functionalities))
61 |
62 | async def execute_comprehensive_optimization_analysis(self, **kwargs) -> Dict[str, Any]:
63 | """
64 | Execute comprehensive CloudWatch optimization analysis with all 4 functionalities in parallel.
65 |
66 | This method implements intelligent analysis orchestration with cost-aware priority-based execution,
67 | configurable analysis scope, and comprehensive cost estimation and user consent workflows.
68 |
69 | Args:
70 | **kwargs: Analysis parameters including:
71 | - region: AWS region
72 | - lookback_days: Number of days to analyze (default: 30)
73 | - allow_cost_explorer: Enable Cost Explorer analysis (default: False)
74 | - allow_aws_config: Enable AWS Config governance checks (default: False)
75 | - allow_cloudtrail: Enable CloudTrail usage pattern analysis (default: False)
76 | - allow_minimal_cost_metrics: Enable minimal cost metrics (default: False)
77 | - functionalities: List of specific functionalities to run (default: all)
78 | - resource_filters: Dict of resource filters (log_group_names, alarm_names, etc.)
79 | - time_range_filters: Dict of time range filters (start_date, end_date)
80 | - priority_mode: 'cost_impact' | 'execution_time' | 'balanced' (default: 'balanced')
81 | - store_results: Whether to store results in session (default: True)
82 | - generate_executive_summary: Whether to generate executive summary (default: True)
83 | - max_parallel_analyses: Maximum parallel analyses (default: 4)
84 |
85 | Returns:
86 | Dictionary containing comprehensive optimization analysis results
87 | """
88 | start_time = datetime.now()
89 |
90 | log_cloudwatch_operation(self.logger, "comprehensive_optimization_start",
91 | session_id=self.session_id,
92 | priority_mode=kwargs.get('priority_mode', 'balanced'))
93 |
94 | try:
95 | # Step 1: Validate and prepare cost preferences with detailed validation
96 | cost_validation = await self._validate_cost_preferences_with_consent(**kwargs)
97 | if cost_validation['status'] != 'success':
98 | return {
99 | 'status': 'error',
100 | 'error_message': f"Cost preference validation failed: {cost_validation.get('error_message')}",
101 | 'timestamp': start_time.isoformat(),
102 | 'cost_validation_details': cost_validation
103 | }
104 |
105 | cost_preferences = cost_validation['validated_preferences']
106 | functionality_coverage = cost_validation['functionality_coverage']
107 |
108 | # Step 2: Get detailed cost estimate with user consent workflow
109 | cost_estimate_result = await self._get_detailed_cost_estimate(**kwargs)
110 | if cost_estimate_result['status'] != 'success':
111 | return {
112 | 'status': 'error',
113 | 'error_message': f"Cost estimation failed: {cost_estimate_result.get('error_message')}",
114 | 'timestamp': start_time.isoformat(),
115 | 'cost_estimate_details': cost_estimate_result
116 | }
117 |
118 | cost_estimate = cost_estimate_result['cost_estimate']
119 |
120 | # Step 3: Configure analysis scope with intelligent filtering
121 | analysis_scope = self._configure_analysis_scope(**kwargs)
122 |
123 | # Step 4: Determine functionalities with priority-based ordering
124 | execution_plan = self._create_intelligent_execution_plan(
125 | requested_functionalities=kwargs.get('functionalities', self.supported_functionalities),
126 | cost_preferences=cost_preferences,
127 | priority_mode=kwargs.get('priority_mode', 'balanced'),
128 | analysis_scope=analysis_scope
129 | )
130 |
131 | if not execution_plan['valid_functionalities']:
132 | return {
133 | 'status': 'error',
134 | 'error_message': 'No valid functionalities specified or enabled',
135 | 'supported_functionalities': self.supported_functionalities,
136 | 'execution_plan': execution_plan,
137 | 'timestamp': start_time.isoformat()
138 | }
139 |
140 | # Step 5: Execute parallel analysis with intelligent orchestration
141 | parallel_results = await self._execute_parallel_analyses_with_orchestration(
142 | execution_plan=execution_plan,
143 | analysis_scope=analysis_scope,
144 | cost_preferences=cost_preferences,
145 | **kwargs
146 | )
147 |
148 | # Step 6: Execute cross-analysis insights and correlations
149 | cross_analysis_insights = await self._execute_cross_analysis_insights(
150 | parallel_results, analysis_scope, **kwargs
151 | )
152 |
153 | # Step 7: Generate executive summary with actionable insights
154 | executive_summary = None
155 | if kwargs.get('generate_executive_summary', True):
156 | executive_summary = await self._generate_executive_summary(
157 | parallel_results, cross_analysis_insights, execution_plan, **kwargs
158 | )
159 |
160 | # Step 8: Compile comprehensive optimization report
161 | optimization_report = self._compile_comprehensive_report(
162 | parallel_results=parallel_results,
163 | cross_analysis_insights=cross_analysis_insights,
164 | executive_summary=executive_summary,
165 | execution_plan=execution_plan,
166 | analysis_scope=analysis_scope,
167 | cost_preferences=cost_preferences,
168 | functionality_coverage=functionality_coverage,
169 | cost_estimate=cost_estimate,
170 | start_time=start_time,
171 | **kwargs
172 | )
173 |
174 | log_cloudwatch_operation(self.logger, "comprehensive_optimization_complete",
175 | session_id=self.session_id,
176 | status=optimization_report.get('status'),
177 | total_execution_time=optimization_report.get('total_execution_time'),
178 | analyses_executed=len(execution_plan['valid_functionalities']))
179 |
180 | return optimization_report
181 |
182 | except Exception as e:
183 | import traceback
184 | full_traceback = traceback.format_exc()
185 | error_message = str(e)
186 | self.logger.error(f"Comprehensive optimization analysis failed: {error_message}")
187 | self.logger.error(f"Full traceback: {full_traceback}")
188 | return {
189 | 'status': 'error',
190 | 'error_message': error_message,
191 | 'error_type': e.__class__.__name__,
192 | 'full_exception_details': {
193 | 'traceback': full_traceback,
194 | 'error_location': self._extract_error_location(full_traceback)
195 | },
196 | 'timestamp': start_time.isoformat(),
197 | 'session_id': self.session_id,
198 | 'tool_version': self.tool_version,
199 | 'execution_context': {
200 | 'kwargs_keys': list(kwargs.keys()),
201 | 'supported_functionalities': self.supported_functionalities
202 | }
203 | }
204 |
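    # Usage sketch (illustrative; the region, session id, and keyword values
    # below are hypothetical, and only parameters documented in the docstring
    # above are used):
    #
    #   import asyncio
    #
    #   async def main():
    #       tool = CloudWatchOptimizationTool(region="us-east-1", session_id="demo")
    #       report = await tool.execute_comprehensive_optimization_analysis(
    #           lookback_days=14,
    #           priority_mode="cost_impact",
    #           allow_cost_explorer=False,  # keep the run to free operations
    #           max_parallel_analyses=2,
    #       )
    #       print(report["status"], len(report.get("top_recommendations", [])))
    #
    #   asyncio.run(main())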
205 | def _compile_comprehensive_report(self, parallel_results: Dict[str, Any],
206 | cross_analysis_insights: Dict[str, Any],
207 | executive_summary: Optional[Dict[str, Any]],
208 | execution_plan: Dict[str, Any],
209 | analysis_scope: Dict[str, Any],
210 | cost_preferences: Dict[str, Any],
211 | functionality_coverage: Dict[str, Any],
212 | cost_estimate: Dict[str, Any],
213 | start_time: datetime,
214 | **kwargs) -> Dict[str, Any]:
215 | """
216 | Compile comprehensive optimization report from all analysis results.
217 |
218 | This method creates a unified report that includes:
219 | - Execution summary with intelligent orchestration metrics
220 | - Cost-aware analysis results with transparency
221 | - Cross-analysis insights and correlations
222 | - Executive summary with actionable recommendations
223 | - Detailed configuration and scope information
224 | """
225 |
226 | total_execution_time = (datetime.now() - start_time).total_seconds()
227 |
228 | # Determine overall status based on parallel execution results
229 | overall_status = 'success'
230 | if parallel_results.get('status') == 'error':
231 | overall_status = 'error'
232 | elif parallel_results.get('failed_analyses', 0) > 0:
233 | if parallel_results.get('successful_analyses', 0) > 0:
234 | overall_status = 'partial'
235 | else:
236 | overall_status = 'error'
237 | elif cross_analysis_insights.get('status') == 'error':
238 | overall_status = 'partial'
239 |
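        # Examples of the mapping above: an 'error' status from parallel
        # execution -> 'error'; some failures alongside successes -> 'partial';
        # failures with no successes -> 'error'; a failed cross-analysis step
        # on its own -> 'partial'.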
240 | # Extract key metrics from parallel execution
241 | successful_analyses = parallel_results.get('successful_analyses', 0)
242 | failed_analyses = parallel_results.get('failed_analyses', 0)
243 | total_analyses = successful_analyses + failed_analyses
244 |
245 | # Compile top recommendations from all sources
246 | top_recommendations = self._extract_top_recommendations_enhanced(
247 | parallel_results, cross_analysis_insights, executive_summary
248 | )
249 |
250 | # Create comprehensive report with enhanced structure
251 | report = {
252 | 'status': overall_status,
253 | 'report_type': 'comprehensive_cloudwatch_optimization_v2',
254 | 'tool_version': self.tool_version,
255 | 'generated_at': datetime.now().isoformat(),
256 | 'analysis_started_at': start_time.isoformat(),
257 | 'total_execution_time': total_execution_time,
258 | 'session_id': self.session_id,
259 | 'region': self.region,
260 |
261 | # Enhanced Analysis Configuration
262 | 'analysis_configuration': {
263 | 'execution_plan': execution_plan,
264 | 'analysis_scope': analysis_scope,
265 | 'cost_preferences': cost_preferences,
266 | 'functionality_coverage': functionality_coverage,
267 | 'cost_estimate': cost_estimate,
268 | 'intelligent_orchestration': {
269 | 'priority_mode': execution_plan.get('priority_mode', 'balanced'),
270 | 'execution_batches': len(execution_plan.get('execution_batches', [])),
271 | 'max_parallel_analyses': analysis_scope.get('performance_constraints', {}).get('max_parallel_analyses', 4)
272 | }
273 | },
274 |
275 | # Enhanced Execution Summary
276 | 'execution_summary': {
277 | 'parallel_execution_metrics': {
278 | 'total_functionalities_requested': len(execution_plan.get('valid_functionalities', [])),
279 | 'successful_analyses': successful_analyses,
280 | 'failed_analyses': failed_analyses,
281 | 'success_rate': (successful_analyses / total_analyses * 100) if total_analyses > 0 else 0,
282 | 'total_execution_time': parallel_results.get('total_execution_time', 0),
283 | 'average_analysis_time': (parallel_results.get('total_execution_time', 0) / total_analyses) if total_analyses > 0 else 0,
284 | 'execution_efficiency': self._calculate_execution_efficiency(parallel_results, execution_plan)
285 | },
286 | 'cost_transparency': {
287 | 'cost_incurred': self._extract_cost_incurred(parallel_results),
288 | 'cost_incurring_operations': self._extract_cost_operations(parallel_results),
289 | 'primary_data_sources': self._extract_primary_data_sources(parallel_results),
290 | 'fallback_usage': self._extract_fallback_usage(parallel_results)
291 | },
292 | 'batch_execution_details': parallel_results.get('batch_summaries', [])
293 | },
294 |
295 | # Enhanced Key Findings and Recommendations
296 | 'key_findings': self._extract_key_findings_enhanced(parallel_results, cross_analysis_insights, executive_summary),
297 | 'top_recommendations': top_recommendations,
298 | 'optimization_priorities': self._determine_optimization_priorities_enhanced(top_recommendations, cross_analysis_insights),
299 |
300 | # Detailed Results with Enhanced Structure
301 | 'detailed_results': {
302 | 'individual_analyses': parallel_results.get('individual_results', {}),
303 | 'cross_analysis_insights': cross_analysis_insights,
304 | 'executive_summary': executive_summary,
305 | 'parallel_execution_summary': parallel_results
306 | },
307 |
308 | # Enhanced Session and Data Information
309 | 'session_metadata': {
310 | 'session_id': self.session_id,
311 | 'stored_tables': self._extract_stored_tables(parallel_results),
312 | 'query_capabilities': 'Full SQL querying available on all stored analysis data',
313 | 'data_retention': 'Session data available for 24 hours',
314 | 'cross_analysis_correlations': cross_analysis_insights.get('correlation_strength', 'unknown')
315 | },
316 |
317 | # Enhanced Next Steps with Implementation Guidance
318 | 'recommended_next_steps': self._generate_next_steps_enhanced(
319 | top_recommendations, functionality_coverage, cost_estimate, cross_analysis_insights, executive_summary
320 | ),
321 |
322 | # Implementation Support
323 | 'implementation_support': {
324 | 'cost_impact_analysis': self._generate_cost_impact_analysis(top_recommendations),
325 | 'risk_assessment': self._generate_risk_assessment(top_recommendations),
326 | 'timeline_recommendations': self._generate_timeline_recommendations(top_recommendations),
327 | 'monitoring_recommendations': self._generate_monitoring_recommendations(parallel_results)
328 | }
329 | }
330 |
331 | return report
332 |
333 | def _extract_top_recommendations_enhanced(self, parallel_results: Dict[str, Any],
334 | cross_analysis_insights: Dict[str, Any],
335 | executive_summary: Optional[Dict[str, Any]]) -> List[Dict[str, Any]]:
336 | """Extract and prioritize top recommendations from all analysis sources."""
337 | all_recommendations = []
338 |
339 | # Extract recommendations from individual parallel analyses
340 | individual_results = parallel_results.get('individual_results', {})
341 | for analysis_type, analysis_data in individual_results.items():
342 | if analysis_data.get('status') == 'success':
343 | recommendations = analysis_data.get('recommendations', [])
344 | for rec in recommendations:
345 | rec['source'] = f'individual_analysis_{analysis_type}'
346 | rec['analysis_type'] = analysis_type
347 | all_recommendations.append(rec)
348 |
349 | # Extract recommendations from cross-analysis insights
350 | if cross_analysis_insights.get('status') == 'success':
351 | insights = cross_analysis_insights.get('insights', {})
352 |
353 | # Priority recommendations from cross-analysis
354 | priority_recs = insights.get('priority_recommendations', [])
355 | for rec in priority_recs:
356 | rec['source'] = 'cross_analysis_priority'
357 | all_recommendations.append(rec)
358 |
359 | # Synergy-based recommendations
360 | synergies = insights.get('optimization_synergies', [])
361 | for synergy in synergies:
362 | all_recommendations.append({
363 | 'type': 'optimization_synergy',
364 | 'priority': 'high',
365 | 'title': f"Synergistic Optimization: {synergy.get('type', 'Unknown')}",
366 | 'description': synergy.get('description', ''),
367 |                         'potential_savings': 0,  # Compound synergy benefits are not quantified individually
368 | 'implementation_effort': 'medium',
369 | 'source': 'cross_analysis_synergy',
370 | 'synergy_details': synergy
371 | })
372 |
373 | # Extract recommendations from executive summary
374 | if executive_summary and executive_summary.get('status') == 'success':
375 | exec_summary = executive_summary.get('executive_summary', {})
376 | immediate_actions = exec_summary.get('immediate_actions', [])
377 | for action in immediate_actions:
378 | all_recommendations.append({
379 | 'type': 'immediate_action',
380 | 'priority': 'critical',
381 | 'title': action.get('action', 'Immediate Action Required'),
382 | 'description': f"Source: {action.get('source', 'Unknown')}",
383 | 'potential_savings': action.get('impact', 0),
384 | 'implementation_effort': action.get('effort', 'medium'),
385 | 'source': 'executive_summary_immediate',
386 | 'timeline': action.get('timeline', 'Within 1 week')
387 | })
388 |
389 | # Enhanced sorting with multiple criteria
390 | priority_order = {'critical': 4, 'high': 3, 'medium': 2, 'low': 1}
391 | effort_order = {'low': 3, 'medium': 2, 'high': 1}
392 |
393 | sorted_recommendations = sorted(
394 | all_recommendations,
395 | key=lambda x: (
396 | priority_order.get(x.get('priority', 'low'), 1),
397 | x.get('potential_savings', 0),
398 | effort_order.get(x.get('implementation_effort', 'medium'), 2)
399 | ),
400 | reverse=True
401 | )
402 |
403 | # Return top 15 recommendations with enhanced metadata
404 | top_recommendations = sorted_recommendations[:15]
405 |
406 | # Add ranking and context to each recommendation
407 | for i, rec in enumerate(top_recommendations):
408 | rec['recommendation_rank'] = i + 1
409 | rec['recommendation_context'] = {
410 | 'total_recommendations_analyzed': len(all_recommendations),
411 | 'ranking_criteria': ['priority', 'potential_savings', 'implementation_effort'],
412 | 'confidence_score': self._calculate_recommendation_confidence(rec)
413 | }
414 |
415 | return top_recommendations
416 |
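    # Worked example of the ranking tuple above (illustrative numbers): a
    # critical recommendation saving $5 with high effort maps to (4, 5, 1),
    # while a high-priority one saving $40 with low effort maps to (3, 40, 3).
    # Reverse-sorting the tuples places the critical item first, because
    # priority is compared before potential savings or implementation effort.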
417 | def _extract_key_findings(self, comprehensive_result: Dict[str, Any],
418 | cross_analysis_insights: Dict[str, Any]) -> List[str]:
419 | """Extract key findings from all analyses."""
420 | findings = []
421 |
422 | # Analysis execution findings
423 | analysis_summary = comprehensive_result.get('analysis_summary', {})
424 | if analysis_summary:
425 | successful = analysis_summary.get('successful_analyses', 0)
426 | total = analysis_summary.get('total_analyses', 0)
427 | findings.append(f"Successfully completed {successful} of {total} CloudWatch optimization analyses")
428 |
429 | # Cost findings
430 | if comprehensive_result.get('cost_incurred', False):
431 | cost_ops = comprehensive_result.get('cost_incurring_operations', [])
432 | findings.append(f"Analysis used {len(cost_ops)} cost-incurring operations: {', '.join(cost_ops)}")
433 | else:
434 | findings.append("Analysis completed using only free AWS operations")
435 |
436 | # Data source findings
437 | primary_source = comprehensive_result.get('primary_data_source', 'unknown')
438 | findings.append(f"Primary data source: {primary_source}")
439 |
440 | # Cross-analysis findings
441 | insight_summary = cross_analysis_insights.get('summary', {})
442 | key_insights = insight_summary.get('key_findings', [])
443 | findings.extend(key_insights)
444 |
445 | # Cost impact findings
446 | cost_impact = insight_summary.get('cost_impact_analysis', {})
447 | if cost_impact.get('potential_monthly_savings'):
448 | savings = cost_impact['potential_monthly_savings']
449 | percentage = cost_impact.get('savings_percentage', 0)
450 | findings.append(f"Potential monthly savings identified: ${savings:.2f} ({percentage:.1f}% of current spend)")
451 |
452 | return findings
453 |
454 | def _determine_optimization_priorities(self, recommendations: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
455 | """Determine optimization priorities based on recommendations."""
456 | priorities = []
457 |
458 | # Group recommendations by priority level
459 | critical_recs = [r for r in recommendations if r.get('priority') == 'critical']
460 | high_recs = [r for r in recommendations if r.get('priority') == 'high']
461 |
462 | if critical_recs:
463 | priorities.append({
464 | 'priority_level': 'critical',
465 | 'title': 'Immediate Action Required',
466 | 'description': f'{len(critical_recs)} critical optimization opportunities identified',
467 | 'recommended_timeline': 'Within 1 week',
468 | 'recommendations': critical_recs[:3] # Top 3 critical
469 | })
470 |
471 | if high_recs:
472 | priorities.append({
473 | 'priority_level': 'high',
474 | 'title': 'High-Impact Optimizations',
475 | 'description': f'{len(high_recs)} high-impact optimization opportunities identified',
476 | 'recommended_timeline': 'Within 1 month',
477 | 'recommendations': high_recs[:5] # Top 5 high
478 | })
479 |
480 | # Add general implementation strategy
481 | priorities.append({
482 | 'priority_level': 'strategic',
483 | 'title': 'Implementation Strategy',
484 | 'description': 'Recommended approach for CloudWatch optimization',
485 | 'recommended_timeline': 'Ongoing',
486 | 'strategy': [
487 | '1. Address critical issues immediately (cost impact >$100/month)',
488 | '2. Implement high-impact optimizations (cost impact >$50/month)',
489 | '3. Clean up unused resources (alarms, metrics, log groups)',
490 | '4. Establish governance policies to prevent future waste',
491 | '5. Schedule regular optimization reviews (quarterly)'
492 | ]
493 | })
494 |
495 | return priorities
496 |
497 | def _generate_next_steps(self, recommendations: List[Dict[str, Any]],
498 | functionality_coverage: Dict[str, Any],
499 | cost_estimate: Dict[str, Any]) -> List[str]:
500 | """Generate recommended next steps based on analysis results."""
501 | next_steps = []
502 |
503 | # Based on functionality coverage
504 | overall_coverage = functionality_coverage.get('overall_coverage', 0)
505 | if overall_coverage < 70:
506 | next_steps.append(
507 | f"Consider enabling additional cost features for more comprehensive analysis "
508 | f"(current coverage: {overall_coverage:.1f}%)"
509 | )
510 |
511 | # Based on cost estimate
512 | if cost_estimate.get('total_estimated_cost', 0) == 0:
513 | next_steps.append(
514 | "All analysis completed with free operations - consider enabling Cost Explorer "
515 | "for historical cost data and trends"
516 | )
517 |
518 | # Based on recommendations
519 | if recommendations:
520 | critical_count = len([r for r in recommendations if r.get('priority') == 'critical'])
521 | if critical_count > 0:
522 | next_steps.append(f"Address {critical_count} critical optimization opportunities immediately")
523 |
524 | high_count = len([r for r in recommendations if r.get('priority') == 'high'])
525 | if high_count > 0:
526 | next_steps.append(f"Plan implementation of {high_count} high-impact optimizations")
527 |
528 | # General next steps
529 | next_steps.extend([
530 | "Review detailed analysis results for specific resource optimization opportunities",
531 | "Use session SQL queries to explore data relationships and patterns",
532 | "Implement monitoring for optimization progress tracking",
533 | "Schedule follow-up analysis after implementing changes"
534 | ])
535 |
536 | return next_steps
537 |
538 | async def execute_specific_functionalities(self, functionalities: List[str], **kwargs) -> Dict[str, Any]:
539 | """
540 | Execute specific CloudWatch optimization functionalities with intelligent orchestration.
541 |
542 | This method allows users to run a subset of the available functionalities with the same
543 | intelligent orchestration, cost-aware execution, and comprehensive reporting as the
544 | comprehensive analysis.
545 |
546 | Args:
547 | functionalities: List of specific functionalities to execute
548 | **kwargs: Analysis parameters (same as comprehensive analysis)
549 |
550 | Returns:
551 | Dictionary containing results for the specified functionalities
552 | """
553 | # Validate requested functionalities
554 | valid_functionalities = [f for f in functionalities if f in self.supported_functionalities]
555 |
556 | if not valid_functionalities:
557 | return {
558 | 'status': 'error',
559 | 'error_message': 'No valid functionalities specified',
560 | 'requested_functionalities': functionalities,
561 | 'supported_functionalities': self.supported_functionalities
562 | }
563 |
564 | # Execute comprehensive analysis with filtered functionalities
565 | kwargs['functionalities'] = valid_functionalities
566 |
567 | log_cloudwatch_operation(self.logger, "specific_functionalities_execution",
568 | requested=functionalities,
569 | valid=valid_functionalities,
570 | session_id=self.session_id)
571 |
572 | return await self.execute_comprehensive_optimization_analysis(**kwargs)
573 |
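    # Illustrative call (hypothetical values): run only the two highest-impact
    # analyzers while reusing the same orchestration and reporting pipeline.
    #
    #   result = await tool.execute_specific_functionalities(
    #       ["general_spend", "logs_optimization"],
    #       lookback_days=7,
    #   )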
574 | def get_tool_info(self) -> Dict[str, Any]:
575 | """Get information about the optimization tool."""
576 | return {
577 | 'tool_name': 'CloudWatch Optimization Tool',
578 | 'version': self.tool_version,
579 | 'supported_functionalities': self.supported_functionalities,
580 | 'session_id': self.session_id,
581 | 'region': self.region,
582 | 'capabilities': [
583 | 'Comprehensive CloudWatch cost analysis with all 4 functionalities',
584 | 'Intelligent parallel execution with cost-aware priority-based orchestration',
585 | 'Configurable analysis scope (specific functionalities, resource filtering, time ranges)',
586 | 'Cost-aware analysis with user consent workflows and detailed cost estimation',
587 | 'Cross-analysis insights and correlations for optimization synergies',
588 | 'Executive summary generation with actionable recommendations',
589 | 'SQL-queryable session data storage with comprehensive metadata',
590 | 'Enhanced reporting with implementation guidance and risk assessment'
591 | ],
592 | 'cost_control_features': [
593 | 'Free operations prioritized (60% functionality coverage)',
594 | 'Granular user-controlled paid operations with explicit consent',
595 | 'Detailed cost estimation before analysis execution',
596 | 'Transparent cost tracking with operation-level visibility',
597 | 'Functionality coverage reporting with cost justification',
598 | 'Graceful degradation to free operations when paid features disabled'
599 | ],
600 | 'intelligent_orchestration_features': [
601 | 'Priority-based execution ordering (cost_impact, execution_time, balanced)',
602 | 'Parallel batch execution with configurable concurrency',
603 | 'Intelligent resource filtering and scope configuration',
604 | 'Cross-analysis correlation and synergy identification',
605 | 'Executive summary with implementation roadmap',
606 | 'Enhanced error handling with partial result recovery'
607 | ]
608 | }
609 |
610 | async def _validate_cost_preferences_with_consent(self, **kwargs) -> Dict[str, Any]:
611 | """
612 | Validate cost preferences with detailed consent workflow.
613 |
614 | Args:
615 | **kwargs: Analysis parameters including cost preferences
616 |
617 | Returns:
618 | Dictionary containing validation results and consent details
619 | """
620 | try:
621 | # Validate basic cost preferences
622 | cost_validation = self.orchestrator.validate_cost_preferences(**kwargs)
623 |
624 | if cost_validation.get('validation_status') != 'success':
625 | # Convert validation_status to status for consistency
626 | cost_validation['status'] = cost_validation.get('validation_status', 'error')
627 | return cost_validation
628 |
629 | # Add consent workflow details
630 | cost_validation['consent_workflow'] = {
631 | 'consent_required': any([
632 | kwargs.get('allow_cost_explorer', False),
633 | kwargs.get('allow_aws_config', False),
634 | kwargs.get('allow_cloudtrail', False),
635 | kwargs.get('allow_minimal_cost_metrics', False)
636 | ]),
637 | 'consent_timestamp': datetime.now().isoformat(),
638 | 'consent_details': {
639 | 'cost_explorer_consent': kwargs.get('allow_cost_explorer', False),
640 | 'aws_config_consent': kwargs.get('allow_aws_config', False),
641 | 'cloudtrail_consent': kwargs.get('allow_cloudtrail', False),
642 | 'minimal_cost_metrics_consent': kwargs.get('allow_minimal_cost_metrics', False)
643 | }
644 | }
645 |
646 | # Ensure status field is set for consistency
647 | cost_validation['status'] = cost_validation.get('validation_status', 'success')
648 |
649 | return cost_validation
650 |
651 | except Exception as e:
652 | import traceback
653 | full_traceback = traceback.format_exc()
654 | error_message = str(e)
655 | self.logger.error(f"Cost preference validation failed: {error_message}")
656 | self.logger.error(f"Full traceback: {full_traceback}")
657 | return {
658 | 'status': 'error',
659 | 'error_message': f"Cost preference validation failed: {error_message}",
660 | 'error_type': e.__class__.__name__,
661 | 'full_exception_details': {
662 | 'traceback': full_traceback,
663 | 'error_location': self._extract_error_location(full_traceback)
664 | },
665 | 'validation_context': kwargs
666 | }
667 |
668 | async def _get_detailed_cost_estimate(self, **kwargs) -> Dict[str, Any]:
669 | """
670 | Get detailed cost estimate with user consent workflow.
671 |
672 | Args:
673 | **kwargs: Analysis parameters
674 |
675 | Returns:
676 | Dictionary containing detailed cost estimation
677 | """
678 | try:
679 | # Get basic cost estimate
680 | cost_estimate_result = self.orchestrator.get_cost_estimate(**kwargs)
681 |
682 | if cost_estimate_result.get('status') == 'error':
683 | return cost_estimate_result
684 |
685 | # Add detailed cost breakdown and consent workflow
686 | cost_estimate_result['detailed_breakdown'] = {
687 | 'free_operations_cost': 0.0,
688 | 'paid_operations_estimate': cost_estimate_result.get('cost_estimate', {}).get('total_estimated_cost', 0.0),
689 | 'cost_by_service': {
690 | 'cost_explorer': 0.01 if kwargs.get('allow_cost_explorer', False) else 0.0,
691 | 'aws_config': 0.003 if kwargs.get('allow_aws_config', False) else 0.0,
692 | 'cloudtrail': 0.10 if kwargs.get('allow_cloudtrail', False) else 0.0,
693 | 'minimal_cost_metrics': 0.01 if kwargs.get('allow_minimal_cost_metrics', False) else 0.0
694 | },
695 | 'cost_justification': self._generate_cost_justification(**kwargs)
696 | }
697 |
698 | cost_estimate_result['status'] = 'success'
699 | return cost_estimate_result
700 |
701 | except Exception as e:
702 | return {
703 | 'status': 'error',
704 | 'error_message': f"Cost estimation failed: {str(e)}",
705 | 'estimation_context': kwargs
706 | }
707 |
708 | def _generate_cost_justification(self, **kwargs) -> List[Dict[str, Any]]:
709 | """Generate cost justification for enabled paid features."""
710 | justifications = []
711 |
712 | if kwargs.get('allow_cost_explorer', False):
713 | justifications.append({
714 | 'service': 'cost_explorer',
715 | 'cost': 0.01,
716 | 'justification': 'Provides historical cost trends and detailed spend analysis',
717 | 'functionality_gain': '30% additional analysis coverage'
718 | })
719 |
720 | if kwargs.get('allow_aws_config', False):
721 | justifications.append({
722 | 'service': 'aws_config',
723 | 'cost': 0.003,
724 | 'justification': 'Enables compliance checking and governance analysis',
725 | 'functionality_gain': '5% additional analysis coverage'
726 | })
727 |
728 | if kwargs.get('allow_cloudtrail', False):
729 | justifications.append({
730 | 'service': 'cloudtrail',
731 | 'cost': 0.10,
732 | 'justification': 'Provides usage pattern analysis and access insights',
733 | 'functionality_gain': '1% additional analysis coverage'
734 | })
735 |
736 | if kwargs.get('allow_minimal_cost_metrics', False):
737 | justifications.append({
738 | 'service': 'minimal_cost_metrics',
739 | 'cost': 0.01,
740 | 'justification': 'Enables detailed log ingestion pattern analysis',
741 | 'functionality_gain': '4% additional analysis coverage'
742 | })
743 |
744 | return justifications
745 |
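    # Per the per-service figures above, enabling all four paid features adds
    # roughly $0.01 + $0.003 + $0.10 + $0.01 = $0.123 per analysis run; actual
    # AWS charges depend on request volume and current pricing.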
746 | def _configure_analysis_scope(self, **kwargs) -> Dict[str, Any]:
747 | """
748 | Configure intelligent analysis scope with resource filtering and time ranges.
749 |
750 | Args:
751 | **kwargs: Analysis parameters
752 |
753 | Returns:
754 | Dictionary containing configured analysis scope
755 | """
756 | # Base scope configuration
757 | scope = {
758 | 'temporal_scope': {
759 | 'lookback_days': kwargs.get('lookback_days', 30),
760 | 'start_date': kwargs.get('start_date'),
761 | 'end_date': kwargs.get('end_date'),
762 | 'analysis_granularity': kwargs.get('analysis_granularity', 'daily')
763 | },
764 | 'resource_filters': {
765 | 'log_group_names': kwargs.get('log_group_names', []),
766 | 'log_group_patterns': kwargs.get('log_group_patterns', []),
767 | 'alarm_names': kwargs.get('alarm_names', []),
768 | 'alarm_patterns': kwargs.get('alarm_patterns', []),
769 | 'dashboard_names': kwargs.get('dashboard_names', []),
770 | 'metric_namespaces': kwargs.get('metric_namespaces', []),
771 | 'exclude_patterns': kwargs.get('exclude_patterns', [])
772 | },
773 | 'analysis_depth': {
774 | 'include_detailed_metrics': kwargs.get('include_detailed_metrics', True),
775 | 'include_cost_analysis': kwargs.get('include_cost_analysis', True),
776 | 'include_governance_checks': kwargs.get('include_governance_checks', True),
777 | 'include_optimization_recommendations': kwargs.get('include_optimization_recommendations', True)
778 | },
779 | 'performance_constraints': {
780 | 'max_parallel_analyses': kwargs.get('max_parallel_analyses', 4),
781 | 'timeout_per_analysis': kwargs.get('timeout_per_analysis', 120),
782 | 'memory_limit_mb': kwargs.get('memory_limit_mb', 1024),
783 | 'enable_caching': kwargs.get('enable_caching', True)
784 | }
785 | }
786 |
787 | # Validate and sanitize scope
788 | scope = self._validate_analysis_scope(scope)
789 |
790 | return scope
791 |
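    # Illustrative scope (hypothetical filter values): narrow the analysis to
    # Lambda log groups over a two-week window.
    #
    #   scope = self._configure_analysis_scope(
    #       lookback_days=14,
    #       log_group_patterns=["/aws/lambda/*"],
    #       max_parallel_analyses=2,
    #   )
    #   assert scope["temporal_scope"]["lookback_days"] == 14
    #   assert scope["resource_filters"]["log_group_patterns"] == ["/aws/lambda/*"]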
792 | def _validate_analysis_scope(self, scope: Dict[str, Any]) -> Dict[str, Any]:
793 | """Validate and sanitize analysis scope parameters."""
794 | # Validate temporal scope
795 | lookback_days = scope['temporal_scope']['lookback_days']
796 | if lookback_days < 1 or lookback_days > 365:
797 | scope['temporal_scope']['lookback_days'] = min(max(lookback_days, 1), 365)
798 |
799 | # Validate performance constraints
800 | max_parallel = scope['performance_constraints']['max_parallel_analyses']
801 | if max_parallel < 1 or max_parallel > 8:
802 | scope['performance_constraints']['max_parallel_analyses'] = min(max(max_parallel, 1), 8)
803 |
804 | timeout = scope['performance_constraints']['timeout_per_analysis']
805 | if timeout < 30 or timeout > 600:
806 | scope['performance_constraints']['timeout_per_analysis'] = min(max(timeout, 30), 600)
807 |
808 | return scope
809 |
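    # Clamping examples for the validation above: lookback_days=0 -> 1,
    # lookback_days=500 -> 365, max_parallel_analyses=12 -> 8, and
    # timeout_per_analysis=10 -> 30 seconds.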
810 | def _create_intelligent_execution_plan(self, requested_functionalities: List[str],
811 | cost_preferences: Dict[str, Any],
812 | priority_mode: str,
813 | analysis_scope: Dict[str, Any]) -> Dict[str, Any]:
814 | """
815 | Create intelligent execution plan with cost-aware priority-based ordering.
816 |
817 | Args:
818 | requested_functionalities: List of requested analysis functionalities
819 | cost_preferences: Validated cost preferences
820 | priority_mode: Priority mode ('cost_impact', 'execution_time', 'balanced')
821 | analysis_scope: Configured analysis scope
822 |
823 | Returns:
824 | Dictionary containing execution plan with prioritized functionalities
825 | """
826 | # Validate requested functionalities
827 | valid_functionalities = [f for f in requested_functionalities if f in self.supported_functionalities]
828 |
829 | # Define functionality metadata for prioritization
830 | functionality_metadata = {
831 | 'general_spend': {
832 | 'cost_impact_score': 10, # High cost impact
833 | 'execution_time_score': 3, # Medium execution time
834 | 'data_requirements': ['cost_explorer', 'cloudwatch_apis'],
835 | 'dependencies': []
836 | },
837 | 'logs_optimization': {
838 | 'cost_impact_score': 9, # High cost impact
839 | 'execution_time_score': 4, # Medium-high execution time
840 | 'data_requirements': ['cost_explorer', 'cloudwatch_logs_apis', 'minimal_cost_metrics'],
841 | 'dependencies': []
842 | },
843 | 'metrics_optimization': {
844 | 'cost_impact_score': 8, # High cost impact
845 | 'execution_time_score': 5, # High execution time
846 | 'data_requirements': ['cost_explorer', 'cloudwatch_apis', 'minimal_cost_metrics'],
847 | 'dependencies': []
848 | },
849 | 'alarms_and_dashboards': {
850 | 'cost_impact_score': 6, # Medium cost impact
851 | 'execution_time_score': 2, # Low execution time
852 | 'data_requirements': ['cloudwatch_apis', 'aws_config', 'cloudtrail'],
853 | 'dependencies': []
854 | }
855 | }
856 |
857 | # Calculate priority scores based on mode
858 | prioritized_functionalities = []
859 | for functionality in valid_functionalities:
860 | metadata = functionality_metadata.get(functionality, {})
861 |
862 | if priority_mode == 'cost_impact':
863 | priority_score = metadata.get('cost_impact_score', 5)
864 | elif priority_mode == 'execution_time':
865 | priority_score = 10 - metadata.get('execution_time_score', 5) # Invert for faster first
866 | else: # balanced
867 | cost_score = metadata.get('cost_impact_score', 5)
868 | time_score = 10 - metadata.get('execution_time_score', 5)
869 | priority_score = (cost_score * 0.7) + (time_score * 0.3)
870 |
871 | # Adjust score based on data availability
872 | data_requirements = metadata.get('data_requirements', [])
873 | availability_multiplier = self._calculate_data_availability_multiplier(
874 | data_requirements, cost_preferences
875 | )
876 |
877 | final_score = priority_score * availability_multiplier
878 |
879 | prioritized_functionalities.append({
880 | 'functionality': functionality,
881 | 'priority_score': final_score,
882 | 'metadata': metadata,
883 | 'data_availability_multiplier': availability_multiplier
884 | })
885 |
886 | # Sort by priority score (highest first)
887 | prioritized_functionalities.sort(key=lambda x: x['priority_score'], reverse=True)
888 |
889 | execution_plan = {
890 | 'valid_functionalities': [f['functionality'] for f in prioritized_functionalities],
891 | 'prioritized_execution_order': prioritized_functionalities,
892 | 'priority_mode': priority_mode,
893 | 'total_functionalities': len(valid_functionalities),
894 | 'execution_batches': self._create_execution_batches(
895 | prioritized_functionalities, analysis_scope['performance_constraints']['max_parallel_analyses']
896 | )
897 | }
898 |
899 | return execution_plan
900 |
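    # Worked example for 'balanced' mode using the metadata above:
    # general_spend has cost_impact_score=10 and execution_time_score=3, so
    # priority_score = (10 * 0.7) + ((10 - 3) * 0.3) = 9.1 before the data
    # availability multiplier is applied.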
901 | def _calculate_data_availability_multiplier(self, data_requirements: List[str],
902 | cost_preferences: Dict[str, Any]) -> float:
903 | """Calculate data availability multiplier based on enabled cost preferences."""
904 | if not data_requirements:
905 | return 1.0
906 |
907 | available_sources = 0
908 | total_sources = len(data_requirements)
909 |
910 | for requirement in data_requirements:
911 | if requirement == 'cost_explorer' and cost_preferences.get('allow_cost_explorer', False):
912 | available_sources += 1
913 | elif requirement == 'aws_config' and cost_preferences.get('allow_aws_config', False):
914 | available_sources += 1
915 | elif requirement == 'cloudtrail' and cost_preferences.get('allow_cloudtrail', False):
916 | available_sources += 1
917 | elif requirement == 'minimal_cost_metrics' and cost_preferences.get('allow_minimal_cost_metrics', False):
918 | available_sources += 1
919 | elif requirement in ['cloudwatch_apis', 'cloudwatch_logs_apis']:
920 | available_sources += 1 # Always available (free)
921 |
922 | return available_sources / total_sources if total_sources > 0 else 1.0
923 |
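    # Example with the metadata above and no paid features enabled:
    # logs_optimization requires ['cost_explorer', 'cloudwatch_logs_apis',
    # 'minimal_cost_metrics']; only the free CloudWatch Logs APIs count as
    # available, so the multiplier is 1/3 (~0.33).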
924 | def _create_execution_batches(self, prioritized_functionalities: List[Dict[str, Any]],
925 | max_parallel: int) -> List[List[str]]:
926 | """Create execution batches for parallel processing."""
927 | batches = []
928 | current_batch = []
929 |
930 | for functionality_info in prioritized_functionalities:
931 | functionality = functionality_info['functionality']
932 |
933 | if len(current_batch) < max_parallel:
934 | current_batch.append(functionality)
935 | else:
936 | batches.append(current_batch)
937 | current_batch = [functionality]
938 |
939 | if current_batch:
940 | batches.append(current_batch)
941 |
942 | return batches
943 |
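    # Batching example: with max_parallel=2 and a priority order of
    # [general_spend, logs_optimization, metrics_optimization,
    # alarms_and_dashboards], the result is
    # [[general_spend, logs_optimization],
    #  [metrics_optimization, alarms_and_dashboards]].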
944 | async def _execute_parallel_analyses_with_orchestration(self, execution_plan: Dict[str, Any],
945 | analysis_scope: Dict[str, Any],
946 | cost_preferences: Dict[str, Any],
947 | **kwargs) -> Dict[str, Any]:
948 | """
949 | Execute analyses in parallel with intelligent orchestration.
950 |
951 | Args:
952 | execution_plan: Intelligent execution plan
953 | analysis_scope: Configured analysis scope
954 | cost_preferences: Validated cost preferences
955 | **kwargs: Additional analysis parameters
956 |
957 | Returns:
958 | Dictionary containing parallel execution results
959 | """
960 | parallel_start_time = datetime.now()
961 |
962 | log_cloudwatch_operation(self.logger, "parallel_execution_start",
963 | total_functionalities=len(execution_plan['valid_functionalities']),
964 | execution_batches=len(execution_plan['execution_batches']))
965 |
966 | try:
967 | # Execute analyses in batches for optimal parallel processing
968 | all_results = {}
969 | batch_results = []
970 |
971 | for batch_index, batch in enumerate(execution_plan['execution_batches']):
972 | batch_start_time = datetime.now()
973 |
974 | log_cloudwatch_operation(self.logger, "executing_batch",
975 | batch_index=batch_index,
976 | batch_functionalities=batch)
977 |
978 | # Execute batch in parallel
979 | batch_tasks = []
980 | for functionality in batch:
981 | task_kwargs = {
982 | **kwargs,
983 | 'analysis_scope': analysis_scope,
984 | 'cost_preferences': cost_preferences,
985 | 'execution_context': {
986 | 'batch_index': batch_index,
987 | 'functionality': functionality,
988 | 'priority_score': next(
989 | f['priority_score'] for f in execution_plan['prioritized_execution_order']
990 | if f['functionality'] == functionality
991 | )
992 | }
993 | }
994 |
995 | task = asyncio.create_task(
996 | self.orchestrator.execute_analysis(functionality, **task_kwargs)
997 | )
998 | batch_tasks.append((functionality, task))
999 |
1000 | # Wait for batch completion with timeout
1001 | timeout = analysis_scope['performance_constraints']['timeout_per_analysis']
1002 | batch_results_dict = {}
1003 |
1004 | for functionality, task in batch_tasks:
1005 | try:
1006 | result = await asyncio.wait_for(task, timeout=timeout)
1007 | batch_results_dict[functionality] = result
1008 | all_results[functionality] = result
1009 | except asyncio.TimeoutError:
1010 | error_result = {
1011 | 'status': 'timeout',
1012 | 'error_message': f'Analysis timed out after {timeout} seconds',
1013 | 'functionality': functionality,
1014 | 'batch_index': batch_index
1015 | }
1016 | batch_results_dict[functionality] = error_result
1017 | all_results[functionality] = error_result
1018 | except Exception as e:
1019 | error_result = {
1020 | 'status': 'error',
1021 | 'error_message': str(e),
1022 | 'functionality': functionality,
1023 | 'batch_index': batch_index
1024 | }
1025 | batch_results_dict[functionality] = error_result
1026 | all_results[functionality] = error_result
1027 |
1028 | batch_execution_time = (datetime.now() - batch_start_time).total_seconds()
1029 |
1030 | batch_summary = {
1031 | 'batch_index': batch_index,
1032 | 'functionalities': batch,
1033 | 'execution_time': batch_execution_time,
1034 | 'successful_analyses': len([r for r in batch_results_dict.values() if r.get('status') == 'success']),
1035 | 'failed_analyses': len([r for r in batch_results_dict.values() if r.get('status') in ['error', 'timeout']]),
1036 | 'results': batch_results_dict
1037 | }
1038 |
1039 | batch_results.append(batch_summary)
1040 |
1041 | log_cloudwatch_operation(self.logger, "batch_execution_complete",
1042 | batch_index=batch_index,
1043 | execution_time=batch_execution_time,
1044 | successful=batch_summary['successful_analyses'],
1045 | failed=batch_summary['failed_analyses'])
1046 |
1047 | total_parallel_time = (datetime.now() - parallel_start_time).total_seconds()
1048 |
1049 | # Compile parallel execution summary
1050 | parallel_summary = {
1051 | 'status': 'success',
1052 | 'total_execution_time': total_parallel_time,
1053 | 'total_functionalities': len(execution_plan['valid_functionalities']),
1054 | 'successful_analyses': len([r for r in all_results.values() if r.get('status') == 'success']),
1055 | 'failed_analyses': len([r for r in all_results.values() if r.get('status') in ['error', 'timeout']]),
1056 | 'batch_summaries': batch_results,
1057 | 'individual_results': all_results,
1058 | 'execution_plan': execution_plan,
1059 | 'analysis_scope': analysis_scope
1060 | }
1061 |
1062 | log_cloudwatch_operation(self.logger, "parallel_execution_complete",
1063 | total_time=total_parallel_time,
1064 | successful=parallel_summary['successful_analyses'],
1065 | failed=parallel_summary['failed_analyses'])
1066 |
1067 | return parallel_summary
1068 |
1069 | except Exception as e:
1070 | self.logger.error(f"Parallel execution failed: {str(e)}")
1071 | return {
1072 | 'status': 'error',
1073 | 'error_message': str(e),
1074 | 'execution_time': (datetime.now() - parallel_start_time).total_seconds(),
1075 | 'execution_plan': execution_plan
1076 | }
1077 |
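    # Note on the per-task timeout above: all tasks in a batch start
    # concurrently via asyncio.create_task, but each wait_for clock starts only
    # when that task is awaited in the loop. Tasks awaited later have therefore
    # already been running, so they effectively get extra wall-clock time
    # before their own timeout fires; in the common case a batch finishes in
    # roughly the time of its slowest analysis.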
1078 | async def _execute_cross_analysis_insights(self, parallel_results: Dict[str, Any],
1079 | analysis_scope: Dict[str, Any],
1080 | **kwargs) -> Dict[str, Any]:
1081 | """
1082 | Execute cross-analysis insights and correlations.
1083 |
1084 | Args:
1085 | parallel_results: Results from parallel analysis execution
1086 | analysis_scope: Configured analysis scope
1087 | **kwargs: Additional parameters
1088 |
1089 | Returns:
1090 | Dictionary containing cross-analysis insights
1091 | """
1092 | try:
1093 | # Extract successful results for cross-analysis
1094 | successful_results = {
1095 | k: v for k, v in parallel_results.get('individual_results', {}).items()
1096 | if v.get('status') == 'success'
1097 | }
1098 |
1099 | if len(successful_results) < 2:
1100 | return {
1101 | 'status': 'insufficient_data',
1102 | 'message': 'Cross-analysis requires at least 2 successful analyses',
1103 | 'available_analyses': list(successful_results.keys())
1104 | }
1105 |
1106 | # Perform cross-analysis correlations
1107 | cross_insights = {
1108 | 'cost_correlations': self._analyze_cost_correlations(successful_results),
1109 | 'resource_overlaps': self._analyze_resource_overlaps(successful_results),
1110 | 'optimization_synergies': self._identify_optimization_synergies(successful_results),
1111 | 'priority_recommendations': self._generate_priority_recommendations(successful_results)
1112 | }
1113 |
1114 | return {
1115 | 'status': 'success',
1116 | 'insights': cross_insights,
1117 | 'analyses_included': list(successful_results.keys()),
1118 | 'correlation_strength': self._calculate_correlation_strength(cross_insights)
1119 | }
1120 |
1121 | except Exception as e:
1122 | return {
1123 | 'status': 'error',
1124 | 'error_message': f"Cross-analysis failed: {str(e)}",
1125 | 'available_results': list(parallel_results.get('individual_results', {}).keys())
1126 | }
1127 |
1128 | def _analyze_cost_correlations(self, results: Dict[str, Any]) -> Dict[str, Any]:
1129 | """Analyze cost correlations across different CloudWatch components."""
1130 | correlations = {
1131 | 'high_cost_components': [],
1132 | 'cost_drivers': [],
1133 | 'optimization_impact': {}
1134 | }
1135 |
1136 | # Extract cost data from each analysis
1137 | for analysis_type, result in results.items():
1138 | if 'cost_analysis' in result.get('data', {}):
1139 | cost_data = result['data']['cost_analysis']
1140 | if cost_data.get('monthly_cost', 0) > 100: # High cost threshold
1141 | correlations['high_cost_components'].append({
1142 | 'component': analysis_type,
1143 | 'monthly_cost': cost_data.get('monthly_cost', 0),
1144 | 'cost_trend': cost_data.get('cost_trend', 'unknown')
1145 | })
1146 |
1147 | return correlations
1148 |
1149 | def _analyze_resource_overlaps(self, results: Dict[str, Any]) -> Dict[str, Any]:
1150 | """Analyze resource overlaps and dependencies."""
1151 | overlaps = {
1152 | 'shared_resources': [],
1153 | 'dependency_chains': [],
1154 | 'optimization_conflicts': []
1155 | }
1156 |
1157 | # Identify shared log groups, metrics, alarms across analyses
1158 | resource_usage = {}
1159 | for analysis_type, result in results.items():
1160 | resources = result.get('data', {}).get('resources_analyzed', [])
1161 | for resource in resources:
1162 | if resource not in resource_usage:
1163 | resource_usage[resource] = []
1164 | resource_usage[resource].append(analysis_type)
1165 |
1166 | # Find resources used by multiple analyses
1167 | for resource, analyses in resource_usage.items():
1168 | if len(analyses) > 1:
1169 | overlaps['shared_resources'].append({
1170 | 'resource': resource,
1171 | 'used_by_analyses': analyses,
1172 | 'optimization_coordination_needed': True
1173 | })
1174 |
1175 | return overlaps
1176 |
1177 | def _identify_optimization_synergies(self, results: Dict[str, Any]) -> List[Dict[str, Any]]:
1178 | """Identify optimization synergies across analyses."""
1179 | synergies = []
1180 |
1181 | # Look for complementary optimizations
1182 | if 'logs_optimization' in results and 'metrics_optimization' in results:
1183 | synergies.append({
1184 | 'type': 'logs_metrics_synergy',
1185 | 'description': 'Optimizing log retention can reduce both log storage costs and related custom metrics',
1186 | 'combined_impact': 'high',
1187 | 'implementation_order': ['logs_optimization', 'metrics_optimization']
1188 | })
1189 |
1190 | if 'alarms_and_dashboards' in results and 'metrics_optimization' in results:
1191 | synergies.append({
1192 | 'type': 'alarms_metrics_synergy',
1193 | 'description': 'Removing unused metrics can eliminate related alarms and dashboard widgets',
1194 | 'combined_impact': 'medium',
1195 | 'implementation_order': ['metrics_optimization', 'alarms_and_dashboards']
1196 | })
1197 |
1198 | return synergies
1199 |
1200 | def _generate_priority_recommendations(self, results: Dict[str, Any]) -> List[Dict[str, Any]]:
1201 | """Generate priority recommendations based on cross-analysis."""
1202 | priorities = []
1203 |
1204 | # Aggregate all recommendations and prioritize
1205 | all_recommendations = []
1206 | for analysis_type, result in results.items():
1207 | recommendations = result.get('recommendations', [])
1208 | for rec in recommendations:
1209 | rec['source_analysis'] = analysis_type
1210 | all_recommendations.append(rec)
1211 |
1212 | # Sort by priority and potential savings
1213 | priority_order = {'critical': 4, 'high': 3, 'medium': 2, 'low': 1}
1214 | sorted_recs = sorted(
1215 | all_recommendations,
1216 | key=lambda x: (
1217 | priority_order.get(x.get('priority', 'low'), 1),
1218 | x.get('potential_savings', 0)
1219 | ),
1220 | reverse=True
1221 | )
1222 |
1223 | # Take top 5 cross-analysis priorities
1224 | return sorted_recs[:5]
1225 |
1226 | def _calculate_correlation_strength(self, insights: Dict[str, Any]) -> str:
1227 | """Calculate overall correlation strength between analyses."""
1228 | correlation_factors = 0
1229 |
1230 | if insights.get('cost_correlations', {}).get('high_cost_components'):
1231 | correlation_factors += 1
1232 |
1233 | if insights.get('resource_overlaps', {}).get('shared_resources'):
1234 | correlation_factors += 1
1235 |
1236 | if insights.get('optimization_synergies'):
1237 | correlation_factors += 1
1238 |
1239 | if correlation_factors >= 3:
1240 | return 'strong'
1241 | elif correlation_factors >= 2:
1242 | return 'moderate'
1243 | elif correlation_factors >= 1:
1244 | return 'weak'
1245 | else:
1246 | return 'minimal'
1247 |
1248 | async def _generate_executive_summary(self, parallel_results: Dict[str, Any],
1249 | cross_analysis_insights: Dict[str, Any],
1250 | execution_plan: Dict[str, Any],
1251 | **kwargs) -> Dict[str, Any]:
1252 | """
1253 | Generate executive summary with actionable insights.
1254 |
1255 | Args:
1256 | parallel_results: Results from parallel analysis execution
1257 | cross_analysis_insights: Cross-analysis insights
1258 | execution_plan: Execution plan used
1259 | **kwargs: Additional parameters
1260 |
1261 | Returns:
1262 | Dictionary containing executive summary
1263 | """
1264 | try:
1265 | successful_results = {
1266 | k: v for k, v in parallel_results.get('individual_results', {}).items()
1267 | if v.get('status') == 'success'
1268 | }
1269 |
1270 | # Calculate key metrics
1271 | total_potential_savings = 0
1272 | critical_issues = 0
1273 | high_priority_recommendations = 0
1274 |
1275 | for result in successful_results.values():
1276 | recommendations = result.get('recommendations', [])
1277 | for rec in recommendations:
1278 | total_potential_savings += rec.get('potential_savings', 0)
1279 | if rec.get('priority') == 'critical':
1280 | critical_issues += 1
1281 | elif rec.get('priority') == 'high':
1282 | high_priority_recommendations += 1
1283 |
1284 | # Generate executive summary
1285 | executive_summary = {
1286 | 'analysis_overview': {
1287 |                     'total_analyses_requested': len(execution_plan.get('valid_functionalities', [])),
1288 | 'successful_analyses': parallel_results.get('successful_analyses', 0),
1289 | 'failed_analyses': parallel_results.get('failed_analyses', 0),
1290 | 'total_execution_time': parallel_results.get('total_execution_time', 0),
1291 |                     'analysis_coverage': f"{(parallel_results.get('successful_analyses', 0) / max(len(execution_plan.get('valid_functionalities', [])), 1) * 100):.1f}%"
1292 | },
1293 | 'key_findings': {
1294 | 'total_potential_monthly_savings': total_potential_savings,
1295 | 'critical_issues_identified': critical_issues,
1296 | 'high_priority_recommendations': high_priority_recommendations,
1297 | 'cross_analysis_correlation': cross_analysis_insights.get('correlation_strength', 'unknown'),
1298 | 'optimization_synergies_found': len(cross_analysis_insights.get('insights', {}).get('optimization_synergies', []))
1299 | },
1300 | 'immediate_actions': self._generate_immediate_actions(successful_results, cross_analysis_insights),
1301 | 'strategic_recommendations': self._generate_strategic_recommendations(successful_results, cross_analysis_insights),
1302 | 'implementation_roadmap': self._generate_implementation_roadmap(successful_results, cross_analysis_insights),
1303 | 'cost_impact_summary': {
1304 | 'estimated_monthly_savings': total_potential_savings,
1305 | 'roi_timeline': '1-3 months',
1306 | 'implementation_effort': self._assess_implementation_effort(successful_results),
1307 | 'risk_level': 'low'
1308 | }
1309 | }
1310 |
1311 | return {
1312 | 'status': 'success',
1313 | 'executive_summary': executive_summary,
1314 | 'generated_at': datetime.now().isoformat()
1315 | }
1316 |
1317 | except Exception as e:
1318 | return {
1319 | 'status': 'error',
1320 | 'error_message': f"Executive summary generation failed: {str(e)}",
1321 | 'fallback_summary': {
1322 | 'message': 'Executive summary generation failed, but detailed analysis results are available',
1323 | 'successful_analyses': parallel_results.get('successful_analyses', 0),
1324 | 'total_analyses': len(execution_plan.get('valid_functionalities', []))
1325 | }
1326 | }
1327 |
1328 | def _generate_immediate_actions(self, results: Dict[str, Any],
1329 | cross_insights: Dict[str, Any]) -> List[Dict[str, Any]]:
1330 | """Generate immediate action items."""
1331 | actions = []
1332 |
1333 | # Extract critical recommendations
1334 | for analysis_type, result in results.items():
1335 | recommendations = result.get('recommendations', [])
1336 | critical_recs = [r for r in recommendations if r.get('priority') == 'critical']
1337 |
1338 | for rec in critical_recs[:2]: # Top 2 critical per analysis
1339 | actions.append({
1340 | 'action': rec.get('title', 'Critical optimization'),
1341 | 'source': analysis_type,
1342 | 'timeline': 'Within 1 week',
1343 | 'impact': rec.get('potential_savings', 0),
1344 | 'effort': rec.get('implementation_effort', 'medium')
1345 | })
1346 |
1347 | return actions[:5] # Top 5 immediate actions
1348 |
1349 | def _generate_strategic_recommendations(self, results: Dict[str, Any],
1350 | cross_insights: Dict[str, Any]) -> List[Dict[str, Any]]:
1351 | """Generate strategic recommendations."""
1352 | strategic = []
1353 |
1354 | # Add synergy-based recommendations
1355 | synergies = cross_insights.get('insights', {}).get('optimization_synergies', [])
1356 | for synergy in synergies:
1357 | strategic.append({
1358 | 'recommendation': synergy.get('description', 'Synergistic optimization'),
1359 | 'type': 'synergy',
1360 | 'impact': synergy.get('combined_impact', 'medium'),
1361 | 'timeline': '1-3 months'
1362 | })
1363 |
1364 | # Add governance recommendations
1365 | strategic.append({
1366 | 'recommendation': 'Establish CloudWatch cost governance policies',
1367 | 'type': 'governance',
1368 | 'impact': 'high',
1369 | 'timeline': '2-4 months',
1370 | 'description': 'Implement automated policies to prevent future CloudWatch cost waste'
1371 | })
1372 |
1373 | return strategic
1374 |
1375 | def _generate_implementation_roadmap(self, results: Dict[str, Any],
1376 | cross_insights: Dict[str, Any]) -> Dict[str, List[str]]:
1377 | """Generate implementation roadmap."""
1378 | return {
1379 | 'phase_1_immediate': [
1380 | 'Address critical cost optimization opportunities',
1381 | 'Remove unused alarms and dashboards',
1382 | 'Optimize log retention policies'
1383 | ],
1384 | 'phase_2_short_term': [
1385 | 'Implement metrics optimization recommendations',
1386 | 'Establish monitoring for optimization progress',
1387 | 'Create cost governance policies'
1388 | ],
1389 | 'phase_3_long_term': [
1390 | 'Implement automated cost optimization workflows',
1391 | 'Establish regular optimization review cycles',
1392 | 'Integrate with broader cloud cost management strategy'
1393 | ]
1394 | }
1395 |
1396 | def _assess_implementation_effort(self, results: Dict[str, Any]) -> str:
1397 | """Assess overall implementation effort."""
1398 | total_recommendations = sum(
1399 | len(result.get('recommendations', [])) for result in results.values()
1400 | )
1401 |
1402 | if total_recommendations > 20:
1403 | return 'high'
1404 | elif total_recommendations > 10:
1405 | return 'medium'
1406 | else:
1407 | return 'low'
1408 |
1409 | def cleanup(self):
1410 | """Clean up tool resources."""
1411 | try:
1412 | self.orchestrator.cleanup_session()
1413 | log_cloudwatch_operation(self.logger, "optimization_tool_cleanup_complete",
1414 | session_id=self.session_id)
1415 | except Exception as e:
1416 | self.logger.error(f"Error during tool cleanup: {str(e)}")
1417 |
1418 | def _calculate_recommendation_confidence(self, recommendation: Dict[str, Any]) -> float:
1419 | """Calculate confidence score for a recommendation."""
1420 | confidence = 0.5 # Base confidence
1421 |
1422 | # Increase confidence based on source
1423 | source = recommendation.get('source', '')
1424 | if 'individual_analysis' in source:
1425 | confidence += 0.3
1426 | elif 'cross_analysis' in source:
1427 | confidence += 0.4
1428 | elif 'executive_summary' in source:
1429 | confidence += 0.2
1430 |
1431 | # Increase confidence based on priority
1432 | priority = recommendation.get('priority', 'low')
1433 | if priority == 'critical':
1434 | confidence += 0.3
1435 | elif priority == 'high':
1436 | confidence += 0.2
1437 | elif priority == 'medium':
1438 | confidence += 0.1
1439 |
1440 | # Increase confidence if potential savings are specified
1441 | if recommendation.get('potential_savings', 0) > 0:
1442 | confidence += 0.1
1443 |
1444 | return min(confidence, 1.0)
1445 |
1446 | def _calculate_execution_efficiency(self, parallel_results: Dict[str, Any],
1447 | execution_plan: Dict[str, Any]) -> float:
1448 | """Calculate execution efficiency based on parallel performance."""
1449 | total_time = parallel_results.get('total_execution_time', 0)
1450 | total_analyses = len(execution_plan.get('valid_functionalities', []))
1451 |
1452 | if total_time == 0 or total_analyses == 0:
1453 | return 0.0
1454 |
1455 | # Theoretical sequential time (assuming 60s per analysis)
1456 | theoretical_sequential_time = total_analyses * 60
1457 |
1458 |         # Efficiency is the ratio of theoretical sequential time to actual time, capped at 1.0
1459 | efficiency = min(theoretical_sequential_time / total_time, 1.0)
1460 |
1461 | return efficiency
1462 |
1463 | def _extract_cost_incurred(self, parallel_results: Dict[str, Any]) -> bool:
1464 | """Extract whether any cost was incurred during analysis."""
1465 | individual_results = parallel_results.get('individual_results', {})
1466 |
1467 | for result in individual_results.values():
1468 | if result.get('cost_incurred', False):
1469 | return True
1470 |
1471 | return False
1472 |
1473 | def _extract_cost_operations(self, parallel_results: Dict[str, Any]) -> List[str]:
1474 | """Extract all cost-incurring operations from parallel results."""
1475 | all_operations = []
1476 | individual_results = parallel_results.get('individual_results', {})
1477 |
1478 | for result in individual_results.values():
1479 | operations = result.get('cost_incurring_operations', [])
1480 | all_operations.extend(operations)
1481 |
1482 | return list(set(all_operations)) # Remove duplicates
1483 |
1484 | def _extract_primary_data_sources(self, parallel_results: Dict[str, Any]) -> List[str]:
1485 | """Extract primary data sources used across analyses."""
1486 | sources = []
1487 | individual_results = parallel_results.get('individual_results', {})
1488 |
1489 | for result in individual_results.values():
1490 | source = result.get('primary_data_source')
1491 | if source and source not in sources:
1492 | sources.append(source)
1493 |
1494 | return sources
1495 |
1496 | def _extract_fallback_usage(self, parallel_results: Dict[str, Any]) -> Dict[str, Any]:
1497 | """Extract fallback usage information."""
1498 | fallback_info = {
1499 | 'fallback_used': False,
1500 | 'analyses_with_fallback': [],
1501 | 'fallback_reasons': []
1502 | }
1503 |
1504 | individual_results = parallel_results.get('individual_results', {})
1505 |
1506 | for analysis_type, result in individual_results.items():
1507 | if result.get('fallback_used', False):
1508 | fallback_info['fallback_used'] = True
1509 | fallback_info['analyses_with_fallback'].append(analysis_type)
1510 |
1511 | fallback_reason = result.get('fallback_reason', 'Unknown')
1512 | if fallback_reason not in fallback_info['fallback_reasons']:
1513 | fallback_info['fallback_reasons'].append(fallback_reason)
1514 |
1515 | return fallback_info
1516 |
1517 | def _extract_stored_tables(self, parallel_results: Dict[str, Any]) -> List[str]:
1518 | """Extract stored table names from parallel results."""
1519 | tables = []
1520 | individual_results = parallel_results.get('individual_results', {})
1521 |
1522 | for result in individual_results.values():
1523 | session_tables = result.get('session_tables', [])
1524 | tables.extend(session_tables)
1525 |
1526 | return list(set(tables)) # Remove duplicates
1527 |
1528 | def _extract_key_findings_enhanced(self, parallel_results: Dict[str, Any],
1529 | cross_analysis_insights: Dict[str, Any],
1530 | executive_summary: Optional[Dict[str, Any]]) -> List[str]:
1531 | """Extract enhanced key findings from all analysis sources."""
1532 | findings = []
1533 |
1534 | # Parallel execution findings
1535 | successful = parallel_results.get('successful_analyses', 0)
1536 | failed = parallel_results.get('failed_analyses', 0)
1537 | total = successful + failed
1538 |
1539 | if total > 0:
1540 | findings.append(f"Executed {total} CloudWatch optimization analyses with {successful} successful completions")
1541 |
1542 | if failed > 0:
1543 | findings.append(f"{failed} analyses encountered issues but partial results may be available")
1544 |
1545 | # Cost transparency findings
1546 | cost_incurred = self._extract_cost_incurred(parallel_results)
1547 | if cost_incurred:
1548 | cost_ops = self._extract_cost_operations(parallel_results)
1549 | findings.append(f"Analysis used {len(cost_ops)} cost-incurring operations: {', '.join(cost_ops)}")
1550 | else:
1551 | findings.append("Analysis completed using only free AWS operations with no additional charges")
1552 |
1553 | # Execution efficiency findings
1554 | efficiency = self._calculate_execution_efficiency(parallel_results, parallel_results.get('execution_plan', {}))
1555 | if efficiency > 0.8:
1556 | findings.append(f"Parallel execution achieved high efficiency ({efficiency:.1%}) through intelligent orchestration")
1557 | elif efficiency > 0.5:
1558 | findings.append(f"Parallel execution achieved moderate efficiency ({efficiency:.1%})")
1559 |
1560 | # Cross-analysis findings
1561 | if cross_analysis_insights.get('status') == 'success':
1562 | correlation = cross_analysis_insights.get('correlation_strength', 'unknown')
1563 | findings.append(f"Cross-analysis correlation strength: {correlation}")
1564 |
1565 | insights = cross_analysis_insights.get('insights', {})
1566 | synergies = insights.get('optimization_synergies', [])
1567 | if synergies:
1568 | findings.append(f"Identified {len(synergies)} optimization synergies for coordinated implementation")
1569 |
1570 | # Executive summary findings
1571 | if executive_summary and executive_summary.get('status') == 'success':
1572 | exec_data = executive_summary.get('executive_summary', {})
1573 | key_findings = exec_data.get('key_findings', {})
1574 |
1575 | total_savings = key_findings.get('total_potential_monthly_savings', 0)
1576 | if total_savings > 0:
1577 | findings.append(f"Identified potential monthly savings of ${total_savings:.2f}")
1578 |
1579 | critical_issues = key_findings.get('critical_issues_identified', 0)
1580 | if critical_issues > 0:
1581 | findings.append(f"Found {critical_issues} critical issues requiring immediate attention")
1582 |
1583 | return findings
1584 |
1585 | def _determine_optimization_priorities_enhanced(self, recommendations: List[Dict[str, Any]],
1586 | cross_analysis_insights: Dict[str, Any]) -> List[Dict[str, Any]]:
1587 | """Determine enhanced optimization priorities with cross-analysis context."""
1588 | priorities = []
1589 |
1590 | # Critical priority tier
1591 | critical_recs = [r for r in recommendations if r.get('priority') == 'critical']
1592 | if critical_recs:
1593 | priorities.append({
1594 | 'priority_level': 'critical',
1595 | 'title': 'Immediate Action Required',
1596 | 'description': f'{len(critical_recs)} critical optimization opportunities requiring immediate attention',
1597 | 'recommended_timeline': 'Within 1 week',
1598 | 'estimated_impact': sum(r.get('potential_savings', 0) for r in critical_recs[:3]),
1599 | 'recommendations': critical_recs[:3],
1600 | 'implementation_notes': [
1601 | 'Address highest cost impact items first',
1602 | 'Coordinate with stakeholders before implementation',
1603 | 'Monitor impact after each change'
1604 | ]
1605 | })
1606 |
1607 | # High priority tier with synergies
1608 | high_recs = [r for r in recommendations if r.get('priority') == 'high']
1609 | if high_recs:
1610 | synergy_note = ""
1611 | if cross_analysis_insights.get('status') == 'success':
1612 | synergies = cross_analysis_insights.get('insights', {}).get('optimization_synergies', [])
1613 | if synergies:
1614 | synergy_note = f" Consider {len(synergies)} identified synergies for coordinated implementation."
1615 |
1616 | priorities.append({
1617 | 'priority_level': 'high',
1618 | 'title': 'High-Impact Optimizations',
1619 | 'description': f'{len(high_recs)} high-impact optimization opportunities identified.{synergy_note}',
1620 | 'recommended_timeline': 'Within 1 month',
1621 | 'estimated_impact': sum(r.get('potential_savings', 0) for r in high_recs[:5]),
1622 | 'recommendations': high_recs[:5],
1623 | 'synergy_opportunities': cross_analysis_insights.get('insights', {}).get('optimization_synergies', []),
1624 | 'implementation_notes': [
1625 | 'Group related optimizations for batch implementation',
1626 | 'Test changes in non-production environments first',
1627 | 'Document changes for future reference'
1628 | ]
1629 | })
1630 |
1631 | # Strategic implementation guidance
1632 | priorities.append({
1633 | 'priority_level': 'strategic',
1634 | 'title': 'CloudWatch Optimization Strategy',
1635 | 'description': 'Comprehensive approach to CloudWatch cost optimization and governance',
1636 | 'recommended_timeline': 'Ongoing',
1637 | 'strategy_components': [
1638 | '1. Immediate: Address critical cost drivers (>$100/month impact)',
1639 | '2. Short-term: Implement high-impact optimizations (>$50/month impact)',
1640 | '3. Medium-term: Establish governance policies and automation',
1641 | '4. Long-term: Integrate with broader cloud cost management strategy',
1642 | '5. Continuous: Regular optimization reviews and monitoring'
1643 | ],
1644 | 'governance_recommendations': [
1645 | 'Implement CloudWatch cost budgets and alerts',
1646 | 'Establish log retention policies by environment',
1647 | 'Create metrics naming conventions and lifecycle policies',
1648 | 'Implement automated cleanup for unused resources'
1649 | ]
1650 | })
1651 |
1652 | return priorities
1653 |
1654 | def _generate_next_steps_enhanced(self, recommendations: List[Dict[str, Any]],
1655 | functionality_coverage: Dict[str, Any],
1656 | cost_estimate: Dict[str, Any],
1657 | cross_analysis_insights: Dict[str, Any],
1658 | executive_summary: Optional[Dict[str, Any]]) -> List[str]:
1659 | """Generate enhanced next steps with implementation guidance."""
1660 | next_steps = []
1661 |
1662 | # Based on analysis completeness
1663 | overall_coverage = functionality_coverage.get('overall_coverage', 0)
1664 | if overall_coverage < 70:
1665 | next_steps.append(
1666 | f"Consider enabling additional cost features for more comprehensive analysis "
1667 | f"(current coverage: {overall_coverage:.1f}%). Cost Explorer provides the highest value addition."
1668 | )
1669 |
1670 | # Based on cost optimization opportunities
1671 | if recommendations:
1672 | critical_count = len([r for r in recommendations if r.get('priority') == 'critical'])
1673 | high_count = len([r for r in recommendations if r.get('priority') == 'high'])
1674 |
1675 | if critical_count > 0:
1676 | next_steps.append(f"URGENT: Address {critical_count} critical optimization opportunities within 1 week")
1677 |
1678 | if high_count > 0:
1679 | next_steps.append(f"Plan implementation of {high_count} high-impact optimizations within 1 month")
1680 |
1681 | # Based on cross-analysis insights
1682 | if cross_analysis_insights.get('status') == 'success':
1683 | insights = cross_analysis_insights.get('insights', {})
1684 | synergies = insights.get('optimization_synergies', [])
1685 |
1686 | if synergies:
1687 | next_steps.append(f"Coordinate implementation of {len(synergies)} identified optimization synergies for maximum impact")
1688 |
1689 | # Based on executive summary
1690 | if executive_summary and executive_summary.get('status') == 'success':
1691 | exec_data = executive_summary.get('executive_summary', {})
1692 | roadmap = exec_data.get('implementation_roadmap', {})
1693 |
1694 | phase_1 = roadmap.get('phase_1_immediate', [])
1695 | if phase_1:
1696 | next_steps.append(f"Execute Phase 1 immediate actions: {', '.join(phase_1[:2])}")
1697 |
1698 | # General implementation guidance
1699 | next_steps.extend([
1700 | "Review detailed analysis results and prioritize by cost impact and implementation effort",
1701 | "Use session SQL queries to explore data relationships and identify additional optimization patterns",
1702 | "Establish monitoring dashboards to track optimization progress and prevent regression",
1703 | "Schedule quarterly CloudWatch optimization reviews to maintain cost efficiency"
1704 | ])
1705 |
1706 | # Cost-specific guidance
1707 | total_estimated_cost = cost_estimate.get('total_estimated_cost', 0)
1708 | if total_estimated_cost == 0:
1709 | next_steps.append(
1710 | "All analysis completed with free operations. Consider enabling Cost Explorer "
1711 | "for historical trends and more detailed cost attribution analysis."
1712 | )
1713 |
1714 | return next_steps
1715 |
1716 | def _generate_cost_impact_analysis(self, recommendations: List[Dict[str, Any]]) -> Dict[str, Any]:
1717 | """Generate cost impact analysis for implementation planning."""
1718 | total_potential_savings = sum(r.get('potential_savings', 0) for r in recommendations)
1719 |
1720 | # Categorize by impact level
1721 | high_impact = [r for r in recommendations if r.get('potential_savings', 0) > 100]
1722 | medium_impact = [r for r in recommendations if 50 <= r.get('potential_savings', 0) <= 100]
1723 | low_impact = [r for r in recommendations if 0 < r.get('potential_savings', 0) < 50]
1724 |
1725 | return {
1726 | 'total_potential_monthly_savings': total_potential_savings,
1727 | 'annual_savings_projection': total_potential_savings * 12,
1728 | 'impact_distribution': {
1729 | 'high_impact_items': len(high_impact),
1730 | 'medium_impact_items': len(medium_impact),
1731 | 'low_impact_items': len(low_impact)
1732 | },
1733 | 'roi_analysis': {
1734 | 'estimated_implementation_time': '2-4 weeks',
1735 | 'break_even_timeline': '1-2 months',
1736 | 'confidence_level': 'high' if total_potential_savings > 200 else 'medium'
1737 | }
1738 | }
1739 |
1740 | def _generate_risk_assessment(self, recommendations: List[Dict[str, Any]]) -> Dict[str, Any]:
1741 | """Generate risk assessment for optimization implementations."""
1742 | # Assess implementation risks
1743 | high_effort_count = len([r for r in recommendations if r.get('implementation_effort') == 'high'])
1744 | critical_count = len([r for r in recommendations if r.get('priority') == 'critical'])
1745 |
1746 | risk_level = 'low'
1747 | if high_effort_count > 5 or critical_count > 3:
1748 | risk_level = 'medium'
1749 | if high_effort_count > 10 or critical_count > 5:
1750 | risk_level = 'high'
1751 |
1752 | return {
1753 | 'overall_risk_level': risk_level,
1754 | 'risk_factors': {
1755 | 'implementation_complexity': 'medium' if high_effort_count > 3 else 'low',
1756 | 'business_impact': 'low', # CloudWatch optimizations are generally low-risk
1757 | 'rollback_difficulty': 'low' # Most changes are easily reversible
1758 | },
1759 | 'mitigation_strategies': [
1760 | 'Test changes in non-production environments first',
1761 | 'Implement changes incrementally with monitoring',
1762 | 'Maintain backup configurations before making changes',
1763 | 'Coordinate with application teams for log retention changes'
1764 | ]
1765 | }
1766 |
1767 | def _generate_timeline_recommendations(self, recommendations: List[Dict[str, Any]]) -> Dict[str, List[str]]:
1768 | """Generate timeline recommendations for implementation."""
1769 | critical_recs = [r for r in recommendations if r.get('priority') == 'critical']
1770 | high_recs = [r for r in recommendations if r.get('priority') == 'high']
1771 | medium_recs = [r for r in recommendations if r.get('priority') == 'medium']
1772 |
1773 | return {
1774 | 'week_1': [
1775 | f"Address {len(critical_recs)} critical optimization opportunities",
1776 | "Remove unused alarms and dashboards (quick wins)",
1777 | "Optimize log retention policies for high-volume log groups"
1778 | ],
1779 | 'month_1': [
1780 | f"Implement {min(len(high_recs), 5)} high-impact optimizations",
1781 | "Establish CloudWatch cost monitoring and alerting",
1782 | "Create governance policies for new CloudWatch resources"
1783 | ],
1784 | 'quarter_1': [
1785 | f"Complete remaining {len(medium_recs)} medium-priority optimizations",
1786 | "Implement automated cleanup workflows",
1787 | "Conduct optimization impact review and adjust strategies"
1788 | ],
1789 | 'ongoing': [
1790 | "Monthly CloudWatch cost review and optimization",
1791 | "Quarterly comprehensive optimization analysis",
1792 | "Continuous monitoring of optimization effectiveness"
1793 | ]
1794 | }
1795 |
1796 | def _generate_monitoring_recommendations(self, parallel_results: Dict[str, Any]) -> List[Dict[str, Any]]:
1797 | """Generate monitoring recommendations based on analysis results."""
1798 | recommendations = []
1799 |
1800 | # Cost monitoring
1801 | recommendations.append({
1802 | 'type': 'cost_monitoring',
1803 | 'title': 'CloudWatch Cost Monitoring',
1804 | 'description': 'Implement comprehensive cost monitoring for CloudWatch services',
1805 | 'implementation': [
1806 | 'Set up CloudWatch cost budgets with alerts',
1807 | 'Create cost anomaly detection for CloudWatch spending',
1808 | 'Implement daily cost tracking dashboards'
1809 | ]
1810 | })
1811 |
1812 | # Resource monitoring
1813 | recommendations.append({
1814 | 'type': 'resource_monitoring',
1815 | 'title': 'Resource Utilization Monitoring',
1816 | 'description': 'Monitor CloudWatch resource utilization and efficiency',
1817 | 'implementation': [
1818 | 'Track log ingestion rates and patterns',
1819 | 'Monitor alarm state changes and effectiveness',
1820 | 'Track dashboard usage and access patterns'
1821 | ]
1822 | })
1823 |
1824 | # Optimization tracking
1825 | recommendations.append({
1826 | 'type': 'optimization_tracking',
1827 | 'title': 'Optimization Impact Tracking',
1828 | 'description': 'Track the impact of implemented optimizations',
1829 | 'implementation': [
1830 | 'Measure cost reduction from implemented changes',
1831 | 'Track resource cleanup and governance compliance',
1832 | 'Monitor for optimization regression'
1833 | ]
1834 | })
1835 |
1836 | return recommendations
1837 |
1838 | def _extract_error_location(self, traceback_str: str) -> str:
1839 | """
1840 | Extract error location from traceback string.
1841 |
1842 | Args:
1843 | traceback_str: Full traceback string
1844 |
1845 | Returns:
1846 | String describing the error location
1847 | """
1848 | try:
1849 | lines = traceback_str.strip().split('\n')
1850 |
1851 | # Look for the last "File" line which usually contains the actual error location
1852 | for line in reversed(lines):
1853 | if line.strip().startswith('File "') and ', line ' in line:
1854 | # Extract file and line number
1855 | parts = line.strip().split(', line ')
1856 | if len(parts) >= 2:
1857 | file_part = parts[0].replace('File "', '').replace('"', '')
1858 | line_part = parts[1].split(',')[0]
1859 |
1860 | # Get just the filename, not the full path
1861 |                         filename = file_part.replace('\\', '/').split('/')[-1]  # handle POSIX and Windows paths
1862 |
1863 | return f"{filename}:line {line_part}"
1864 |
1865 | # Fallback: look for any line with file information
1866 | for line in lines:
1867 | if 'File "' in line and 'line ' in line:
1868 | return line.strip()
1869 |
1870 | return "Error location not found in traceback"
1871 |
1872 | except Exception as e:
1873 | return f"Error parsing traceback: {str(e)}"
```
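
The sketches below are illustrative only and are not part of the repository; all inputs are hypothetical. This first one mirrors the shared-resource detection in `_analyze_resource_overlaps`: invert the analysis-to-resources mapping, then flag anything used by more than one analysis.

```python
# Hypothetical per-analysis results in the shape _analyze_resource_overlaps expects.
results = {
    'logs_optimization':    {'data': {'resources_analyzed': ['/aws/lambda/app', 'api-metrics']}},
    'metrics_optimization': {'data': {'resources_analyzed': ['api-metrics']}},
}

usage = {}
for analysis_type, result in results.items():
    for resource in result.get('data', {}).get('resources_analyzed', []):
        usage.setdefault(resource, []).append(analysis_type)

# Resources touched by more than one analysis need coordinated optimization.
shared = [r for r, users in usage.items() if len(users) > 1]
assert shared == ['api-metrics']
```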
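
`_generate_priority_recommendations` ranks the merged recommendation list with a two-part sort key (priority rank first, potential savings second, both descending). A minimal sketch with made-up records:

```python
priority_order = {'critical': 4, 'high': 3, 'medium': 2, 'low': 1}

# Hypothetical merged recommendations from several analyses.
recommendations = [
    {'title': 'Trim dashboards',       'priority': 'medium',   'potential_savings': 40},
    {'title': 'Delete idle alarms',    'priority': 'critical', 'potential_savings': 10},
    {'title': 'Shorten log retention', 'priority': 'high',     'potential_savings': 250},
]

# Sort by priority rank, then by savings within the same rank.
ranked = sorted(
    recommendations,
    key=lambda r: (priority_order.get(r.get('priority', 'low'), 1),
                   r.get('potential_savings', 0)),
    reverse=True,
)

assert [r['title'] for r in ranked] == [
    'Delete idle alarms', 'Shorten log retention', 'Trim dashboards'
]
```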
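
`_calculate_correlation_strength` grades correlation by counting how many of its three signal types are present (cost correlations, resource overlaps, synergies), so the thresholds reduce to a four-way lookup:

```python
def correlation_strength(factors: int) -> str:
    # 0 -> minimal, 1 -> weak, 2 -> moderate, 3 (all signals) -> strong
    return ('minimal', 'weak', 'moderate', 'strong')[min(factors, 3)]

assert correlation_strength(0) == 'minimal'
assert correlation_strength(2) == 'moderate'
assert correlation_strength(3) == 'strong'
```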
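
`_calculate_recommendation_confidence` builds an additive score from source, priority, and whether savings are quantified, then caps it at 1.0. A standalone restatement of that logic with a worked input:

```python
def confidence(rec: dict) -> float:
    score = 0.5  # base confidence
    source = rec.get('source', '')
    if 'individual_analysis' in source:
        score += 0.3
    elif 'cross_analysis' in source:
        score += 0.4
    elif 'executive_summary' in source:
        score += 0.2
    score += {'critical': 0.3, 'high': 0.2, 'medium': 0.1}.get(rec.get('priority', 'low'), 0.0)
    if rec.get('potential_savings', 0) > 0:
        score += 0.1  # quantified savings add confidence
    return min(score, 1.0)

# 0.5 + 0.4 (cross_analysis) + 0.2 (high) + 0.1 (savings) = 1.2, capped at 1.0
assert confidence({'source': 'cross_analysis', 'priority': 'high', 'potential_savings': 120}) == 1.0
```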
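
`_calculate_execution_efficiency` compares actual wall-clock time against a theoretical sequential baseline of 60 seconds per analysis (an assumption hard-coded in the source) and caps the ratio at 1.0:

```python
def execution_efficiency(total_time_s: float, total_analyses: int) -> float:
    if total_time_s == 0 or total_analyses == 0:
        return 0.0  # avoid division by zero on empty or instant runs
    theoretical_sequential = total_analyses * 60  # assumed 60 s per analysis
    return min(theoretical_sequential / total_time_s, 1.0)

assert execution_efficiency(80, 4) == 1.0   # 240/80 = 3.0, capped
assert execution_efficiency(300, 4) == 0.8  # 240/300
assert execution_efficiency(0, 4) == 0.0
```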
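
`_generate_cost_impact_analysis` buckets recommendations by monthly savings (over $100 high, $50 to $100 medium, above zero but under $50 low; zero-savings items fall out entirely) and projects annual savings as 12 times the monthly total:

```python
# Hypothetical monthly-savings figures.
recs = [{'potential_savings': s} for s in (150, 100, 75, 20, 0)]

high   = [r for r in recs if r['potential_savings'] > 100]
medium = [r for r in recs if 50 <= r['potential_savings'] <= 100]
low    = [r for r in recs if 0 < r['potential_savings'] < 50]

assert (len(high), len(medium), len(low)) == (1, 2, 1)

monthly_total = sum(r['potential_savings'] for r in recs)
assert monthly_total * 12 == 4140  # annual projection
```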
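
`_extract_error_location` walks a traceback bottom-up and keeps only the basename and line number of the last `File "...", line N` entry. A demonstration against a fabricated traceback:

```python
# Fabricated traceback; file paths and line numbers are invented for the demo.
tb = '''Traceback (most recent call last):
  File "/opt/app/mcp_server_with_runbooks.py", line 88, in run
    result = analyzer.analyze()
  File "/opt/app/playbooks/cloudwatch/analysis_engine.py", line 42, in analyze
    raise ValueError("boom")
ValueError: boom'''

location = None
for line in reversed(tb.strip().split('\n')):
    stripped = line.strip()
    if stripped.startswith('File "') and ', line ' in stripped:
        file_part, rest = stripped.split(', line ', 1)
        file_part = file_part.replace('File "', '').replace('"', '')
        line_no = rest.split(',')[0]
        location = f"{file_part.split('/')[-1]}:line {line_no}"
        break

assert location == "analysis_engine.py:line 42"
```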