This is page 19 of 19. Use http://codebase.md/aws-samples/sample-cfm-tips-mcp?lines=true&page={x} to view the full context.
# Directory Structure
```
├── .gitignore
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── diagnose_cost_optimization_hub_v2.py
├── LICENSE
├── logging_config.py
├── mcp_runbooks.json
├── mcp_server_with_runbooks.py
├── playbooks
│ ├── __init__.py
│ ├── aws_lambda
│ │ ├── __init__.py
│ │ └── lambda_optimization.py
│ ├── cloudtrail
│ │ ├── __init__.py
│ │ └── cloudtrail_optimization.py
│ ├── cloudtrail_optimization.py
│ ├── cloudwatch
│ │ ├── __init__.py
│ │ ├── aggregation_queries.py
│ │ ├── alarms_and_dashboards_analyzer.py
│ │ ├── analysis_engine.py
│ │ ├── base_analyzer.py
│ │ ├── cloudwatch_optimization_analyzer.py
│ │ ├── cloudwatch_optimization_tool.py
│ │ ├── cloudwatch_optimization.py
│ │ ├── cost_controller.py
│ │ ├── general_spend_analyzer.py
│ │ ├── logs_optimization_analyzer.py
│ │ ├── metrics_optimization_analyzer.py
│ │ ├── optimization_orchestrator.py
│ │ └── result_processor.py
│ ├── comprehensive_optimization.py
│ ├── ebs
│ │ ├── __init__.py
│ │ └── ebs_optimization.py
│ ├── ebs_optimization.py
│ ├── ec2
│ │ ├── __init__.py
│ │ └── ec2_optimization.py
│ ├── ec2_optimization.py
│ ├── lambda_optimization.py
│ ├── rds
│ │ ├── __init__.py
│ │ └── rds_optimization.py
│ ├── rds_optimization.py
│ └── s3
│ ├── __init__.py
│ ├── analyzers
│ │ ├── __init__.py
│ │ ├── api_cost_analyzer.py
│ │ ├── archive_optimization_analyzer.py
│ │ ├── general_spend_analyzer.py
│ │ ├── governance_analyzer.py
│ │ ├── multipart_cleanup_analyzer.py
│ │ └── storage_class_analyzer.py
│ ├── base_analyzer.py
│ ├── s3_aggregation_queries.py
│ ├── s3_analysis_engine.py
│ ├── s3_comprehensive_optimization_tool.py
│ ├── s3_optimization_orchestrator.py
│ └── s3_optimization.py
├── README.md
├── requirements.txt
├── runbook_functions_extended.py
├── runbook_functions.py
├── RUNBOOKS_GUIDE.md
├── services
│ ├── __init__.py
│ ├── cloudwatch_pricing.py
│ ├── cloudwatch_service_vended_log.py
│ ├── cloudwatch_service.py
│ ├── compute_optimizer.py
│ ├── cost_explorer.py
│ ├── optimization_hub.py
│ ├── performance_insights.py
│ ├── pricing.py
│ ├── s3_pricing.py
│ ├── s3_service.py
│ ├── storage_lens_service.py
│ └── trusted_advisor.py
├── setup.py
├── test_runbooks.py
├── tests
│ ├── __init__.py
│ ├── conftest.py
│ ├── integration
│ │ ├── __init__.py
│ │ ├── cloudwatch
│ │ │ └── test_cloudwatch_integration.py
│ │ ├── test_cloudwatch_comprehensive_tool_integration.py
│ │ ├── test_cloudwatch_orchestrator_integration.py
│ │ ├── test_integration_suite.py
│ │ └── test_orchestrator_integration.py
│ ├── legacy
│ │ ├── example_output_with_docs.py
│ │ ├── example_wellarchitected_output.py
│ │ ├── test_aws_session_management.py
│ │ ├── test_cloudwatch_orchestrator_pagination.py
│ │ ├── test_cloudwatch_pagination_integration.py
│ │ ├── test_cloudwatch_performance_optimizations.py
│ │ ├── test_cloudwatch_result_processor.py
│ │ ├── test_cloudwatch_timeout_issue.py
│ │ ├── test_documentation_links.py
│ │ ├── test_metrics_pagination_count.py
│ │ ├── test_orchestrator_integration.py
│ │ ├── test_pricing_cache_fix_moved.py
│ │ ├── test_pricing_cache_fix.py
│ │ ├── test_runbook_integration.py
│ │ ├── test_runbooks.py
│ │ ├── test_setup_verification.py
│ │ └── test_stack_trace_fix.py
│ ├── performance
│ │ ├── __init__.py
│ │ ├── cloudwatch
│ │ │ └── test_cloudwatch_performance.py
│ │ ├── test_cloudwatch_parallel_execution.py
│ │ ├── test_parallel_execution.py
│ │ └── test_performance_suite.py
│ ├── pytest-cloudwatch.ini
│ ├── pytest.ini
│ ├── README.md
│ ├── requirements-test.txt
│ ├── run_cloudwatch_tests.py
│ ├── run_tests.py
│ ├── test_setup_verification.py
│ ├── test_suite_main.py
│ └── unit
│ ├── __init__.py
│ ├── analyzers
│ │ ├── __init__.py
│ │ ├── conftest_cloudwatch.py
│ │ ├── test_alarms_and_dashboards_analyzer.py
│ │ ├── test_base_analyzer.py
│ │ ├── test_cloudwatch_base_analyzer.py
│ │ ├── test_cloudwatch_cost_constraints.py
│ │ ├── test_cloudwatch_general_spend_analyzer.py
│ │ ├── test_general_spend_analyzer.py
│ │ ├── test_logs_optimization_analyzer.py
│ │ └── test_metrics_optimization_analyzer.py
│ ├── cloudwatch
│ │ ├── test_cache_control.py
│ │ ├── test_cloudwatch_api_mocking.py
│ │ ├── test_cloudwatch_metrics_pagination.py
│ │ ├── test_cloudwatch_pagination_architecture.py
│ │ ├── test_cloudwatch_pagination_comprehensive_fixed.py
│ │ ├── test_cloudwatch_pagination_comprehensive.py
│ │ ├── test_cloudwatch_pagination_fixed.py
│ │ ├── test_cloudwatch_pagination_real_format.py
│ │ ├── test_cloudwatch_pagination_simple.py
│ │ ├── test_cloudwatch_query_pagination.py
│ │ ├── test_cloudwatch_unit_suite.py
│ │ ├── test_general_spend_tips_refactor.py
│ │ ├── test_import_error.py
│ │ ├── test_mcp_pagination_bug.py
│ │ └── test_mcp_surface_pagination.py
│ ├── s3
│ │ └── live
│ │ ├── test_bucket_listing.py
│ │ ├── test_s3_governance_bucket_discovery.py
│ │ └── test_top_buckets.py
│ ├── services
│ │ ├── __init__.py
│ │ ├── test_cloudwatch_cost_controller.py
│ │ ├── test_cloudwatch_query_service.py
│ │ ├── test_cloudwatch_service.py
│ │ ├── test_cost_control_routing.py
│ │ └── test_s3_service.py
│ └── test_unit_suite.py
└── utils
├── __init__.py
├── aws_client_factory.py
├── cache_decorator.py
├── cleanup_manager.py
├── cloudwatch_cache.py
├── documentation_links.py
├── error_handler.py
├── intelligent_cache.py
├── logging_config.py
├── memory_manager.py
├── parallel_executor.py
├── performance_monitor.py
├── progressive_timeout.py
├── service_orchestrator.py
└── session_manager.py
```
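The `tests` tree ships its own pytest configuration (`tests/pytest.ini`, `tests/requirements-test.txt`) alongside the project's runner scripts (`tests/run_tests.py`, `tests/run_cloudwatch_tests.py`). A minimal sketch for driving the unit suite programmatically from the repository root, assuming pytest and the test requirements are installed:

```python
# Minimal sketch (not part of the repository): run the unit tests from the repo root.
# Assumes `pip install -r requirements.txt -r tests/requirements-test.txt` has been run.
import sys

import pytest

if __name__ == "__main__":
    # Point pytest at the repository's own configuration and the unit-test tree.
    sys.exit(pytest.main(["-c", "tests/pytest.ini", "tests/unit"]))
```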
# Files
--------------------------------------------------------------------------------
/playbooks/s3/s3_optimization_orchestrator.py:
--------------------------------------------------------------------------------
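A minimal usage sketch for the orchestrator defined in this file (assumptions: the repository root is on `PYTHONPATH`, default AWS credentials are configured, and the caller provides an async entry point; the analysis-type names and keyword arguments mirror `_prepare_analysis_params()` and `execute_comprehensive_analysis()` below):

```python
# Minimal sketch (not part of the repository file below).
import asyncio

from playbooks.s3.s3_optimization_orchestrator import S3OptimizationOrchestrator


async def main() -> None:
    orchestrator = S3OptimizationOrchestrator(region="us-east-1")

    # Single analysis with a reduced lookback window.
    storage_class = await orchestrator.execute_analysis("storage_class", lookback_days=14)
    print(storage_class.get("status"), storage_class.get("from_cache"))

    # All registered analyzers in parallel, with cross-analyzer insights.
    comprehensive = await orchestrator.execute_comprehensive_analysis(include_cross_analysis=True)
    print(comprehensive.get("analysis_metadata", {}).get("total_potential_savings"))


if __name__ == "__main__":
    asyncio.run(main())
```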
```python
1 | """
2 | S3 Optimization Orchestrator for CFM Tips MCP Server
3 |
4 | Main coordination layer for S3 optimization workflows with session integration,
5 | performance monitoring, intelligent caching, and memory management.
6 | """
7 |
8 | import json
9 | import logging
10 | import time
11 | from typing import Dict, List, Any, Optional
12 | from datetime import datetime
13 |
14 | from utils.service_orchestrator import ServiceOrchestrator
15 | from utils.session_manager import get_session_manager
16 | from .s3_analysis_engine import S3AnalysisEngine
17 | from .s3_aggregation_queries import S3AggregationQueries, S3QueryExecutor
18 | from utils.performance_monitor import get_performance_monitor
19 | from utils.intelligent_cache import get_pricing_cache, get_bucket_metadata_cache, get_analysis_results_cache
20 | from utils.memory_manager import get_memory_manager
21 | from utils.progressive_timeout import get_timeout_handler
22 | from utils.documentation_links import add_documentation_links
23 |
24 | logger = logging.getLogger(__name__)
25 |
26 |
27 | class S3OptimizationOrchestrator:
28 | """Main orchestrator for S3 optimization workflows."""
29 |
30 | def __init__(self, region: Optional[str] = None, session_id: Optional[str] = None):
31 | """
32 | Initialize S3OptimizationOrchestrator with performance optimizations.
33 |
34 | Args:
35 | region: AWS region for S3 operations
36 | session_id: Optional session ID for data persistence
37 | """
38 | self.region = region
39 | self.session_manager = get_session_manager()
40 |
41 | # Initialize ServiceOrchestrator (it will create session if session_id is None)
42 | self.service_orchestrator = ServiceOrchestrator(session_id)
43 |
44 | # Get the actual session ID from ServiceOrchestrator
45 | self.session_id = self.service_orchestrator.session_id
46 |
47 | # Initialize performance optimization components
48 | self.performance_monitor = get_performance_monitor()
49 | self.memory_manager = get_memory_manager()
50 | self.timeout_handler = get_timeout_handler()
51 |
52 | # Initialize caching systems
53 | self.pricing_cache = get_pricing_cache()
54 | self.bucket_metadata_cache = get_bucket_metadata_cache()
55 | self.analysis_results_cache = get_analysis_results_cache()
56 |
57 | # Register cache instances with memory manager for cleanup
58 | self.memory_manager.add_cache_reference(self.pricing_cache)
59 | self.memory_manager.add_cache_reference(self.bucket_metadata_cache)
60 | self.memory_manager.add_cache_reference(self.analysis_results_cache)
61 |
62 | # Set up performance monitor integration
63 | self.pricing_cache.set_performance_monitor(self.performance_monitor)
64 | self.bucket_metadata_cache.set_performance_monitor(self.performance_monitor)
65 | self.analysis_results_cache.set_performance_monitor(self.performance_monitor)
66 | self.memory_manager.set_performance_monitor(self.performance_monitor)
67 | self.timeout_handler.set_performance_monitor(self.performance_monitor)
68 |
69 | # Initialize analysis engine with all analyzers and performance components
70 | self.analysis_engine = S3AnalysisEngine(
71 | region=region,
72 | performance_monitor=self.performance_monitor,
73 | memory_manager=self.memory_manager,
74 | timeout_handler=self.timeout_handler,
75 | pricing_cache=self.pricing_cache,
76 | bucket_metadata_cache=self.bucket_metadata_cache,
77 | analysis_results_cache=self.analysis_results_cache
78 | )
79 |
80 | logger.info(f"S3OptimizationOrchestrator initialized with performance optimizations for region: {region or 'default'}, session: {self.session_id}")
81 |
82 | async def execute_analysis(self, analysis_type: str, **kwargs) -> Dict[str, Any]:
83 | """
84 | Execute a specific S3 analysis with performance optimizations.
85 |
86 | Args:
87 | analysis_type: Type of analysis to execute
88 | **kwargs: Analysis-specific parameters
89 |
90 | Returns:
91 | Dictionary containing analysis results
92 | """
93 | start_time = time.time()
94 |
95 | # Start performance monitoring
96 | monitoring_session = self.performance_monitor.start_analysis_monitoring(
97 | analysis_type,
98 | f"single_{int(start_time)}"
99 | )
100 |
101 | # Start memory tracking
102 | memory_tracker = self.memory_manager.start_memory_tracking(f"analysis_{analysis_type}")
103 |
104 | logger.info(f"Starting S3 analysis with performance optimizations: {analysis_type}")
105 |
106 | try:
107 | # Check cache for recent results first
108 | cache_key = [analysis_type, kwargs.get('region', self.region), kwargs]
109 | cached_result = self.analysis_results_cache.get(cache_key)
110 |
111 | if cached_result is not None:
112 | logger.info(f"Retrieved {analysis_type} analysis from cache")
113 | self.performance_monitor.record_cache_hit("analysis_results", analysis_type)
114 |
115 | # Update execution time and return cached result
116 | cached_result["orchestrator_execution_time"] = time.time() - start_time
117 | cached_result["session_id"] = self.session_id
118 | cached_result["from_cache"] = True
119 |
120 | # End monitoring
121 | self.performance_monitor.end_analysis_monitoring(monitoring_session, success=True)
122 | self.memory_manager.stop_memory_tracking(f"analysis_{analysis_type}")
123 |
124 | return cached_result
125 |
126 | self.performance_monitor.record_cache_miss("analysis_results", analysis_type)
127 |
128 | # Validate analysis type
129 | if not self._is_valid_analysis_type(analysis_type):
130 | error_result = {
131 | "status": "error",
132 | "message": f"Invalid analysis type: {analysis_type}",
133 | "analysis_type": analysis_type,
134 | "available_types": self.analysis_engine.analyzer_registry.list_analyzers()
135 | }
136 |
137 | self.performance_monitor.end_analysis_monitoring(
138 | monitoring_session,
139 | success=False,
140 | error_message="Invalid analysis type"
141 | )
142 | self.memory_manager.stop_memory_tracking(f"analysis_{analysis_type}")
143 |
144 | return error_result
145 |
146 | # Calculate intelligent timeout
147 | timeout_seconds = self.timeout_handler.get_timeout_for_analysis(analysis_type, **kwargs)
148 | kwargs['timeout_seconds'] = timeout_seconds
149 |
150 | logger.info(f"Calculated timeout for {analysis_type}: {timeout_seconds:.1f}s")
151 |
152 | # Prepare analysis parameters with performance optimizations
153 | analysis_params = self._prepare_analysis_params(analysis_type, **kwargs)
154 | analysis_params['monitoring_session'] = monitoring_session
155 | analysis_params['memory_tracker'] = memory_tracker
156 |
157 | # Execute analysis using the analysis engine
158 | result = await self.analysis_engine.run_analysis(analysis_type, **analysis_params)
159 |
160 | # Cache successful results
161 | if result.get('status') == 'success':
162 | # Cache result with appropriate TTL based on analysis type
163 | cache_ttl = self._get_cache_ttl_for_analysis(analysis_type)
164 | self.analysis_results_cache.put(
165 | cache_key,
166 | result.copy(),
167 | ttl_seconds=cache_ttl,
168 | tags={"analysis_type": analysis_type, "region": self.region}
169 | )
170 |
171 | logger.debug(f"Cached {analysis_type} analysis result (TTL: {cache_ttl}s)")
172 |
173 | # Store results if requested and analysis was successful
174 | if kwargs.get('store_results', True) and result.get('status') == 'success':
175 | self._store_analysis_results(analysis_type, result)
176 |
177 | execution_time = time.time() - start_time
178 | result["orchestrator_execution_time"] = execution_time
179 | result["session_id"] = self.session_id
180 | result["from_cache"] = False
181 |
182 | # Record performance metrics
183 | self.timeout_handler.record_execution_time(
184 | analysis_type,
185 | execution_time,
186 | self.timeout_handler.get_complexity_level(analysis_type, **kwargs)
187 | )
188 |
189 | # End monitoring
190 | success = result.get('status') == 'success'
191 | self.performance_monitor.end_analysis_monitoring(
192 | monitoring_session,
193 | success=success,
194 | error_message=result.get('error_message') if not success else None
195 | )
196 |
197 | memory_stats = self.memory_manager.stop_memory_tracking(f"analysis_{analysis_type}")
198 | if memory_stats:
199 | result["memory_usage"] = memory_stats
200 |
201 | logger.info(f"Completed S3 analysis: {analysis_type} in {execution_time:.2f}s")
202 |
203 | return result
204 |
205 | except Exception as e:
206 | execution_time = time.time() - start_time
207 | logger.error(f"Error in S3 analysis {analysis_type}: {str(e)}")
208 |
209 | # End monitoring with error
210 | self.performance_monitor.end_analysis_monitoring(
211 | monitoring_session,
212 | success=False,
213 | error_message=str(e)
214 | )
215 | self.memory_manager.stop_memory_tracking(f"analysis_{analysis_type}")
216 |
217 | return {
218 | "status": "error",
219 | "analysis_type": analysis_type,
220 | "message": f"Analysis failed: {str(e)}",
221 | "execution_time": execution_time,
222 | "session_id": self.session_id,
223 | "timestamp": datetime.now().isoformat(),
224 | "from_cache": False
225 | }
226 |
227 | async def execute_comprehensive_analysis(self, **kwargs) -> Dict[str, Any]:
228 | """
229 | Execute all S3 analyses in parallel with performance optimizations and intelligent resource management.
230 |
231 | Args:
232 | **kwargs: Analysis parameters
233 |
234 | Returns:
235 | Dictionary containing comprehensive analysis results
236 | """
237 | start_time = time.time()
238 |
239 | # Start comprehensive performance monitoring
240 | monitoring_session = self.performance_monitor.start_analysis_monitoring(
241 | "comprehensive",
242 | f"comprehensive_{int(start_time)}"
243 | )
244 |
245 | # Start memory tracking for comprehensive analysis
246 | memory_tracker = self.memory_manager.start_memory_tracking("comprehensive_analysis")
247 |
248 | logger.info("Starting comprehensive S3 analysis with performance optimizations and parallel execution")
249 |
250 | try:
251 | # Check cache for recent comprehensive analysis
252 | cache_key = ["comprehensive", kwargs.get('region', self.region), kwargs]
253 | cached_result = self.analysis_results_cache.get(cache_key)
254 |
255 | if cached_result is not None:
256 | logger.info("Retrieved comprehensive analysis from cache")
257 | self.performance_monitor.record_cache_hit("analysis_results", "comprehensive")
258 |
259 | # Update execution time and return cached result
260 | cached_result["orchestrator_execution_time"] = time.time() - start_time
261 | cached_result["session_id"] = self.session_id
262 | cached_result["from_cache"] = True
263 |
264 | # End monitoring
265 | self.performance_monitor.end_analysis_monitoring(monitoring_session, success=True)
266 | self.memory_manager.stop_memory_tracking("comprehensive_analysis")
267 |
268 | return cached_result
269 |
270 | self.performance_monitor.record_cache_miss("analysis_results", "comprehensive")
271 |
272 | # Get all available analysis types from the engine with priority information
273 | available_analyses = self.analysis_engine.get_available_analyses()
274 | analysis_types = [analysis["analysis_type"] for analysis in available_analyses]
275 |
276 | logger.info(f"Executing {len(analysis_types)} analyses with intelligent prioritization: {analysis_types}")
277 |
278 | # Calculate intelligent timeout for comprehensive analysis
279 | comprehensive_timeout = self.timeout_handler.get_timeout_for_analysis("comprehensive", **kwargs)
280 | kwargs['total_timeout'] = comprehensive_timeout
281 |
282 | logger.info(f"Calculated comprehensive analysis timeout: {comprehensive_timeout:.1f}s")
283 |
284 | # Prepare analysis parameters with performance optimizations
285 | analysis_params = self._prepare_comprehensive_analysis_params(**kwargs)
286 | analysis_params['monitoring_session'] = monitoring_session
287 | analysis_params['memory_tracker'] = memory_tracker
288 |
289 | # Register large object for memory management if available
290 | if self.memory_manager:
291 | try:
292 | self.memory_manager.register_large_object(
293 | f"comprehensive_analysis_{int(start_time)}",
294 | analysis_params,
295 | size_mb=1.0, # Estimated size
296 | cleanup_callback=lambda: logger.debug("Cleaned up comprehensive analysis parameters")
297 | )
298 | except Exception as e:
299 | logger.warning(f"Could not register large object with memory manager: {str(e)}")
300 |
301 | # Create parallel analysis tasks using analysis engine
302 | service_calls = self.analysis_engine.create_parallel_analysis_tasks(
303 | analysis_types=analysis_types,
304 | **analysis_params
305 | )
306 |
307 | logger.info(f"Created {len(service_calls)} intelligently prioritized parallel tasks")
308 |
309 | # Execute analyses in parallel using ServiceOrchestrator with session-sql integration
310 | execution_results = self.service_orchestrator.execute_parallel_analysis(
311 | service_calls=service_calls,
312 | store_results=kwargs.get('store_results', True),
313 | timeout=comprehensive_timeout
314 | )
315 |
316 | logger.info(f"Parallel execution completed: {execution_results['successful']}/{execution_results['total_tasks']} successful")
317 |
318 | # Record performance metrics
319 | self.performance_monitor.record_metric(
320 | "comprehensive_analysis_success_rate",
321 | (execution_results['successful'] / execution_results['total_tasks'] * 100) if execution_results['total_tasks'] > 0 else 0,
322 | tags={"session_id": self.session_id}
323 | )
324 |
325 | # Aggregate results using enhanced aggregation with cross-analyzer insights
326 | aggregated_results = self.aggregate_results_with_insights(
327 | results=execution_results.get('results', {}),
328 | include_cross_analysis=kwargs.get('include_cross_analysis', True)
329 | )
330 |
331 | # Store aggregated results with session-sql integration
332 | if kwargs.get('store_results', True):
333 | self._store_comprehensive_results(aggregated_results, execution_results)
334 |
335 | # Execute cross-analysis aggregation queries for deeper insights
336 | cross_analysis_data = {}
337 | if kwargs.get('include_cross_analysis', True) and execution_results.get('stored_tables'):
338 | cross_analysis_data = self._execute_cross_analysis_queries(execution_results['stored_tables'])
339 |
340 | execution_time = time.time() - start_time
341 |
342 | # Create comprehensive result
343 | comprehensive_result = {
344 | "status": "success",
345 | "analysis_type": "comprehensive",
346 | "execution_summary": execution_results,
347 | "aggregated_results": aggregated_results,
348 | "cross_analysis_data": cross_analysis_data,
349 | "execution_time": execution_time,
350 | "session_id": self.session_id,
351 | "timestamp": datetime.now().isoformat(),
352 | "from_cache": False,
353 | "performance_optimizations": {
354 | "intelligent_timeout": comprehensive_timeout,
355 | "cache_enabled": True,
356 | "memory_management": True,
357 | "progressive_timeouts": True
358 | },
359 | "analysis_metadata": {
360 | "total_analyses": len(analysis_types),
361 | "successful_analyses": aggregated_results.get("aggregation_metadata", {}).get("successful_analyses", 0),
362 | "failed_analyses": aggregated_results.get("aggregation_metadata", {}).get("failed_analyses", 0),
363 | "total_potential_savings": aggregated_results.get("total_potential_savings", 0),
364 | "stored_tables": execution_results.get('stored_tables', []),
365 | "task_prioritization": self._get_task_prioritization_summary(available_analyses)
366 | }
367 | }
368 |
369 | # Cache the comprehensive result
370 | cache_ttl = self._get_cache_ttl_for_analysis("comprehensive")
371 | self.analysis_results_cache.put(
372 | cache_key,
373 | comprehensive_result.copy(),
374 | ttl_seconds=cache_ttl,
375 | tags={"analysis_type": "comprehensive", "region": self.region}
376 | )
377 |
378 | # Record performance metrics
379 | self.timeout_handler.record_execution_time(
380 | "comprehensive",
381 | execution_time,
382 | self.timeout_handler.get_complexity_level("comprehensive", **kwargs)
383 | )
384 |
385 | # End monitoring
386 | self.performance_monitor.end_analysis_monitoring(monitoring_session, success=True)
387 | memory_stats = self.memory_manager.stop_memory_tracking("comprehensive_analysis")
388 |
389 | if memory_stats:
390 | comprehensive_result["memory_usage"] = memory_stats
391 |
392 | logger.info(f"Completed comprehensive S3 analysis with optimizations in {execution_time:.2f}s")
393 |
394 | return comprehensive_result
395 |
396 | except Exception as e:
397 | execution_time = time.time() - start_time
398 | logger.error(f"Error in comprehensive S3 analysis: {str(e)}")
399 |
400 | # End monitoring with error
401 | self.performance_monitor.end_analysis_monitoring(
402 | monitoring_session,
403 | success=False,
404 | error_message=str(e)
405 | )
406 | self.memory_manager.stop_memory_tracking("comprehensive_analysis")
407 |
408 | return {
409 | "status": "error",
410 | "analysis_type": "comprehensive",
411 | "message": f"Comprehensive analysis failed: {str(e)}",
412 | "execution_time": execution_time,
413 | "session_id": self.session_id,
414 | "timestamp": datetime.now().isoformat(),
415 | "from_cache": False
416 | }
417 |
418 | def get_analysis_results(self, query: str) -> List[Dict[str, Any]]:
419 | """
420 | Query stored analysis results.
421 |
422 | Args:
423 | query: SQL query to execute
424 |
425 | Returns:
426 | List of query results
427 | """
428 | try:
429 | return self.service_orchestrator.query_session_data(query)
430 | except Exception as e:
431 | logger.error(f"Error querying analysis results: {str(e)}")
432 | return []
433 |
434 | def get_stored_tables(self) -> List[str]:
435 | """
436 | Get list of tables stored in the session.
437 |
438 | Returns:
439 | List of table names
440 | """
441 | try:
442 | return self.service_orchestrator.get_stored_tables()
443 | except Exception as e:
444 | logger.error(f"Error getting stored tables: {str(e)}")
445 | return []
446 |
447 | def _is_valid_analysis_type(self, analysis_type: str) -> bool:
448 | """Validate analysis type using the analysis engine registry."""
449 | if analysis_type == "comprehensive":
450 | return True
451 | return self.analysis_engine.analyzer_registry.get(analysis_type) is not None
452 |
453 | def get_analyzer_registry(self) -> Dict[str, Any]:
454 | """
455 | Get comprehensive information about the analyzer registry and registered analyzers.
456 |
457 | Returns:
458 | Dictionary containing analyzer registry information
459 | """
460 | try:
461 | # Get basic registry info
462 | registry_info = {
463 | "registry_status": "active",
464 | "total_analyzers": len(self.analysis_engine.analyzer_registry.list_analyzers()),
465 | "registered_analyzers": self.analysis_engine.analyzer_registry.get_analyzer_info(),
466 | "analysis_priorities": self.analysis_engine.analysis_priorities,
467 | "available_analyses": self.analysis_engine.get_available_analyses(),
468 | "registry_timestamp": datetime.now().isoformat()
469 | }
470 |
471 | # Get comprehensive health status from analysis engine
472 | health_status = self.analysis_engine.get_analyzer_health_status()
473 | registry_info["health_status"] = health_status
474 |
475 | # Add loading results if available
476 | if hasattr(self.analysis_engine, 'analyzer_loading_results'):
477 | registry_info["loading_results"] = self.analysis_engine.analyzer_loading_results
478 |
479 | # Add execution history summary
480 | if hasattr(self.analysis_engine, 'execution_history') and self.analysis_engine.execution_history:
481 | recent_executions = self.analysis_engine.execution_history[-10:] # Last 10 executions
482 | registry_info["recent_executions"] = recent_executions
483 |
484 | # Calculate execution statistics
485 | successful_executions = [e for e in recent_executions if e.get('status') == 'success']
486 | registry_info["execution_statistics"] = {
487 | "recent_success_rate": len(successful_executions) / len(recent_executions) * 100 if recent_executions else 0,
488 | "total_executions": len(self.analysis_engine.execution_history),
489 | "recent_executions": len(recent_executions)
490 | }
491 |
492 | # Add performance optimization status
493 | registry_info["performance_optimizations"] = {
494 | "performance_monitor_enabled": self.performance_monitor is not None,
495 | "memory_manager_enabled": self.memory_manager is not None,
496 | "timeout_handler_enabled": self.timeout_handler is not None,
497 | "caching_enabled": {
498 | "pricing_cache": self.pricing_cache is not None,
499 | "bucket_metadata_cache": self.bucket_metadata_cache is not None,
500 | "analysis_results_cache": self.analysis_results_cache is not None
501 | }
502 | }
503 |
504 | return registry_info
505 |
506 | except Exception as e:
507 | logger.error(f"Error getting analyzer registry info: {str(e)}")
508 | return {
509 | "registry_status": "error",
510 | "error": str(e),
511 | "error_type": e.__class__.__name__,
512 | "registry_timestamp": datetime.now().isoformat()
513 | }
514 |
515 | def reload_analyzer(self, analysis_type: str) -> Dict[str, Any]:
516 | """
517 | Reload a specific analyzer through the analysis engine.
518 |
519 | Args:
520 | analysis_type: Type of analyzer to reload
521 |
522 | Returns:
523 | Dictionary containing reload results
524 | """
525 | try:
526 | logger.info(f"Orchestrator reloading analyzer: {analysis_type}")
527 |
528 | # Use analysis engine's reload method
529 | reload_result = self.analysis_engine.reload_analyzer(analysis_type)
530 |
531 | # Clear related caches if reload was successful
532 | if reload_result.get("status") == "success":
533 | try:
534 | # Clear analysis results cache for this analyzer
535 | if self.analysis_results_cache:
536 | cache_keys_to_clear = []
537 | cache_dict = getattr(self.analysis_results_cache, '_cache', {})
538 | for key in cache_dict.keys():
539 | if isinstance(key, (list, tuple)) and len(key) > 0 and key[0] == analysis_type:
540 | cache_keys_to_clear.append(key)
541 |
542 | for key in cache_keys_to_clear:
543 | if hasattr(self.analysis_results_cache, 'invalidate'):
544 | self.analysis_results_cache.invalidate(key)
545 | elif hasattr(self.analysis_results_cache, 'delete'):
546 | self.analysis_results_cache.delete(key)
547 |
548 | logger.info(f"Cleared {len(cache_keys_to_clear)} cache entries for {analysis_type}")
549 | reload_result["cache_entries_cleared"] = len(cache_keys_to_clear)
550 |
551 | except Exception as cache_error:
552 | logger.warning(f"Error clearing cache for {analysis_type}: {str(cache_error)}")
553 | reload_result["cache_clear_warning"] = str(cache_error)
554 |
555 | return reload_result
556 |
557 | except Exception as e:
558 | logger.error(f"Error in orchestrator reload for {analysis_type}: {str(e)}")
559 | return {
560 | "status": "error",
561 | "message": f"Orchestrator reload failed for {analysis_type}: {str(e)}",
562 | "error_type": e.__class__.__name__,
563 | "reloaded_at": datetime.now().isoformat()
564 | }
565 |
566 | def handle_analyzer_failure(self, analysis_type: str, error: Exception, context: Dict[str, Any]) -> Dict[str, Any]:
567 | """
568 | Handle analyzer failures with comprehensive error handling and recovery strategies.
569 |
570 | Args:
571 | analysis_type: Type of analysis that failed
572 | error: Exception that occurred
573 | context: Analysis context
574 |
575 | Returns:
576 | Dictionary containing error handling results and recovery recommendations
577 | """
578 | try:
579 | logger.error(f"Handling analyzer failure for {analysis_type}: {str(error)}")
580 |
581 | # Use analysis engine's error handling if available
582 | if hasattr(self.analysis_engine, 'handle_analyzer_failure'):
583 | return self.analysis_engine.handle_analyzer_failure(analysis_type, error, context)
584 |
585 | # Fallback error handling
586 | error_message = str(error)
587 | error_type = error.__class__.__name__
588 |
589 | # Determine recovery strategy
590 | recovery_strategy = self._determine_recovery_strategy(error_type, error_message)
591 |
592 | # Create comprehensive error result
593 | error_result = {
594 | "status": "error",
595 | "analysis_type": analysis_type,
596 | "error_message": error_message,
597 | "error_type": error_type,
598 | "error_category": recovery_strategy["category"],
599 | "recovery_strategy": recovery_strategy["strategy"],
600 | "context": context,
601 | "timestamp": datetime.now().isoformat(),
602 | "session_id": self.session_id,
603 | "orchestrator_handled": True,
604 | "recommendations": []
605 | }
606 |
607 | # Add specific recommendations based on error type
608 | if "permission" in error_message.lower() or "access" in error_message.lower():
609 | error_result["recommendations"].extend([
610 | {
611 | "type": "permission_fix",
612 | "priority": "high",
613 | "title": "Fix AWS Permissions",
614 | "description": f"Analysis {analysis_type} failed due to permission issues",
615 | "action_items": [
616 | "Check IAM permissions for S3, Cost Explorer, and Storage Lens",
617 | "Verify AWS credentials are valid and not expired",
618 | "Ensure required service permissions are granted",
619 | "Check if MFA is required for API access"
620 | ]
621 | }
622 | ])
623 | elif "timeout" in error_message.lower():
624 | error_result["recommendations"].extend([
625 | {
626 | "type": "timeout_optimization",
627 | "priority": "medium",
628 | "title": "Optimize Timeout Settings",
629 | "description": f"Analysis {analysis_type} timed out during execution",
630 | "action_items": [
631 | "Increase timeout_seconds parameter",
632 | "Reduce lookback_days to limit data volume",
633 | "Filter to specific bucket_names if possible",
634 | "Run analysis during off-peak hours"
635 | ]
636 | }
637 | ])
638 | elif "rate" in error_message.lower() or "throttl" in error_message.lower():
639 | error_result["recommendations"].extend([
640 | {
641 | "type": "rate_limit_handling",
642 | "priority": "medium",
643 | "title": "Handle API Rate Limits",
644 | "description": f"Analysis {analysis_type} hit API rate limits",
645 | "action_items": [
646 | "Implement exponential backoff retry logic",
647 | "Reduce concurrent analysis execution",
648 | "Spread analysis execution over time",
649 | "Consider using AWS SDK retry configuration"
650 | ]
651 | }
652 | ])
653 |
654 | # Record error in performance monitor if available
655 | if self.performance_monitor:
656 | self.performance_monitor.record_metric(
657 | f"orchestrator_analyzer_failure_{analysis_type}",
658 | 1,
659 | tags={
660 | "error_type": error_type,
661 | "error_category": recovery_strategy["category"],
662 | "session_id": self.session_id
663 | }
664 | )
665 |
666 | # Attempt automatic recovery if strategy suggests it
667 | if recovery_strategy.get("auto_recovery", False):
668 | recovery_result = self._attempt_auto_recovery(analysis_type, error, context)
669 | error_result["auto_recovery_attempted"] = True
670 | error_result["auto_recovery_result"] = recovery_result
671 |
672 | return error_result
673 |
674 | except Exception as handling_error:
675 | logger.error(f"Error in analyzer failure handling: {str(handling_error)}")
676 | return {
677 | "status": "error",
678 | "analysis_type": analysis_type,
679 | "error_message": f"Original error: {str(error)}. Handling error: {str(handling_error)}",
680 | "error_type": "FailureHandlingError",
681 | "timestamp": datetime.now().isoformat(),
682 | "session_id": self.session_id,
683 | "critical_error": True
684 | }
685 |
686 | def _determine_recovery_strategy(self, error_type: str, error_message: str) -> Dict[str, Any]:
687 | """
688 | Determine recovery strategy based on error type and message.
689 |
690 | Args:
691 | error_type: Type of exception
692 | error_message: Error message
693 |
694 | Returns:
695 | Dictionary containing recovery strategy information
696 | """
697 | error_lower = error_message.lower()
698 |
699 | if "permission" in error_lower or "access" in error_lower or "credential" in error_lower:
700 | return {
701 | "category": "permission_error",
702 | "strategy": "Check and fix AWS permissions",
703 | "auto_recovery": False,
704 | "severity": "high"
705 | }
706 | elif "timeout" in error_lower:
707 | return {
708 | "category": "timeout_error",
709 | "strategy": "Reduce scope and increase timeout",
710 | "auto_recovery": True,
711 | "severity": "medium"
712 | }
713 | elif "throttl" in error_lower or "rate" in error_lower:
714 | return {
715 | "category": "rate_limit_error",
716 | "strategy": "Implement backoff and retry",
717 | "auto_recovery": True,
718 | "severity": "medium"
719 | }
720 | elif "network" in error_lower or "connection" in error_lower:
721 | return {
722 | "category": "network_error",
723 | "strategy": "Retry with exponential backoff",
724 | "auto_recovery": True,
725 | "severity": "medium"
726 | }
727 | elif "service" in error_lower and "unavailable" in error_lower:
728 | return {
729 | "category": "service_error",
730 | "strategy": "Wait and retry, use fallback data sources",
731 | "auto_recovery": True,
732 | "severity": "high"
733 | }
734 | else:
735 | return {
736 | "category": "unknown_error",
737 | "strategy": "Manual investigation required",
738 | "auto_recovery": False,
739 | "severity": "high"
740 | }
741 |
742 | def _attempt_auto_recovery(self, analysis_type: str, error: Exception, context: Dict[str, Any]) -> Dict[str, Any]:
743 | """
744 | Attempt automatic recovery for recoverable errors.
745 |
746 | Args:
747 | analysis_type: Type of analysis that failed
748 | error: Exception that occurred
749 | context: Analysis context
750 |
751 | Returns:
752 | Dictionary containing recovery attempt results
753 | """
754 | try:
755 | logger.info(f"Attempting auto-recovery for {analysis_type}")
756 |
757 | recovery_actions = []
758 |
759 | # Try reloading the analyzer
760 | reload_result = self.reload_analyzer(analysis_type)
761 | recovery_actions.append({
762 | "action": "reload_analyzer",
763 | "result": reload_result.get("status"),
764 | "details": reload_result
765 | })
766 |
767 | # Clear related caches
768 | if self.analysis_results_cache:
769 | try:
770 | cache_keys_cleared = 0
771 | cache_dict = getattr(self.analysis_results_cache, '_cache', {})
772 | for key in list(cache_dict.keys()):
773 | if isinstance(key, (list, tuple)) and len(key) > 0 and key[0] == analysis_type:
774 | if hasattr(self.analysis_results_cache, 'invalidate'):
775 | self.analysis_results_cache.invalidate(key)
776 | elif hasattr(self.analysis_results_cache, 'delete'):
777 | self.analysis_results_cache.delete(key)
778 | cache_keys_cleared += 1
779 |
780 | recovery_actions.append({
781 | "action": "clear_cache",
782 | "result": "success",
783 | "details": {"cache_keys_cleared": cache_keys_cleared}
784 | })
785 | except Exception as cache_error:
786 | recovery_actions.append({
787 | "action": "clear_cache",
788 | "result": "error",
789 | "details": {"error": str(cache_error)}
790 | })
791 |
792 | return {
793 | "status": "completed",
794 | "recovery_actions": recovery_actions,
795 | "timestamp": datetime.now().isoformat(),
796 | "analysis_type": analysis_type
797 | }
798 |
799 | except Exception as recovery_error:
800 | logger.error(f"Error during auto-recovery for {analysis_type}: {str(recovery_error)}")
801 | return {
802 | "status": "failed",
803 | "error": str(recovery_error),
804 | "timestamp": datetime.now().isoformat(),
805 | "analysis_type": analysis_type
806 | }
807 |
808 | def get_analyzer_diagnostics(self) -> Dict[str, Any]:
809 | """
810 | Get comprehensive diagnostics for all analyzers and the analysis engine.
811 |
812 | Returns:
813 | Dictionary containing diagnostic information
814 | """
815 | try:
816 | diagnostics = {
817 | "diagnostics_timestamp": datetime.now().isoformat(),
818 | "orchestrator_info": {
819 | "region": self.region,
820 | "session_id": self.session_id,
821 | "performance_optimizations_enabled": True
822 | },
823 | "analysis_engine_info": {
824 | "region": self.analysis_engine.region,
825 | "total_analyzers": len(self.analysis_engine.analyzer_registry.list_analyzers()),
826 | "execution_history_count": len(getattr(self.analysis_engine, 'execution_history', []))
827 | }
828 | }
829 |
830 | # Get health status
831 | health_status = self.analysis_engine.get_analyzer_health_status()
832 | diagnostics["health_status"] = health_status
833 |
834 | # Get loading results
835 | if hasattr(self.analysis_engine, 'analyzer_loading_results'):
836 | diagnostics["loading_results"] = self.analysis_engine.analyzer_loading_results
837 |
838 | # Test analyzer connectivity
839 | connectivity_tests = {}
840 | for analysis_type in self.analysis_engine.analyzer_registry.list_analyzers():
841 | analyzer = self.analysis_engine.analyzer_registry.get(analysis_type)
842 |
843 | connectivity_test = {
844 | "analyzer_valid": self.analysis_engine._validate_analyzer(analyzer),
845 | "services_connected": {
846 | "s3_service": analyzer.s3_service is not None,
847 | "pricing_service": analyzer.pricing_service is not None,
848 | "storage_lens_service": analyzer.storage_lens_service is not None
849 | },
850 | "performance_components": {
851 | "performance_monitor": hasattr(analyzer, 'performance_monitor') and analyzer.performance_monitor is not None,
852 | "memory_manager": hasattr(analyzer, 'memory_manager') and analyzer.memory_manager is not None,
853 | "timeout_handler": hasattr(analyzer, 'timeout_handler') and analyzer.timeout_handler is not None
854 | }
855 | }
856 |
857 | connectivity_tests[analysis_type] = connectivity_test
858 |
859 | diagnostics["connectivity_tests"] = connectivity_tests
860 |
861 | # Get cache statistics
862 | cache_stats = {}
863 | if self.pricing_cache:
864 | try:
865 | cache_size = len(getattr(self.pricing_cache, '_cache', {}))
866 | except:
867 | cache_size = 0
868 | cache_stats["pricing_cache"] = {
869 | "size": cache_size,
870 | "hit_rate": getattr(self.pricing_cache, 'hit_rate', 0),
871 | "enabled": True
872 | }
873 |
874 | if self.bucket_metadata_cache:
875 | try:
876 | cache_size = len(getattr(self.bucket_metadata_cache, '_cache', {}))
877 | except:
878 | cache_size = 0
879 | cache_stats["bucket_metadata_cache"] = {
880 | "size": cache_size,
881 | "hit_rate": getattr(self.bucket_metadata_cache, 'hit_rate', 0),
882 | "enabled": True
883 | }
884 |
885 | if self.analysis_results_cache:
886 | try:
887 | cache_size = len(getattr(self.analysis_results_cache, '_cache', {}))
888 | except:
889 | cache_size = 0
890 | cache_stats["analysis_results_cache"] = {
891 | "size": cache_size,
892 | "hit_rate": getattr(self.analysis_results_cache, 'hit_rate', 0),
893 | "enabled": True
894 | }
895 |
896 | diagnostics["cache_statistics"] = cache_stats
897 |
898 | return diagnostics
899 |
900 | except Exception as e:
901 | logger.error(f"Error getting analyzer diagnostics: {str(e)}")
902 | return {
903 | "diagnostics_timestamp": datetime.now().isoformat(),
904 | "status": "error",
905 | "error_message": str(e),
906 | "error_type": e.__class__.__name__
907 | }
908 |
909 | def reload_analyzers(self) -> Dict[str, Any]:
910 | """
911 | Reload all analyzers in the registry with fresh instances.
912 |
913 | Returns:
914 | Dictionary containing reload results
915 | """
916 | try:
917 | logger.info("Reloading analyzers in registry")
918 |
919 | # Get current analyzer count
920 | old_count = len(self.analysis_engine.analyzer_registry.list_analyzers())
921 |
922 | # Clear existing registry
923 | self.analysis_engine.analyzer_registry._analyzers.clear()
924 |
925 | # Reinitialize analyzers
926 | self.analysis_engine._initialize_analyzers()
927 |
928 | # Get new analyzer count
929 | new_count = len(self.analysis_engine.analyzer_registry.list_analyzers())
930 |
931 | reload_result = {
932 | "status": "success",
933 | "message": "Analyzers reloaded successfully",
934 | "old_analyzer_count": old_count,
935 | "new_analyzer_count": new_count,
936 | "reloaded_analyzers": self.analysis_engine.analyzer_registry.list_analyzers(),
937 | "reload_timestamp": datetime.now().isoformat()
938 | }
939 |
940 | logger.info(f"Successfully reloaded {new_count} analyzers")
941 | return reload_result
942 |
943 | except Exception as e:
944 | logger.error(f"Error reloading analyzers: {str(e)}")
945 | return {
946 | "status": "error",
947 | "message": f"Failed to reload analyzers: {str(e)}",
948 | "reload_timestamp": datetime.now().isoformat()
949 | }
950 |
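# NOTE (editorial): handle_analyzer_failure, _determine_recovery_strategy, and
# _attempt_auto_recovery are defined a second time from here on. Python keeps only the
# last definition of a method name, so these later versions override the earlier ones
# above (around lines 566, 686, and 742 of this file).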
951 | def handle_analyzer_failure(self, analysis_type: str, error: Exception, context: Dict[str, Any]) -> Dict[str, Any]:
952 | """
953 | Handle analyzer failures with comprehensive error handling and recovery strategies.
954 |
955 | Args:
956 | analysis_type: Type of analysis that failed
957 | error: Exception that occurred
958 | context: Analysis context
959 |
960 | Returns:
961 | Dictionary containing error handling results and recovery recommendations
962 | """
963 | error_message = str(error)
964 | error_type = error.__class__.__name__
965 |
966 | logger.error(f"Analyzer failure in {analysis_type}: {error_message}")
967 |
968 | # Determine error category and recovery strategy
969 | recovery_strategy = self._determine_recovery_strategy(error_type, error_message)
970 |
971 | # Create comprehensive error result
972 | error_result = {
973 | "status": "error",
974 | "analysis_type": analysis_type,
975 | "error_message": error_message,
976 | "error_type": error_type,
977 | "error_category": recovery_strategy["category"],
978 | "recovery_strategy": recovery_strategy["strategy"],
979 | "context": context,
980 | "timestamp": datetime.now().isoformat(),
981 | "session_id": self.session_id,
982 | "recommendations": []
983 | }
984 |
985 | # Add specific recommendations based on error type
986 | if "permission" in error_message.lower() or "access" in error_message.lower():
987 | error_result["recommendations"].extend([
988 | {
989 | "type": "permission_fix",
990 | "priority": "high",
991 | "title": "Fix AWS Permissions",
992 | "description": f"Analysis {analysis_type} failed due to permission issues",
993 | "action_items": [
994 | "Check IAM permissions for S3, Cost Explorer, and Storage Lens",
995 | "Verify AWS credentials are valid and not expired",
996 | "Ensure required service permissions are granted",
997 | "Check if MFA is required for API access"
998 | ]
999 | }
1000 | ])
1001 | elif "timeout" in error_message.lower():
1002 | error_result["recommendations"].extend([
1003 | {
1004 | "type": "timeout_optimization",
1005 | "priority": "medium",
1006 | "title": "Optimize Timeout Settings",
1007 | "description": f"Analysis {analysis_type} timed out during execution",
1008 | "action_items": [
1009 | "Increase timeout_seconds parameter",
1010 | "Reduce lookback_days to limit data volume",
1011 | "Filter to specific bucket_names if possible",
1012 | "Run analysis during off-peak hours"
1013 | ]
1014 | }
1015 | ])
1016 | elif "rate" in error_message.lower() or "throttl" in error_message.lower():
1017 | error_result["recommendations"].extend([
1018 | {
1019 | "type": "rate_limit_handling",
1020 | "priority": "medium",
1021 | "title": "Handle API Rate Limits",
1022 | "description": f"Analysis {analysis_type} hit API rate limits",
1023 | "action_items": [
1024 | "Implement exponential backoff retry logic",
1025 | "Reduce concurrent analysis execution",
1026 | "Spread analysis execution over time",
1027 | "Consider using AWS SDK retry configuration"
1028 | ]
1029 | }
1030 | ])
1031 | else:
1032 | error_result["recommendations"].extend([
1033 | {
1034 | "type": "general_troubleshooting",
1035 | "priority": "medium",
1036 | "title": "General Troubleshooting",
1037 | "description": f"Analysis {analysis_type} failed with unexpected error",
1038 | "action_items": [
1039 | "Check AWS service status and availability",
1040 | "Verify network connectivity",
1041 | "Review analysis parameters for validity",
1042 | "Check system resources and memory usage",
1043 | "Try running analysis with reduced scope"
1044 | ]
1045 | }
1046 | ])
1047 |
1048 | # Record error in performance monitor if available
1049 | if self.performance_monitor:
1050 | self.performance_monitor.record_metric(
1051 | f"analyzer_failure_{analysis_type}",
1052 | 1,
1053 | tags={
1054 | "error_type": error_type,
1055 | "error_category": recovery_strategy["category"],
1056 | "session_id": self.session_id
1057 | }
1058 | )
1059 |
1060 | # Attempt automatic recovery if strategy suggests it
1061 | if recovery_strategy["auto_recovery"]:
1062 | recovery_result = self._attempt_auto_recovery(analysis_type, error, context)
1063 | error_result["auto_recovery_attempted"] = True
1064 | error_result["auto_recovery_result"] = recovery_result
1065 |
1066 | return error_result
1067 |
1068 | def _determine_recovery_strategy(self, error_type: str, error_message: str) -> Dict[str, Any]:
1069 | """
1070 | Determine recovery strategy based on error type and message.
1071 |
1072 | Args:
1073 | error_type: Type of exception
1074 | error_message: Error message content
1075 |
1076 | Returns:
1077 | Dictionary containing recovery strategy information
1078 | """
1079 | error_message_lower = error_message.lower()
1080 |
1081 | # Permission/Access errors
1082 | if any(keyword in error_message_lower for keyword in ["permission", "access", "denied", "unauthorized", "forbidden"]):
1083 | return {
1084 | "category": "permission_error",
1085 | "strategy": "check_permissions",
1086 | "auto_recovery": False,
1087 | "severity": "high"
1088 | }
1089 |
1090 | # Timeout errors
1091 | elif any(keyword in error_message_lower for keyword in ["timeout", "timed out", "deadline"]):
1092 | return {
1093 | "category": "timeout_error",
1094 | "strategy": "increase_timeout_or_reduce_scope",
1095 | "auto_recovery": True,
1096 | "severity": "medium"
1097 | }
1098 |
1099 | # Rate limiting errors
1100 | elif any(keyword in error_message_lower for keyword in ["rate", "throttl", "limit", "quota"]):
1101 | return {
1102 | "category": "rate_limit_error",
1103 | "strategy": "implement_backoff_retry",
1104 | "auto_recovery": True,
1105 | "severity": "medium"
1106 | }
1107 |
1108 | # Network/connectivity errors
1109 | elif any(keyword in error_message_lower for keyword in ["network", "connection", "dns", "resolve"]):
1110 | return {
1111 | "category": "network_error",
1112 | "strategy": "check_connectivity",
1113 | "auto_recovery": True,
1114 | "severity": "medium"
1115 | }
1116 |
1117 | # Service unavailable errors
1118 | elif any(keyword in error_message_lower for keyword in ["unavailable", "service", "maintenance"]):
1119 | return {
1120 | "category": "service_error",
1121 | "strategy": "retry_later",
1122 | "auto_recovery": True,
1123 | "severity": "low"
1124 | }
1125 |
1126 | # Data/validation errors
1127 | elif any(keyword in error_message_lower for keyword in ["invalid", "validation", "parameter", "format"]):
1128 | return {
1129 | "category": "validation_error",
1130 | "strategy": "check_parameters",
1131 | "auto_recovery": False,
1132 | "severity": "medium"
1133 | }
1134 |
1135 | # Memory/resource errors
1136 | elif any(keyword in error_message_lower for keyword in ["memory", "resource", "capacity"]):
1137 | return {
1138 | "category": "resource_error",
1139 | "strategy": "optimize_resource_usage",
1140 | "auto_recovery": True,
1141 | "severity": "high"
1142 | }
1143 |
1144 | # Unknown errors
1145 | else:
1146 | return {
1147 | "category": "unknown_error",
1148 | "strategy": "general_troubleshooting",
1149 | "auto_recovery": False,
1150 | "severity": "medium"
1151 | }
1152 |
1153 | def register_custom_analyzer(self, analyzer: 'BaseAnalyzer') -> Dict[str, Any]:
1154 | """
1155 | Dynamically register a custom analyzer with the orchestrator.
1156 |
1157 | Args:
1158 | analyzer: BaseAnalyzer instance to register
1159 |
1160 | Returns:
1161 | Dictionary containing registration results
1162 | """
1163 | try:
1164 | # Validate analyzer
1165 | if not hasattr(analyzer, 'analyze') or not hasattr(analyzer, 'get_recommendations'):
1166 | return {
1167 | "status": "error",
1168 | "message": "Analyzer must implement analyze() and get_recommendations() methods",
1169 | "timestamp": datetime.now().isoformat()
1170 | }
1171 |
1172 | # Add performance optimization components if available
1173 | if self.performance_monitor:
1174 | analyzer.performance_monitor = self.performance_monitor
1175 | if self.memory_manager:
1176 | analyzer.memory_manager = self.memory_manager
1177 |
1178 | # Register with the analysis engine
1179 | self.analysis_engine.analyzer_registry.register(analyzer)
1180 |
1181 | # Update analysis priorities if not already defined
1182 | analysis_type = analyzer.analysis_type
1183 | if analysis_type not in self.analysis_engine.analysis_priorities:
1184 | self.analysis_engine.analysis_priorities[analysis_type] = {
1185 | "priority": 2, # Default medium priority
1186 | "cost_impact": "unknown",
1187 | "execution_time_estimate": 30,
1188 | "dependencies": [],
1189 | "description": f"Custom analyzer: {analyzer.__class__.__name__}"
1190 | }
1191 |
1192 | logger.info(f"Successfully registered custom analyzer: {analysis_type}")
1193 |
1194 | return {
1195 | "status": "success",
1196 | "message": f"Custom analyzer '{analysis_type}' registered successfully",
1197 | "analyzer_type": analysis_type,
1198 | "analyzer_class": analyzer.__class__.__name__,
1199 | "total_analyzers": len(self.analysis_engine.analyzer_registry.list_analyzers()),
1200 | "timestamp": datetime.now().isoformat()
1201 | }
1202 |
1203 | except Exception as e:
1204 | logger.error(f"Error registering custom analyzer: {str(e)}")
1205 | return {
1206 | "status": "error",
1207 | "message": f"Failed to register custom analyzer: {str(e)}",
1208 | "timestamp": datetime.now().isoformat()
1209 | }
1210 |
1211 | def unregister_analyzer(self, analysis_type: str) -> Dict[str, Any]:
1212 | """
1213 | Unregister an analyzer from the orchestrator.
1214 |
1215 | Args:
1216 | analysis_type: Type of analysis to unregister
1217 |
1218 | Returns:
1219 | Dictionary containing unregistration results
1220 | """
1221 | try:
1222 | if analysis_type not in self.analysis_engine.analyzer_registry.list_analyzers():
1223 | return {
1224 | "status": "error",
1225 | "message": f"Analyzer '{analysis_type}' is not registered",
1226 | "available_analyzers": self.analysis_engine.analyzer_registry.list_analyzers(),
1227 | "timestamp": datetime.now().isoformat()
1228 | }
1229 |
1230 | # Remove from registry
1231 | del self.analysis_engine.analyzer_registry._analyzers[analysis_type]
1232 |
1233 | # Remove from priorities if it exists
1234 | if analysis_type in self.analysis_engine.analysis_priorities:
1235 | del self.analysis_engine.analysis_priorities[analysis_type]
1236 |
1237 | logger.info(f"Successfully unregistered analyzer: {analysis_type}")
1238 |
1239 | return {
1240 | "status": "success",
1241 | "message": f"Analyzer '{analysis_type}' unregistered successfully",
1242 | "remaining_analyzers": self.analysis_engine.analyzer_registry.list_analyzers(),
1243 | "timestamp": datetime.now().isoformat()
1244 | }
1245 |
1246 | except Exception as e:
1247 | logger.error(f"Error unregistering analyzer {analysis_type}: {str(e)}")
1248 | return {
1249 | "status": "error",
1250 | "message": f"Failed to unregister analyzer '{analysis_type}': {str(e)}",
1251 | "timestamp": datetime.now().isoformat()
1252 | }
1253 |
1254 | def get_analyzer_execution_history(self, analysis_type: Optional[str] = None) -> Dict[str, Any]:
1255 | """
1256 | Get execution history for analyzers.
1257 |
1258 | Args:
1259 | analysis_type: Optional specific analyzer type to get history for
1260 |
1261 | Returns:
1262 | Dictionary containing execution history
1263 | """
1264 | try:
1265 | if analysis_type:
1266 | # Get history for specific analyzer
1267 | analyzer = self.analysis_engine.analyzer_registry.get(analysis_type)
1268 | if not analyzer:
1269 | return {
1270 | "status": "error",
1271 | "message": f"Analyzer '{analysis_type}' not found",
1272 | "available_analyzers": self.analysis_engine.analyzer_registry.list_analyzers()
1273 | }
1274 |
1275 | return {
1276 | "status": "success",
1277 | "analysis_type": analysis_type,
1278 | "execution_count": analyzer.execution_count,
1279 | "last_execution": analyzer.last_execution.isoformat() if analyzer.last_execution else None,
1280 | "analyzer_info": analyzer.get_analyzer_info()
1281 | }
1282 | else:
1283 | # Get history for all analyzers
1284 | all_history = {}
1285 | for analyzer_type in self.analysis_engine.analyzer_registry.list_analyzers():
1286 | analyzer = self.analysis_engine.analyzer_registry.get(analyzer_type)
1287 | all_history[analyzer_type] = {
1288 | "execution_count": analyzer.execution_count,
1289 | "last_execution": analyzer.last_execution.isoformat() if analyzer.last_execution else None,
1290 | "analyzer_class": analyzer.__class__.__name__
1291 | }
1292 |
1293 | # Add engine-level execution history
1294 | engine_history = getattr(self.analysis_engine, 'execution_history', [])
1295 |
1296 | return {
1297 | "status": "success",
1298 | "analyzer_history": all_history,
1299 | "engine_execution_history": engine_history[-10:], # Last 10 executions
1300 | "total_analyzers": len(all_history),
1301 | "timestamp": datetime.now().isoformat()
1302 | }
1303 |
1304 | except Exception as e:
1305 | logger.error(f"Error getting analyzer execution history: {str(e)}")
1306 | return {
1307 | "status": "error",
1308 | "message": f"Failed to get execution history: {str(e)}",
1309 | "timestamp": datetime.now().isoformat()
1310 | }
1311 |
1312 | def _attempt_auto_recovery(self, analysis_type: str, error: Exception, context: Dict[str, Any]) -> Dict[str, Any]:
1313 | """
1314 | Attempt automatic recovery from analyzer failures.
1315 |
1316 | Args:
1317 | analysis_type: Type of analysis that failed
1318 | error: Exception that occurred
1319 | context: Analysis context
1320 |
1321 | Returns:
1322 | Dictionary containing recovery attempt results
1323 | """
1324 | recovery_result = {
1325 | "attempted": True,
1326 | "success": False,
1327 | "strategy_used": "none",
1328 | "timestamp": datetime.now().isoformat()
1329 | }
1330 |
1331 | try:
1332 | error_message = str(error).lower()
1333 |
1334 | # Timeout recovery: reduce scope and retry
1335 | if "timeout" in error_message:
1336 | recovery_result["strategy_used"] = "reduce_scope_retry"
1337 |
1338 | # Reduce lookback days
1339 | original_lookback = context.get("lookback_days", 30)
1340 | reduced_lookback = max(7, original_lookback // 2)
1341 |
1342 | # Reduce bucket scope if applicable
1343 | original_buckets = context.get("bucket_names")
1344 | reduced_buckets = original_buckets[:5] if original_buckets and len(original_buckets) > 5 else original_buckets
1345 |
1346 | logger.info(f"Attempting timeout recovery for {analysis_type}: reducing lookback from {original_lookback} to {reduced_lookback} days")
1347 |
1348 | # This would be implemented as a retry mechanism in the actual execution flow
1349 | recovery_result["recovery_parameters"] = {
1350 | "reduced_lookback_days": reduced_lookback,
1351 | "reduced_bucket_scope": reduced_buckets,
1352 | "original_lookback_days": original_lookback,
1353 | "original_bucket_count": len(original_buckets) if original_buckets else 0
1354 | }
1355 | recovery_result["success"] = True # Mark as successful strategy identification
1356 |
1357 | # Rate limit recovery: implement backoff
1358 | elif any(keyword in error_message for keyword in ["rate", "throttl", "limit"]):
1359 | recovery_result["strategy_used"] = "exponential_backoff"
1360 |
1361 | # Calculate backoff delay
1362 | backoff_delay = min(60, 2 ** context.get("retry_count", 0))
1363 |
1364 | recovery_result["recovery_parameters"] = {
1365 | "backoff_delay_seconds": backoff_delay,
1366 | "retry_count": context.get("retry_count", 0) + 1,
1367 | "max_retries": 3
1368 | }
1369 | recovery_result["success"] = True
1370 |
1371 | # Memory recovery: trigger cleanup
1372 | elif "memory" in error_message:
1373 | recovery_result["strategy_used"] = "memory_cleanup"
1374 |
1375 | if self.memory_manager:
1376 | cleanup_result = self.memory_manager.force_cleanup("aggressive")
1377 | recovery_result["recovery_parameters"] = {
1378 | "cleanup_performed": True,
1379 | "cleanup_result": cleanup_result
1380 | }
1381 | recovery_result["success"] = True
1382 | else:
1383 | recovery_result["recovery_parameters"] = {
1384 | "cleanup_performed": False,
1385 | "reason": "memory_manager_not_available"
1386 | }
1387 |
1388 | # Network recovery: simple retry with delay
1389 | elif any(keyword in error_message for keyword in ["network", "connection", "dns"]):
1390 | recovery_result["strategy_used"] = "network_retry"
1391 | recovery_result["recovery_parameters"] = {
1392 | "retry_delay_seconds": 10,
1393 | "retry_count": context.get("retry_count", 0) + 1,
1394 | "max_retries": 2
1395 | }
1396 | recovery_result["success"] = True
1397 |
1398 | return recovery_result
1399 |
1400 | except Exception as recovery_error:
1401 | logger.error(f"Error during auto-recovery attempt: {str(recovery_error)}")
1402 | recovery_result["error"] = str(recovery_error)
1403 | return recovery_result
1404 |
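# A quick standalone illustration of the recovery parameters computed above, using
# hypothetical retry counts and lookback values (no AWS calls; mirrors the expressions in the method):
#
#     >>> [min(60, 2 ** n) for n in (0, 3, 6)]          # exponential backoff, capped at 60s
#     [1, 8, 60]
#     >>> max(7, 30 // 2), max(7, 10 // 2)              # reduced lookback never drops below 7 days
#     (15, 7)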
1405 | def _prepare_analysis_params(self, analysis_type: str, **kwargs) -> Dict[str, Any]:
1406 | """Prepare parameters for specific analysis type."""
1407 | base_params = {
1408 | 'region': self.region,
1409 | 'session_id': self.session_id,
1410 | 'lookback_days': kwargs.get('lookback_days', 30),
1411 | 'include_cost_analysis': kwargs.get('include_cost_analysis', True),
1412 | 'bucket_names': kwargs.get('bucket_names'),
1413 | 'timeout_seconds': kwargs.get('timeout_seconds', 60)
1414 | }
1415 |
1416 | # Add analysis-specific parameters
1417 | if analysis_type == "storage_class":
1418 | base_params.update({
1419 | 'min_object_size_mb': kwargs.get('min_object_size_mb', 1),
1420 | 'include_recommendations': kwargs.get('include_recommendations', True)
1421 | })
1422 | elif analysis_type == "archive_optimization":
1423 | base_params.update({
1424 | 'min_age_days': kwargs.get('min_age_days', 180),
1425 | 'archive_tier_preference': kwargs.get('archive_tier_preference', 'auto'),
1426 | 'include_compliance_check': kwargs.get('include_compliance_check', True)
1427 | })
1428 | elif analysis_type == "api_cost":
1429 | base_params.update({
1430 | 'request_threshold': kwargs.get('request_threshold', 10000),
1431 | 'include_cloudfront_analysis': kwargs.get('include_cloudfront_analysis', True)
1432 | })
1433 | elif analysis_type == "multipart_cleanup":
1434 | base_params.update({
1435 | 'min_age_days': kwargs.get('min_age_days', 7),
1436 | 'max_results_per_bucket': kwargs.get('max_results_per_bucket', 1000)
1437 | })
1438 | elif analysis_type == "governance":
1439 | base_params.update({
1440 | 'check_lifecycle_policies': kwargs.get('check_lifecycle_policies', True),
1441 | 'check_versioning': kwargs.get('check_versioning', True),
1442 | 'check_tagging': kwargs.get('check_tagging', True)
1443 | })
1444 |
1445 | return base_params
1446 |
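# Sketch of the parameter fallback behaviour above for a hypothetical "multipart_cleanup" call
# where only lookback_days is overridden; every other value falls back to its default:
#
#     >>> kwargs = {"lookback_days": 14}
#     >>> kwargs.get("lookback_days", 30), kwargs.get("min_age_days", 7), kwargs.get("timeout_seconds", 60)
#     (14, 7, 60)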
1447 | def _prepare_comprehensive_analysis_params(self, **kwargs) -> Dict[str, Any]:
1448 | """Prepare parameters for comprehensive analysis."""
1449 | return {
1450 | 'region': self.region,
1451 | 'session_id': self.session_id,
1452 | 'lookback_days': kwargs.get('lookback_days', 30),
1453 | 'include_cost_analysis': kwargs.get('include_cost_analysis', True),
1454 | 'bucket_names': kwargs.get('bucket_names'),
1455 | 'timeout_seconds': kwargs.get('timeout_seconds', 60),
1456 | 'store_results': kwargs.get('store_results', True),
1457 | 'include_cross_analysis': kwargs.get('include_cross_analysis', True)
1458 | }
1459 |
1460 | def _create_prioritized_analysis_tasks(self,
1461 | analysis_types: List[str],
1462 | available_analyses: List[Dict[str, Any]],
1463 | **kwargs) -> List[Dict[str, Any]]:
1464 | """
1465 | Create prioritized analysis tasks for parallel execution based on cost impact and execution time.
1466 |
1467 | Args:
1468 | analysis_types: List of analysis types to execute
1469 | available_analyses: List of analysis metadata with priority information
1470 | **kwargs: Analysis parameters
1471 |
1472 | Returns:
1473 | List of prioritized service call definitions for parallel execution
1474 | """
1475 | service_calls = []
1476 |
1477 | # Create priority mapping for quick lookup
1478 | priority_map = {
1479 | analysis["analysis_type"]: analysis
1480 | for analysis in available_analyses
1481 | }
1482 |
1483 | # Sort analysis types by priority (highest first), then by execution time (shortest first)
1484 | sorted_analyses = sorted(
1485 | analysis_types,
1486 | key=lambda x: (
1487 | priority_map.get(x, {}).get("priority", 1),
1488 | -priority_map.get(x, {}).get("execution_time_estimate", 30)  # Negated so that, with reverse=True, shorter estimates sort first
1489 | ),
1490 | reverse=True
1491 | )
1492 |
1493 | logger.info(f"Task prioritization order: {sorted_analyses}")
1494 |
1495 | # Create service call definitions with proper prioritization
1496 | for i, analysis_type in enumerate(sorted_analyses):
1497 | analysis_info = priority_map.get(analysis_type, {})
1498 |
1499 | # Create synchronous wrapper function for the analysis (ServiceOrchestrator expects sync functions)
1500 | def create_analysis_wrapper(atype=analysis_type, params=kwargs.copy()):
1501 | def analysis_wrapper():
1502 | # Run async analysis in sync context
1503 | import asyncio
1504 | try:
1505 | # Get or create event loop
1506 | try:
1507 | loop = asyncio.get_event_loop()
1508 | except RuntimeError:
1509 | loop = asyncio.new_event_loop()
1510 | asyncio.set_event_loop(loop)
1511 |
1512 | # Run the analysis
1513 | if loop.is_running():
1514 | # A running loop cannot be blocked on with run_until_complete; execute the coroutine in a worker thread via asyncio.run
1515 | import concurrent.futures
1516 | with concurrent.futures.ThreadPoolExecutor() as executor:
1517 | future = executor.submit(asyncio.run, self.analysis_engine.run_analysis(atype, **params))
1518 | return future.result(timeout=params.get("timeout_seconds", 60))
1519 | else:
1520 | return loop.run_until_complete(self.analysis_engine.run_analysis(atype, **params))
1521 | except Exception as e:
1522 | logger.error(f"Error in analysis wrapper for {atype}: {str(e)}")
1523 | return {
1524 | "status": "error",
1525 | "analysis_type": atype,
1526 | "error_message": str(e),
1527 | "timestamp": datetime.now().isoformat()
1528 | }
1529 | return analysis_wrapper
1530 |
1531 | # Calculate dynamic timeout based on priority and estimated execution time
1532 | base_timeout = analysis_info.get("execution_time_estimate", 30)
1533 | priority_multiplier = 1.0 + (analysis_info.get("priority", 1) * 0.2) # Higher priority gets more time
1534 | dynamic_timeout = int(base_timeout * priority_multiplier) + 15 # Add buffer
1535 |
1536 | service_call = {
1537 | "service": "s3_analysis_engine",
1538 | "operation": analysis_type,
1539 | "function": create_analysis_wrapper(),
1540 | "args": (),
1541 | "kwargs": {},
1542 | "timeout": kwargs.get("timeout_seconds", dynamic_timeout),
1543 | "priority": analysis_info.get("priority", 1),
1544 | "metadata": {
1545 | "analysis_type": analysis_type,
1546 | "cost_impact": analysis_info.get("cost_impact", "unknown"),
1547 | "execution_time_estimate": analysis_info.get("execution_time_estimate", 30),
1548 | "execution_order": i + 1,
1549 | "dependencies": analysis_info.get("dependencies", []),
1550 | "description": analysis_info.get("description", "S3 optimization analysis"),
1551 | "dynamic_timeout": dynamic_timeout,
1552 | "priority_multiplier": priority_multiplier
1553 | }
1554 | }
1555 |
1556 | service_calls.append(service_call)
1557 |
1558 | return service_calls
1559 |
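# Standalone sketch of the ordering and timeout maths used above, with hypothetical
# (analysis, priority, execution_time_estimate) tuples: higher priority wins, and the
# shorter estimate wins within the same priority tier.
#
#     >>> items = [("api_cost", 2, 10), ("storage_class", 5, 40), ("general_spend", 5, 20)]
#     >>> sorted(items, key=lambda x: (x[1], -x[2]), reverse=True)
#     [('general_spend', 5, 20), ('storage_class', 5, 40), ('api_cost', 2, 10)]
#     >>> int(30 * (1.0 + 5 * 0.2)) + 15                # dynamic timeout: priority 5, 30s estimate
#     75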
1560 | def _get_task_prioritization_summary(self, available_analyses: List[Dict[str, Any]]) -> Dict[str, Any]:
1561 | """
1562 | Get summary of task prioritization for reporting.
1563 |
1564 | Args:
1565 | available_analyses: List of analysis metadata
1566 |
1567 | Returns:
1568 | Dictionary containing prioritization summary
1569 | """
1570 | prioritization_summary = {
1571 | "high_priority_analyses": [],
1572 | "medium_priority_analyses": [],
1573 | "low_priority_analyses": [],
1574 | "prioritization_criteria": {
1575 | "cost_impact": "Higher cost impact gets higher priority",
1576 | "execution_time": "Shorter execution time gets higher priority within same cost impact",
1577 | "dependencies": "Analyses with dependencies are scheduled after their dependencies"
1578 | }
1579 | }
1580 |
1581 | for analysis in available_analyses:
1582 | priority = analysis.get("priority", 1)
1583 | analysis_summary = {
1584 | "analysis_type": analysis["analysis_type"],
1585 | "priority": priority,
1586 | "cost_impact": analysis.get("cost_impact", "unknown"),
1587 | "execution_time_estimate": analysis.get("execution_time_estimate", 30)
1588 | }
1589 |
1590 | if priority >= 4:
1591 | prioritization_summary["high_priority_analyses"].append(analysis_summary)
1592 | elif priority >= 2:
1593 | prioritization_summary["medium_priority_analyses"].append(analysis_summary)
1594 | else:
1595 | prioritization_summary["low_priority_analyses"].append(analysis_summary)
1596 |
1597 | return prioritization_summary
1598 |
1599 | def _execute_cross_analysis_queries(self, stored_tables: List[str]) -> Dict[str, Any]:
1600 | """
1601 | Execute cross-analysis aggregation queries for deeper insights.
1602 |
1603 | Args:
1604 | stored_tables: List of table names stored in the session
1605 |
1606 | Returns:
1607 | Dictionary containing cross-analysis query results
1608 | """
1609 | cross_analysis_results = {}
1610 |
1611 | try:
1612 | # Import aggregation queries
1613 | from .s3_aggregation_queries import S3AggregationQueries
1614 |
1615 | # Get cross-analysis queries based on available tables
1616 | aggregation_queries = self._get_cross_analysis_queries_for_tables(stored_tables)
1617 |
1618 | if aggregation_queries:
1619 | logger.info(f"Executing {len(aggregation_queries)} cross-analysis queries")
1620 |
1621 | # Execute aggregation queries using ServiceOrchestrator
1622 | cross_analysis_results = self.service_orchestrator.aggregate_results(aggregation_queries)
1623 |
1624 | logger.info(f"Completed cross-analysis queries: {len(cross_analysis_results)} results")
1625 | else:
1626 | logger.info("No cross-analysis queries available for current table set")
1627 |
1628 | except Exception as e:
1629 | logger.error(f"Error executing cross-analysis queries: {str(e)}")
1630 | cross_analysis_results = {"error": str(e)}
1631 |
1632 | return cross_analysis_results
1633 |
1634 | def _get_cross_analysis_queries_for_tables(self, stored_tables: List[str]) -> List[Dict[str, str]]:
1635 | """
1636 | Get cross-analysis queries adapted for the specific tables available in the session.
1637 |
1638 | Args:
1639 | stored_tables: List of table names stored in the session
1640 |
1641 | Returns:
1642 | List of query definitions adapted for available tables
1643 | """
1644 | queries = []
1645 |
1646 | # Filter tables to S3-related ones
1647 | s3_tables = [table for table in stored_tables if table.startswith('s3_')]
1648 |
1649 | if not s3_tables:
1650 | return queries
1651 |
1652 | # Get table names by analysis type
1653 | table_map = {}
1654 | for table in s3_tables:
1655 | if 'general_spend' in table:
1656 | table_map['general_spend'] = table
1657 | elif 'storage_class' in table:
1658 | table_map['storage_class'] = table
1659 | elif 'archive_optimization' in table:
1660 | table_map['archive_optimization'] = table
1661 | elif 'api_cost' in table:
1662 | table_map['api_cost'] = table
1663 | elif 'multipart_cleanup' in table:
1664 | table_map['multipart_cleanup'] = table
1665 | elif 'governance' in table:
1666 | table_map['governance'] = table
1667 | elif 'comprehensive' in table:
1668 | table_map['comprehensive'] = table
1669 |
1670 | # Query 1: Recommendations by priority (if we have recommendation tables)
1671 | recommendation_tables = [table for table in s3_tables if 'comprehensive' not in table]
1672 | if recommendation_tables:
1673 | union_clauses = []
1674 | for table in recommendation_tables:
1675 | union_clauses.append(f"""
1676 | SELECT
1677 | priority,
1678 | analysis_type,
1679 | title,
1680 | description,
1681 | potential_savings,
1682 | implementation_effort
1683 | FROM "{table}"
1684 | WHERE record_type = 'recommendation'
1685 | """)
1686 |
1687 | if union_clauses:
1688 | queries.append({
1689 | "name": "recommendations_by_priority",
1690 | "query": f"""
1691 | SELECT
1692 | priority,
1693 | COUNT(*) as recommendation_count,
1694 | SUM(CASE WHEN potential_savings > 0 THEN potential_savings ELSE 0 END) as total_potential_savings,
1695 | AVG(CASE WHEN potential_savings > 0 THEN potential_savings ELSE NULL END) as avg_potential_savings
1696 | FROM (
1697 | {' UNION ALL '.join(union_clauses)}
1698 | ) all_recommendations
1699 | GROUP BY priority
1700 | ORDER BY
1701 | CASE priority
1702 | WHEN 'high' THEN 3
1703 | WHEN 'medium' THEN 2
1704 | WHEN 'low' THEN 1
1705 | ELSE 0
1706 | END DESC
1707 | """
1708 | })
1709 |
1710 | # Query 2: Top optimization opportunities (if we have comprehensive table)
1711 | if 'comprehensive' in table_map:
1712 | queries.append({
1713 | "name": "top_optimization_opportunities",
1714 | "query": f"""
1715 | SELECT
1716 | title,
1717 | description,
1718 | potential_savings,
1719 | source_analysis,
1720 | implementation_effort,
1721 | priority,
1722 | rank
1723 | FROM "{table_map['comprehensive']}"
1724 | WHERE record_type = 'optimization_opportunity'
1725 | ORDER BY potential_savings DESC, rank ASC
1726 | LIMIT 10
1727 | """
1728 | })
1729 |
1730 | # Query 3: Analysis execution summary
1731 | metadata_tables = [table for table in s3_tables if 'comprehensive' not in table]
1732 | if metadata_tables:
1733 | union_clauses = []
1734 | for table in metadata_tables:
1735 | union_clauses.append(f"""
1736 | SELECT
1737 | analysis_type,
1738 | status,
1739 | execution_time,
1740 | recommendations_count,
1741 | data_sources,
1742 | timestamp
1743 | FROM "{table}"
1744 | WHERE record_type = 'metadata'
1745 | """)
1746 |
1747 | if union_clauses:
1748 | queries.append({
1749 | "name": "analysis_execution_summary",
1750 | "query": f"""
1751 | SELECT
1752 | analysis_type,
1753 | status,
1754 | execution_time,
1755 | recommendations_count,
1756 | data_sources,
1757 | timestamp
1758 | FROM (
1759 | {' UNION ALL '.join(union_clauses)}
1760 | ) metadata
1761 | ORDER BY
1762 | CASE status
1763 | WHEN 'success' THEN 1
1764 | WHEN 'error' THEN 2
1765 | ELSE 3
1766 | END,
1767 | execution_time ASC
1768 | """
1769 | })
1770 |
1771 | # Query 4: Total potential savings by analysis type
1772 | if recommendation_tables:
1773 | union_clauses = []
1774 | for table in recommendation_tables:
1775 | union_clauses.append(f"""
1776 | SELECT
1777 | analysis_type,
1778 | potential_savings,
1779 | priority,
1780 | implementation_effort
1781 | FROM "{table}"
1782 | WHERE record_type = 'recommendation'
1783 | """)
1784 |
1785 | if union_clauses:
1786 | queries.append({
1787 | "name": "total_savings_by_analysis",
1788 | "query": f"""
1789 | SELECT
1790 | analysis_type,
1791 | COUNT(*) as recommendation_count,
1792 | SUM(CASE WHEN potential_savings > 0 THEN potential_savings ELSE 0 END) as total_potential_savings,
1793 | AVG(CASE WHEN potential_savings > 0 THEN potential_savings ELSE NULL END) as avg_potential_savings,
1794 | MAX(potential_savings) as max_potential_savings,
1795 | COUNT(CASE WHEN priority = 'high' THEN 1 END) as high_priority_count,
1796 | COUNT(CASE WHEN priority = 'medium' THEN 1 END) as medium_priority_count,
1797 | COUNT(CASE WHEN priority = 'low' THEN 1 END) as low_priority_count
1798 | FROM (
1799 | {' UNION ALL '.join(union_clauses)}
1800 | ) all_recommendations
1801 | GROUP BY analysis_type
1802 | ORDER BY total_potential_savings DESC
1803 | """
1804 | })
1805 |
1806 | # Query 5: Cross-analysis insights (if we have comprehensive table)
1807 | if 'comprehensive' in table_map:
1808 | queries.append({
1809 | "name": "cross_analysis_insights",
1810 | "query": f"""
1811 | SELECT
1812 | insight_type,
1813 | title,
1814 | description,
1815 | recommendation,
1816 | analyses_involved,
1817 | timestamp
1818 | FROM "{table_map['comprehensive']}"
1819 | WHERE record_type = 'cross_analysis_insight'
1820 | ORDER BY timestamp DESC
1821 | """
1822 | })
1823 |
1824 | logger.info(f"Generated {len(queries)} cross-analysis queries for {len(s3_tables)} S3 tables")
1825 | return queries
1826 |
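# Sketch of the table routing above for a hypothetical session table set; only the
# substring matching is shown here, and the SQL fragments are assembled exactly as in the method:
#
#     >>> tables = ["s3_general_spend_1700000000", "s3_comprehensive_1700000000", "ec2_usage_1700000000"]
#     >>> [t for t in tables if t.startswith("s3_")]
#     ['s3_general_spend_1700000000', 's3_comprehensive_1700000000']
#     >>> [t for t in tables if t.startswith("s3_") and "comprehensive" not in t]     # recommendation/metadata tables
#     ['s3_general_spend_1700000000']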
1827 | def _get_analysis_priority(self, analysis_type: str) -> int:
1828 | """Get priority for analysis type (higher number = higher priority)."""
1829 | return self.analysis_engine.analysis_priorities.get(analysis_type, {}).get("priority", 1)
1830 |
1831 | def _get_cache_ttl_for_analysis(self, analysis_type: str) -> int:
1832 | """
1833 | Get appropriate cache TTL for analysis type.
1834 |
1835 | Args:
1836 | analysis_type: Type of analysis
1837 |
1838 | Returns:
1839 | TTL in seconds
1840 | """
1841 | # Different analysis types have different cache lifetimes
1842 | ttl_mapping = {
1843 | "general_spend": 1800, # 30 minutes - cost data changes frequently
1844 | "storage_class": 3600, # 1 hour - storage class analysis is more stable
1845 | "archive_optimization": 7200, # 2 hours - archive recommendations change slowly
1846 | "api_cost": 1800, # 30 minutes - API costs can fluctuate
1847 | "multipart_cleanup": 900, # 15 minutes - multipart uploads change frequently
1848 | "governance": 3600, # 1 hour - governance policies are relatively stable
1849 | "comprehensive": 1800 # 30 minutes - comprehensive analysis includes dynamic data
1850 | }
1851 |
1852 | return ttl_mapping.get(analysis_type, 1800) # Default 30 minutes
1853 |
1854 | def get_performance_summary(self) -> Dict[str, Any]:
1855 | """Get comprehensive performance summary from all optimization components."""
1856 | try:
1857 | return {
1858 | "timestamp": datetime.now().isoformat(),
1859 | "session_id": self.session_id,
1860 | "region": self.region,
1861 | "performance_monitor": self.performance_monitor.get_performance_summary(),
1862 | "memory_manager": self.memory_manager.get_memory_statistics(),
1863 | "timeout_handler": self.timeout_handler.get_performance_statistics(),
1864 | "caches": {
1865 | "pricing_cache": self.pricing_cache.get_statistics(),
1866 | "bucket_metadata_cache": self.bucket_metadata_cache.get_statistics(),
1867 | "analysis_results_cache": self.analysis_results_cache.get_statistics()
1868 | }
1869 | }
1870 | except Exception as e:
1871 | logger.error(f"Error getting performance summary: {e}")
1872 | return {"error": str(e)}
1873 |
1874 | def optimize_performance(self):
1875 | """Optimize performance by running optimization routines on all components."""
1876 | try:
1877 | logger.info("Running performance optimization")
1878 |
1879 | # Optimize timeout configuration based on historical data
1880 | self.timeout_handler.optimize_configuration()
1881 |
1882 | # Force gentle memory cleanup
1883 | self.memory_manager.force_cleanup("gentle")
1884 |
1885 | # Optimize caches
1886 | for cache in [self.pricing_cache, self.bucket_metadata_cache, self.analysis_results_cache]:
1887 | if hasattr(cache, '_optimize_cache'):
1888 | cache._optimize_cache()
1889 |
1890 | logger.info("Performance optimization completed")
1891 |
1892 | except Exception as e:
1893 | logger.error(f"Error during performance optimization: {e}")
1894 |
1895 | def clear_caches(self, cache_types: Optional[List[str]] = None):
1896 | """
1897 | Clear specified caches or all caches.
1898 |
1899 | Args:
1900 | cache_types: List of cache types to clear (None for all)
1901 | """
1902 | cache_map = {
1903 | "pricing": self.pricing_cache,
1904 | "bucket_metadata": self.bucket_metadata_cache,
1905 | "analysis_results": self.analysis_results_cache
1906 | }
1907 |
1908 | if cache_types is None:
1909 | cache_types = list(cache_map.keys())
1910 |
1911 | for cache_type in cache_types:
1912 | if cache_type in cache_map:
1913 | cache_map[cache_type].clear()
1914 | logger.info(f"Cleared {cache_type} cache")
1915 | else:
1916 | logger.warning(f"Unknown cache type: {cache_type}")
1917 |
1918 |
1919 |
1920 | def _store_analysis_results(self, analysis_type: str, result: Dict[str, Any]):
1921 | """Store analysis results in session database with proper schema for cross-analysis queries."""
1922 | try:
1923 | table_name = f"s3_{analysis_type}_{int(time.time())}"
1924 |
1925 | # Convert result to structured format for storage
1926 | data_to_store = []
1927 |
1928 | # Store main result metadata with consistent column names
1929 | data_to_store.append({
1930 | "record_type": "metadata",
1931 | "analysis_type": analysis_type,
1932 | "status": result.get("status"),
1933 | "execution_time": result.get("execution_time", 0),
1934 | "timestamp": result.get("timestamp", datetime.now().isoformat()),
1935 | "session_id": self.session_id,
1936 | "data_sources": str(result.get("data_sources", [])),
1937 | "recommendations_count": len(result.get("recommendations", [])),
1938 | # Add placeholder columns for cross-analysis compatibility
1939 | "priority": None,
1940 | "title": None,
1941 | "description": None,
1942 | "potential_savings": 0,
1943 | "implementation_effort": None,
1944 | "recommendation_id": None,
1945 | "rec_type": None
1946 | })
1947 |
1948 | # Store recommendations separately for easier querying with consistent schema
1949 | for i, rec in enumerate(result.get("recommendations", [])):
1950 | rec_data = {
1951 | "record_type": "recommendation",
1952 | "analysis_type": analysis_type,
1953 | "recommendation_id": i,
1954 | "rec_type": rec.get("type", ""),
1955 | "priority": rec.get("priority", "medium"),
1956 | "title": rec.get("title", ""),
1957 | "description": rec.get("description", ""),
1958 | "potential_savings": rec.get("potential_savings", 0),
1959 | "implementation_effort": rec.get("implementation_effort", "medium"),
1960 | "timestamp": datetime.now().isoformat(),
1961 | # Add metadata columns for consistency
1962 | "status": None,
1963 | "execution_time": None,
1964 | "session_id": self.session_id,
1965 | "data_sources": None,
1966 | "recommendations_count": None
1967 | }
1968 | data_to_store.append(rec_data)
1969 |
1970 | # Store analysis data summary with consistent schema
1971 | if result.get("data"):
1972 | data_summary = {
1973 | "record_type": "data_summary",
1974 | "analysis_type": analysis_type,
1975 | "data_keys": str(list(result["data"].keys())),
1976 | "timestamp": datetime.now().isoformat(),
1977 | # Add placeholder columns for consistency
1978 | "status": None,
1979 | "execution_time": None,
1980 | "session_id": self.session_id,
1981 | "data_sources": None,
1982 | "recommendations_count": None,
1983 | "priority": None,
1984 | "title": None,
1985 | "description": None,
1986 | "potential_savings": 0,
1987 | "implementation_effort": None,
1988 | "recommendation_id": None,
1989 | "rec_type": None
1990 | }
1991 | data_to_store.append(data_summary)
1992 |
1993 | success = self.session_manager.store_data(
1994 | self.session_id,
1995 | table_name,
1996 | data_to_store
1997 | )
1998 |
1999 | if success:
2000 | logger.info(f"Stored {analysis_type} results ({len(data_to_store)} records) in table {table_name}")
2001 | else:
2002 | logger.warning(f"Failed to store {analysis_type} results")
2003 |
2004 | except Exception as e:
2005 | logger.error(f"Error storing analysis results for {analysis_type}: {str(e)}")
2006 |
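# Every stored table shares one flat schema; record_type discriminates the row kinds that the
# cross-analysis SQL above filters on. Hypothetical rows:
#
#     >>> rows = [{"record_type": "metadata"}, {"record_type": "recommendation"}, {"record_type": "data_summary"}]
#     >>> [r["record_type"] for r in rows if r["record_type"] == "recommendation"]
#     ['recommendation']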
2007 | def _store_comprehensive_results(self, aggregated_results: Dict[str, Any], execution_results: Dict[str, Any]):
2008 | """Store comprehensive analysis results with execution metadata in session database."""
2009 | try:
2010 | table_name = f"s3_comprehensive_{int(time.time())}"
2011 |
2012 | data_to_store = []
2013 |
2014 | # Store aggregation metadata with execution information
2015 | metadata = aggregated_results.get("aggregation_metadata", {})
2016 | execution_summary = execution_results.get("execution_summary", execution_results)
2017 |
2018 | data_to_store.append({
2019 | "record_type": "comprehensive_metadata",
2020 | "total_analyses": metadata.get("total_analyses", 0),
2021 | "successful_analyses": metadata.get("successful_analyses", 0),
2022 | "failed_analyses": metadata.get("failed_analyses", 0),
2023 | "total_potential_savings": aggregated_results.get("total_potential_savings", 0),
2024 | "aggregated_at": metadata.get("aggregated_at", datetime.now().isoformat()),
2025 | "session_id": self.session_id,
2026 | "parallel_execution_time": execution_summary.get("total_execution_time", 0),
2027 | "successful_tasks": execution_summary.get("successful", 0),
2028 | "failed_tasks": execution_summary.get("failed", 0),
2029 | "timeout_tasks": execution_summary.get("timeout", 0),
2030 | "stored_tables_count": len(execution_summary.get("stored_tables", []))
2031 | })
2032 |
2033 | # Store execution task details for performance analysis
2034 | for task_id, task_result in execution_summary.get("results", {}).items():
2035 | task_data = {
2036 | "record_type": "task_execution",
2037 | "task_id": task_id,
2038 | "service": task_result.get("service", ""),
2039 | "operation": task_result.get("operation", ""),
2040 | "status": task_result.get("status", ""),
2041 | "execution_time": task_result.get("execution_time", 0),
2042 | "error": task_result.get("error", ""),
2043 | "stored_table": task_result.get("stored_table", ""),
2044 | "timestamp": datetime.now().isoformat()
2045 | }
2046 | data_to_store.append(task_data)
2047 |
2048 | # Store top optimization opportunities with enhanced metadata
2049 | for i, opportunity in enumerate(aggregated_results.get("optimization_opportunities", [])[:10]):
2050 | opp_data = {
2051 | "record_type": "optimization_opportunity",
2052 | "rank": opportunity.get("rank", i + 1),
2053 | "title": opportunity.get("title", ""),
2054 | "description": opportunity.get("description", ""),
2055 | "potential_savings": opportunity.get("potential_savings", 0),
2056 | "implementation_effort": opportunity.get("implementation_effort", "medium"),
2057 | "source_analysis": opportunity.get("source_analysis", ""),
2058 | "priority": opportunity.get("priority", "medium"),
2059 | "timestamp": datetime.now().isoformat()
2060 | }
2061 | data_to_store.append(opp_data)
2062 |
2063 | # Store cross-analysis insights
2064 | for i, insight in enumerate(aggregated_results.get("cross_analysis_insights", [])):
2065 | insight_data = {
2066 | "record_type": "cross_analysis_insight",
2067 | "insight_id": i,
2068 | "insight_type": insight.get("type", ""),
2069 | "title": insight.get("title", ""),
2070 | "description": insight.get("description", ""),
2071 | "recommendation": insight.get("recommendation", ""),
2072 | "analyses_involved": str(insight.get("analyses_involved", [])),
2073 | "timestamp": datetime.now().isoformat()
2074 | }
2075 | data_to_store.append(insight_data)
2076 |
2077 | # Store cost insights summary
2078 | cost_insights = aggregated_results.get("cost_insights", {})
2079 | if cost_insights and not cost_insights.get("error"):
2080 | cost_data = {
2081 | "record_type": "cost_insights_summary",
2082 | "total_storage_cost": cost_insights.get("total_storage_cost", 0),
2083 | "total_transfer_cost": cost_insights.get("total_transfer_cost", 0),
2084 | "total_api_cost": cost_insights.get("total_api_cost", 0),
2085 | "highest_cost_area": cost_insights.get("highest_cost_areas", [{}])[0].get("0", "Unknown") if cost_insights.get("highest_cost_areas") else "Unknown",
2086 | "optimization_potential_count": len(cost_insights.get("cost_optimization_potential", {})),
2087 | "timestamp": datetime.now().isoformat()
2088 | }
2089 | data_to_store.append(cost_data)
2090 |
2091 | success = self.session_manager.store_data(
2092 | self.session_id,
2093 | table_name,
2094 | data_to_store
2095 | )
2096 |
2097 | if success:
2098 | logger.info(f"Stored comprehensive results ({len(data_to_store)} records) in table {table_name}")
2099 | else:
2100 | logger.warning("Failed to store comprehensive results")
2101 |
2102 | except Exception as e:
2103 | logger.error(f"Error storing comprehensive results: {str(e)}")
2104 |
2105 | def get_available_analyses(self) -> List[Dict[str, Any]]:
2106 | """
2107 | Get list of available analyses with metadata.
2108 |
2109 | Returns:
2110 | List of analysis information dictionaries
2111 | """
2112 | return self.analysis_engine.get_available_analyses()
2113 |
2114 | def generate_cross_analyzer_insights(self, analysis_results: Dict[str, Any]) -> List[Dict[str, Any]]:
2115 | """
2116 | Generate cross-analyzer insights by analyzing relationships between different analysis results.
2117 |
2118 | Args:
2119 | analysis_results: Dictionary of analysis results from multiple analyzers
2120 |
2121 | Returns:
2122 | List of cross-analyzer insight dictionaries
2123 | """
2124 | insights = []
2125 |
2126 | try:
2127 | # Extract successful results for cross-analysis
2128 | successful_results = {}
2129 | for task_id, result in analysis_results.items():
2130 | # Handle both TaskResult objects and direct dictionaries
2131 | if hasattr(result, 'data') and result.data and hasattr(result, 'status') and result.status == "success":
2132 | analysis_type = result.data.get("analysis_type", "unknown")
2133 | successful_results[analysis_type] = result.data
2134 | elif isinstance(result, dict) and result.get("status") == "success":
2135 | analysis_type = result.get("analysis_type", "unknown")
2136 | successful_results[analysis_type] = result
2137 |
2138 | logger.info(f"Generating cross-analyzer insights from {len(successful_results)} successful analyses")
2139 |
2140 | # Insight 1: Storage cost optimization correlation
2141 | if "general_spend" in successful_results and "storage_class" in successful_results:
2142 | spend_data = successful_results["general_spend"].get("data", {})
2143 | storage_data = successful_results["storage_class"].get("data", {})
2144 |
2145 | total_storage_cost = spend_data.get("total_storage_cost", 0)
2146 | storage_recommendations = successful_results["storage_class"].get("recommendations", [])
2147 | storage_savings = sum(rec.get("potential_savings", 0) for rec in storage_recommendations)
2148 |
2149 | if total_storage_cost > 0 and storage_savings > 0:
2150 | savings_percentage = (storage_savings / total_storage_cost) * 100
2151 |
2152 | insights.append({
2153 | "type": "cost_optimization_correlation",
2154 | "title": "Storage Class Optimization Impact",
2155 | "description": f"Storage class optimization could reduce total storage costs by {savings_percentage:.1f}% (${storage_savings:.2f} out of ${total_storage_cost:.2f})",
2156 | "recommendation": "Prioritize storage class optimization as it offers significant cost reduction potential",
2157 | "analyses_involved": ["general_spend", "storage_class"],
2158 | "metrics": {
2159 | "total_storage_cost": total_storage_cost,
2160 | "potential_storage_savings": storage_savings,
2161 | "savings_percentage": savings_percentage
2162 | },
2163 | "priority": "high" if savings_percentage > 20 else "medium",
2164 | "confidence": "high"
2165 | })
2166 |
2167 | # Insight 2: Governance and cost correlation
2168 | if "governance" in successful_results and "multipart_cleanup" in successful_results:
2169 | governance_violations = len(successful_results["governance"].get("recommendations", []))
2170 | multipart_savings = sum(
2171 | rec.get("potential_savings", 0)
2172 | for rec in successful_results["multipart_cleanup"].get("recommendations", [])
2173 | )
2174 |
2175 | if governance_violations > 0 and multipart_savings > 0:
2176 | insights.append({
2177 | "type": "governance_cost_correlation",
2178 | "title": "Governance Gaps Leading to Cost Waste",
2179 | "description": f"Found {governance_violations} governance violations and ${multipart_savings:.2f} in multipart upload waste",
2180 | "recommendation": "Implement lifecycle policies to prevent future cost waste from incomplete uploads",
2181 | "analyses_involved": ["governance", "multipart_cleanup"],
2182 | "metrics": {
2183 | "governance_violations": governance_violations,
2184 | "multipart_waste_cost": multipart_savings
2185 | },
2186 | "priority": "high" if multipart_savings > 100 else "medium",
2187 | "confidence": "high"
2188 | })
2189 |
2190 | # Insight 3: Archive optimization opportunity
2191 | if "storage_class" in successful_results and "archive_optimization" in successful_results:
2192 | storage_recs = successful_results["storage_class"].get("recommendations", [])
2193 | archive_recs = successful_results["archive_optimization"].get("recommendations", [])
2194 |
2195 | # Look for overlapping optimization opportunities
2196 | storage_savings = sum(rec.get("potential_savings", 0) for rec in storage_recs)
2197 | archive_savings = sum(rec.get("potential_savings", 0) for rec in archive_recs)
2198 |
2199 | if storage_savings > 0 and archive_savings > 0:
2200 | combined_savings = storage_savings + archive_savings
2201 |
2202 | insights.append({
2203 | "type": "combined_optimization_opportunity",
2204 | "title": "Combined Storage and Archive Optimization",
2205 | "description": f"Combining storage class optimization (${storage_savings:.2f}) with archive strategies (${archive_savings:.2f}) could save ${combined_savings:.2f} total",
2206 | "recommendation": "Implement a phased approach: optimize storage classes first, then implement archive policies for long-term data",
2207 | "analyses_involved": ["storage_class", "archive_optimization"],
2208 | "metrics": {
2209 | "storage_class_savings": storage_savings,
2210 | "archive_savings": archive_savings,
2211 | "combined_savings": combined_savings
2212 | },
2213 | "priority": "high" if combined_savings > 500 else "medium",
2214 | "confidence": "medium"
2215 | })
2216 |
2217 | # Insight 4: API cost vs storage cost balance
2218 | if "general_spend" in successful_results and "api_cost" in successful_results:
2219 | spend_data = successful_results["general_spend"].get("data", {})
2220 | api_recs = successful_results["api_cost"].get("recommendations", [])
2221 |
2222 | total_api_cost = spend_data.get("total_api_cost", 0)
2223 | total_storage_cost = spend_data.get("total_storage_cost", 0)
2224 | api_savings = sum(rec.get("potential_savings", 0) for rec in api_recs)
2225 |
2226 | if total_api_cost > 0 and total_storage_cost > 0:
2227 | api_percentage = (total_api_cost / (total_api_cost + total_storage_cost)) * 100
2228 |
2229 | if api_percentage > 30: # API costs are significant
2230 | insights.append({
2231 | "type": "cost_distribution_analysis",
2232 | "title": "High API Cost Ratio Detected",
2233 | "description": f"API costs represent {api_percentage:.1f}% of total S3 costs (${total_api_cost:.2f} API vs ${total_storage_cost:.2f} storage)",
2234 | "recommendation": "Focus on API cost optimization through caching, request consolidation, and CloudFront integration",
2235 | "analyses_involved": ["general_spend", "api_cost"],
2236 | "metrics": {
2237 | "api_cost_percentage": api_percentage,
2238 | "total_api_cost": total_api_cost,
2239 | "total_storage_cost": total_storage_cost,
2240 | "api_optimization_potential": api_savings
2241 | },
2242 | "priority": "high" if api_percentage > 50 else "medium",
2243 | "confidence": "high"
2244 | })
2245 |
2246 | # Insight 5: Comprehensive optimization priority ranking
2247 | if len(successful_results) >= 3:
2248 | # Calculate total potential savings by analysis type
2249 | savings_by_analysis = {}
2250 | for analysis_type, result in successful_results.items():
2251 | recommendations = result.get("recommendations", [])
2252 | total_savings = sum(rec.get("potential_savings", 0) for rec in recommendations)
2253 | savings_by_analysis[analysis_type] = total_savings
2254 |
2255 | # Sort by savings potential
2256 | sorted_savings = sorted(savings_by_analysis.items(), key=lambda x: x[1], reverse=True)
2257 |
2258 | if sorted_savings and sorted_savings[0][1] > 0:
2259 | top_analysis = sorted_savings[0][0]
2260 | top_savings = sorted_savings[0][1]
2261 | total_savings = sum(savings_by_analysis.values())
2262 |
2263 | insights.append({
2264 | "type": "optimization_priority_ranking",
2265 | "title": "Optimization Priority Recommendations",
2266 | "description": f"Based on potential savings analysis, prioritize {top_analysis} optimization (${top_savings:.2f} of ${total_savings:.2f} total potential)",
2267 | "recommendation": f"Start with {top_analysis} optimization for maximum impact, then proceed with other optimizations in order of savings potential",
2268 | "analyses_involved": list(successful_results.keys()),
2269 | "metrics": {
2270 | "priority_ranking": [{"analysis": analysis, "savings": savings} for analysis, savings in sorted_savings],
2271 | "total_potential_savings": total_savings,
2272 | "top_opportunity": {"analysis": top_analysis, "savings": top_savings}
2273 | },
2274 | "priority": "high",
2275 | "confidence": "high"
2276 | })
2277 |
2278 | # Insight 6: Data freshness and analysis reliability
2279 | analysis_timestamps = {}
2280 | for analysis_type, result in successful_results.items():
2281 | timestamp = result.get("timestamp")
2282 | if timestamp:
2283 | analysis_timestamps[analysis_type] = timestamp
2284 |
2285 | if len(analysis_timestamps) > 1:
2286 | # Check for timestamp consistency (all analyses should be recent)
2287 | from datetime import datetime, timedelta
2288 | current_time = datetime.now()
2289 | old_analyses = []
2290 |
2291 | for analysis_type, timestamp_str in analysis_timestamps.items():
2292 | try:
2293 | analysis_time = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00')).replace(tzinfo=None)  # drop tzinfo so the delta against naive datetime.now() cannot raise TypeError
2294 | if current_time - analysis_time > timedelta(hours=24):
2295 | old_analyses.append(analysis_type)
2296 | except (ValueError, AttributeError):
2297 | pass  # Skip timestamps that cannot be parsed
2298 |
2299 | if old_analyses:
2300 | insights.append({
2301 | "type": "data_freshness_warning",
2302 | "title": "Analysis Data Freshness Concern",
2303 | "description": f"Some analyses may be using stale data: {', '.join(old_analyses)}",
2304 | "recommendation": "Re-run analyses with stale data to ensure recommendations are based on current information",
2305 | "analyses_involved": old_analyses,
2306 | "metrics": {
2307 | "stale_analyses": old_analyses,
2308 | "total_analyses": len(analysis_timestamps)
2309 | },
2310 | "priority": "medium",
2311 | "confidence": "medium"
2312 | })
2313 |
2314 | logger.info(f"Generated {len(insights)} cross-analyzer insights")
2315 | return insights
2316 |
2317 | except Exception as e:
2318 | logger.error(f"Error generating cross-analyzer insights: {str(e)}")
2319 | return [{
2320 | "type": "insight_generation_error",
2321 | "title": "Cross-Analyzer Insight Generation Failed",
2322 | "description": f"Failed to generate cross-analyzer insights: {str(e)}",
2323 | "recommendation": "Review analysis results manually for optimization opportunities",
2324 | "analyses_involved": list(analysis_results.keys()) if analysis_results else [],
2325 | "priority": "low",
2326 | "confidence": "low"
2327 | }]
2328 |
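# Worked numbers for the storage-class correlation insight above, with hypothetical costs:
#
#     >>> total_storage_cost, storage_savings = 400.0, 120.0
#     >>> round((storage_savings / total_storage_cost) * 100, 1)                     # savings_percentage
#     30.0
#     >>> "high" if (storage_savings / total_storage_cost) * 100 > 20 else "medium"  # insight priority
#     'high'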
2329 | def aggregate_results_with_insights(self, results: Dict[str, Any], include_cross_analysis: bool = True) -> Dict[str, Any]:
2330 | """
2331 | Enhanced result aggregation with cross-analyzer insights and comprehensive analysis.
2332 |
2333 | Args:
2334 | results: Dictionary of analysis results by task ID
2335 | include_cross_analysis: Whether to include cross-analysis insights
2336 |
2337 | Returns:
2338 | Enhanced aggregated analysis results
2339 | """
2340 | try:
2341 | # Start with base aggregation from analysis engine
2342 | base_aggregation = self.analysis_engine.aggregate_analysis_results(results, include_cross_analysis)
2343 |
2344 | # Enhance with orchestrator-level insights
2345 | if include_cross_analysis:
2346 | cross_insights = self.generate_cross_analyzer_insights(results)
2347 | base_aggregation["cross_analysis_insights"] = cross_insights
2348 |
2349 | # Update optimization opportunities with cross-analyzer insights
2350 | insight_opportunities = []
2351 | for insight in cross_insights:
2352 | if insight.get("type") in ["cost_optimization_correlation", "combined_optimization_opportunity", "optimization_priority_ranking"]:
2353 | opportunity = {
2354 | "title": insight["title"],
2355 | "description": insight["description"],
2356 | "recommendation": insight["recommendation"],
2357 | "potential_savings": insight.get("metrics", {}).get("combined_savings") or insight.get("metrics", {}).get("potential_storage_savings", 0),
2358 | "priority": insight["priority"],
2359 | "source_analysis": "cross_analyzer_insight",
2360 | "analyses_involved": insight["analyses_involved"],
2361 | "confidence": insight.get("confidence", "medium"),
2362 | "insight_type": insight["type"]
2363 | }
2364 | insight_opportunities.append(opportunity)
2365 |
2366 | # Merge with existing opportunities
2367 | existing_opportunities = base_aggregation.get("optimization_opportunities", [])
2368 | all_opportunities = existing_opportunities + insight_opportunities
2369 |
2370 | # Sort by potential savings and priority
2371 | priority_weights = {"high": 3, "medium": 2, "low": 1}
2372 | all_opportunities.sort(
2373 | key=lambda x: (
2374 | priority_weights.get(x.get("priority", "medium"), 2),
2375 | x.get("potential_savings", 0)
2376 | ),
2377 | reverse=True
2378 | )
2379 |
2380 | base_aggregation["optimization_opportunities"] = all_opportunities
2381 |
2382 | # Add orchestrator-specific metadata
2383 | base_aggregation["orchestrator_metadata"] = {
2384 | "orchestrator_class": "S3OptimizationOrchestrator",
2385 | "session_id": self.session_id,
2386 | "region": self.region,
2387 | "performance_optimizations_enabled": True,
2388 | "cross_analyzer_insights_count": len(base_aggregation.get("cross_analysis_insights", [])),
2389 | "enhanced_aggregation": True,
2390 | "aggregation_timestamp": datetime.now().isoformat()
2391 | }
2392 |
2393 | # Add performance summary if available
2394 | if self.performance_monitor:
2395 | try:
2396 | performance_summary = self.get_performance_summary()
2397 | base_aggregation["performance_summary"] = performance_summary
2398 | except Exception as e:
2399 | logger.warning(f"Could not include performance summary: {str(e)}")
2400 |
2401 | return base_aggregation
2402 |
2403 | except Exception as e:
2404 | logger.error(f"Error in enhanced result aggregation: {str(e)}")
2405 | # Fall back to base aggregation
2406 | try:
2407 | return self.analysis_engine.aggregate_analysis_results(results, include_cross_analysis)
2408 | except Exception as fallback_error:
2409 | logger.error(f"Fallback aggregation also failed: {str(fallback_error)}")
2410 | return {
2411 | "status": "error",
2412 | "message": f"Result aggregation failed: {str(e)}",
2413 | "fallback_error": str(fallback_error),
2414 | "timestamp": datetime.now().isoformat()
2415 | }
2416 |
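# Sketch of the combined sort above with hypothetical opportunities: the priority weight is
# compared before raw savings, so a high-priority $50 item outranks a medium-priority $500 one.
#
#     >>> weights = {"high": 3, "medium": 2, "low": 1}
#     >>> opps = [{"priority": "medium", "potential_savings": 500}, {"priority": "high", "potential_savings": 50}]
#     >>> sorted(opps, key=lambda x: (weights.get(x.get("priority", "medium"), 2), x.get("potential_savings", 0)), reverse=True)[0]["priority"]
#     'high'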
2417 | def get_engine_status(self) -> Dict[str, Any]:
2418 | """
2419 | Get status information about the analysis engine.
2420 |
2421 | Returns:
2422 | Dictionary containing engine status
2423 | """
2424 | return self.analysis_engine.get_engine_status()
2425 |
2426 | def cleanup_session(self):
2427 | """Clean up the orchestrator session."""
2428 | try:
2429 | if self.service_orchestrator:
2430 | self.service_orchestrator.cleanup_session()
2431 | if self.analysis_engine:
2432 | self.analysis_engine.cleanup()
2433 | logger.info("S3OptimizationOrchestrator session cleaned up")
2434 | except Exception as e:
2435 | logger.error(f"Error cleaning up S3OptimizationOrchestrator session: {str(e)}")
2436 |
2437 | def get_session_info(self) -> Dict[str, Any]:
2438 | """Get information about the current session."""
2439 | try:
2440 | session_info = self.service_orchestrator.get_session_info()
2441 | session_info["analysis_engine_status"] = self.analysis_engine.get_engine_status()
2442 | return session_info
2443 | except Exception as e:
2444 | logger.error(f"Error getting session info: {str(e)}")
2445 | return {"error": f"Failed to get session info: {str(e)}"}
2446 |
2447 | def get_orchestrator_status(self) -> Dict[str, Any]:
2448 | """
2449 | Get comprehensive status of the S3OptimizationOrchestrator including analyzer integration.
2450 |
2451 | Returns:
2452 | Dictionary containing complete orchestrator status
2453 | """
2454 | try:
2455 | status = {
2456 | "orchestrator_info": {
2457 | "class_name": "S3OptimizationOrchestrator",
2458 | "session_id": self.session_id,
2459 | "region": self.region,
2460 | "initialized_at": datetime.now().isoformat(),
2461 | "performance_optimizations_enabled": True
2462 | },
2463 | "analyzer_integration": {},
2464 | "service_integration": {},
2465 | "performance_components": {},
2466 | "session_integration": {},
2467 | "capabilities": {},
2468 | "health_status": "unknown"
2469 | }
2470 |
2471 | # Get analyzer integration status
2472 | try:
2473 | analyzer_registry_info = self.get_analyzer_registry()
2474 | status["analyzer_integration"] = {
2475 | "status": "active" if analyzer_registry_info.get("registry_status") == "active" else "inactive",
2476 | "total_analyzers": analyzer_registry_info.get("total_analyzers", 0),
2477 | "registered_analyzers": list(analyzer_registry_info.get("registered_analyzers", {}).keys()),
2478 | "analyzer_health": analyzer_registry_info.get("analyzer_health", {}),
2479 | "analysis_priorities_configured": len(self.analysis_engine.analysis_priorities) > 0
2480 | }
2481 | except Exception as e:
2482 | status["analyzer_integration"] = {"status": "error", "error": str(e)}
2483 |
2484 | # Get service integration status
2485 | try:
2486 | engine_status = self.analysis_engine.get_engine_status()
2487 | status["service_integration"] = {
2488 | "s3_service": engine_status.get("services_status", {}).get("s3_service", False),
2489 | "pricing_service": engine_status.get("services_status", {}).get("pricing_service", False),
2490 | "storage_lens_service": engine_status.get("services_status", {}).get("storage_lens_service", False),
2491 | "service_orchestrator": self.service_orchestrator is not None,
2492 | "analysis_engine": self.analysis_engine is not None
2493 | }
2494 | except Exception as e:
2495 | status["service_integration"] = {"status": "error", "error": str(e)}
2496 |
2497 | # Get performance components status
2498 | try:
2499 | status["performance_components"] = {
2500 | "performance_monitor": self.performance_monitor is not None,
2501 | "memory_manager": self.memory_manager is not None,
2502 | "timeout_handler": self.timeout_handler is not None,
2503 | "pricing_cache": self.pricing_cache is not None,
2504 | "bucket_metadata_cache": self.bucket_metadata_cache is not None,
2505 | "analysis_results_cache": self.analysis_results_cache is not None
2506 | }
2507 |
2508 | # Add cache statistics if available
2509 | if self.pricing_cache:
2510 | status["performance_components"]["pricing_cache_stats"] = self.pricing_cache.get_statistics()
2511 | if self.bucket_metadata_cache:
2512 | status["performance_components"]["bucket_cache_stats"] = self.bucket_metadata_cache.get_statistics()
2513 | if self.analysis_results_cache:
2514 | status["performance_components"]["results_cache_stats"] = self.analysis_results_cache.get_statistics()
2515 |
2516 | except Exception as e:
2517 | status["performance_components"] = {"status": "error", "error": str(e)}
2518 |
2519 | # Get session integration status
2520 | try:
2521 | session_info = self.service_orchestrator.get_session_info()
2522 | stored_tables = self.get_stored_tables()
2523 | s3_tables = [table for table in stored_tables if table.startswith('s3_')]
2524 |
2525 | status["session_integration"] = {
2526 | "session_active": session_info.get("error") is None,
2527 | "session_id": self.session_id,
2528 | "total_tables": len(stored_tables),
2529 | "s3_analysis_tables": len(s3_tables),
2530 | "session_manager_available": self.session_manager is not None,
2531 | "cross_analysis_ready": len(s3_tables) > 0
2532 | }
2533 | except Exception as e:
2534 | status["session_integration"] = {"status": "error", "error": str(e)}
2535 |
2536 | # Get capabilities
2537 | try:
2538 | available_analyses = self.analysis_engine.get_available_analyses()
2539 | status["capabilities"] = {
2540 | "total_analysis_types": len(available_analyses),
2541 | "available_analyses": [analysis["analysis_type"] for analysis in available_analyses],
2542 | "parallel_execution": True,
2543 | "cross_analyzer_insights": True,
2544 | "performance_optimizations": True,
2545 | "dynamic_analyzer_loading": True,
2546 | "comprehensive_error_handling": True,
2547 | "session_sql_integration": True,
2548 | "intelligent_caching": True,
2549 | "memory_management": True,
2550 | "progressive_timeouts": True
2551 | }
2552 | except Exception as e:
2553 | status["capabilities"] = {"status": "error", "error": str(e)}
2554 |
2555 | # Determine overall health status
2556 | try:
2557 | analyzer_ok = status["analyzer_integration"].get("status") == "active"
2558 | services_ok = isinstance(status["service_integration"], dict) and "error" not in status["service_integration"] and all(status["service_integration"].values())
2559 | performance_ok = isinstance(status["performance_components"], dict) and "error" not in status["performance_components"] and any(v is True for v in status["performance_components"].values())
2560 | session_ok = status["session_integration"].get("session_active", False)
2561 |
2562 | if analyzer_ok and services_ok and performance_ok and session_ok:
2563 | status["health_status"] = "healthy"
2564 | elif analyzer_ok and services_ok:
2565 | status["health_status"] = "functional"
2566 | else:
2567 | status["health_status"] = "degraded"
2568 |
2569 | except Exception as e:
2570 | status["health_status"] = "unknown"
2571 | status["health_check_error"] = str(e)
2572 |
2573 | return status
2574 |
2575 | except Exception as e:
2576 | logger.error(f"Error getting orchestrator status: {str(e)}")
2577 | return {
2578 | "orchestrator_info": {
2579 | "class_name": "S3OptimizationOrchestrator",
2580 | "session_id": self.session_id,
2581 | "region": self.region
2582 | },
2583 | "status": "error",
2584 | "error": str(e),
2585 | "timestamp": datetime.now().isoformat()
2586 | }
2587 |
2588 | def get_parallel_execution_status(self) -> Dict[str, Any]:
2589 | """
2590 | Get detailed status of parallel execution framework integration.
2591 |
2592 | Returns:
2593 | Dictionary containing parallel execution status and metrics
2594 | """
2595 | try:
2596 | # Get parallel executor status
2597 | parallel_executor = self.service_orchestrator.parallel_executor
2598 | executor_status = parallel_executor.get_status()
2599 |
2600 | # Get session manager status
2601 | session_info = self.session_manager.get_session_info(self.session_id)
2602 |
2603 | # Get analysis engine status
2604 | engine_status = self.analysis_engine.get_engine_status()
2605 |
2606 | # Get stored tables for cross-analysis
2607 | stored_tables = self.get_stored_tables()
2608 | s3_tables = [table for table in stored_tables if table.startswith('s3_')]
2609 |
2610 | integration_status = {
2611 | "integration_info": {
2612 | "orchestrator_class": "S3OptimizationOrchestrator",
2613 | "session_id": self.session_id,
2614 | "region": self.region,
2615 | "integration_complete": True,
2616 | "timestamp": datetime.now().isoformat()
2617 | },
2618 | "parallel_executor": {
2619 | "status": "active" if executor_status.get("executor_alive", False) else "inactive",
2620 | "max_workers": executor_status.get("max_workers", 0),
2621 | "active_tasks": executor_status.get("active_tasks", 0),
2622 | "completed_tasks": executor_status.get("completed_tasks", 0),
2623 | "status_breakdown": executor_status.get("status_breakdown", {})
2624 | },
2625 | "session_sql_integration": {
2626 | "session_active": session_info.get("error") is None,
2627 | "session_created_at": session_info.get("created_at"),
2628 | "session_last_accessed": session_info.get("last_accessed"),
2629 | "total_tables": len(stored_tables),
2630 | "s3_analysis_tables": len(s3_tables),
2631 | "stored_table_names": s3_tables[:10] # Show first 10 S3 tables
2632 | },
2633 | "analysis_engine": {
2634 | "registered_analyzers": len(engine_status.get("registered_analyzers", {})),
2635 | "analyzer_types": list(engine_status.get("registered_analyzers", {}).keys()),
2636 | "execution_history_count": len(engine_status.get("execution_history", [])),
2637 | "services_available": engine_status.get("services_status", {})
2638 | },
2639 | "task_prioritization": {
2640 | "prioritization_enabled": True,
2641 | "priority_factors": [
2642 | "cost_impact",
2643 | "execution_time_estimate",
2644 | "dependency_level",
2645 | "context_adjustments"
2646 | ],
2647 | "analysis_priorities": self.analysis_engine.analysis_priorities
2648 | },
2649 | "cross_analysis_capabilities": {
2650 | "cross_analysis_enabled": True,
2651 | "aggregation_queries_available": len(s3_tables) > 0,
2652 | "insight_generation_enabled": True,
2653 | "available_query_types": [
2654 | "recommendations_by_priority",
2655 | "top_optimization_opportunities",
2656 | "analysis_execution_summary",
2657 | "total_savings_by_analysis",
2658 | "cross_analysis_insights"
2659 | ] if s3_tables else []
2660 | },
2661 | "integration_health": {
2662 | "all_components_active": all([
2663 | executor_status.get("executor_alive", False),
2664 | session_info.get("error") is None,
2665 | len(engine_status.get("registered_analyzers", {})) > 0
2666 | ]),
2667 | "ready_for_analysis": True,
2668 | "estimated_capacity": executor_status.get("max_workers", 0),
2669 | "last_health_check": datetime.now().isoformat()
2670 | }
2671 | }
2672 |
2673 | return integration_status
2674 |
2675 | except Exception as e:
2676 | logger.error(f"Error getting parallel execution status: {str(e)}")
2677 | return {
2678 | "integration_info": {
2679 | "orchestrator_class": "S3OptimizationOrchestrator",
2680 | "session_id": self.session_id,
2681 | "integration_complete": False,
2682 | "error": str(e),
2683 | "timestamp": datetime.now().isoformat()
2684 | },
2685 | "error": f"Failed to get parallel execution status: {str(e)}"
2686 | }
2687 |
2688 | def validate_analyzer_integration(self) -> Dict[str, Any]:
2689 | """
2690 | Validate that all analyzers are properly integrated and functional.
2691 |
2692 | Returns:
2693 | Dictionary containing analyzer integration validation results
2694 | """
2695 | validation_results = {
2696 | "validation_timestamp": datetime.now().isoformat(),
2697 | "overall_status": "unknown",
2698 | "analyzer_validations": {},
2699 | "integration_tests": {},
2700 | "recommendations": []
2701 | }
2702 |
2703 | try:
2704 | # Validate each registered analyzer
2705 | for analysis_type in self.analysis_engine.analyzer_registry.list_analyzers():
2706 | analyzer = self.analysis_engine.analyzer_registry.get(analysis_type)
2707 |
2708 | analyzer_validation = {
2709 | "status": "unknown",
2710 | "checks": {},
2711 | "errors": [],
2712 | "warnings": []
2713 | }
2714 |
2715 | # Check 1: Analyzer instance validity
2716 | try:
2717 | if analyzer and hasattr(analyzer, 'analyze') and hasattr(analyzer, 'get_recommendations'):
2718 | analyzer_validation["checks"]["instance_valid"] = True
2719 | else:
2720 | analyzer_validation["checks"]["instance_valid"] = False
2721 | analyzer_validation["errors"].append("Analyzer missing required methods")
2722 | except Exception as e:
2723 | analyzer_validation["checks"]["instance_valid"] = False
2724 | analyzer_validation["errors"].append(f"Instance validation error: {str(e)}")
2725 |
2726 | # Check 2: Service dependencies
2727 | try:
2728 | services_available = {
2729 | "s3_service": analyzer.s3_service is not None,
2730 | "pricing_service": analyzer.pricing_service is not None,
2731 | "storage_lens_service": analyzer.storage_lens_service is not None
2732 | }
2733 | analyzer_validation["checks"]["services"] = services_available
2734 |
2735 | missing_services = [service for service, available in services_available.items() if not available]
2736 | if missing_services:
2737 | analyzer_validation["warnings"].append(f"Missing services: {', '.join(missing_services)}")
2738 | except Exception as e:
2739 | analyzer_validation["checks"]["services"] = {}
2740 | analyzer_validation["errors"].append(f"Service check error: {str(e)}")
2741 |
2742 | # Check 3: Performance optimization components
2743 | try:
2744 | performance_components = {
2745 | "performance_monitor": hasattr(analyzer, 'performance_monitor') and analyzer.performance_monitor is not None,
2746 | "memory_manager": hasattr(analyzer, 'memory_manager') and analyzer.memory_manager is not None
2747 | }
2748 | analyzer_validation["checks"]["performance_components"] = performance_components
2749 |
2750 | missing_components = [comp for comp, available in performance_components.items() if not available]
2751 | if missing_components:
2752 | analyzer_validation["warnings"].append(f"Missing performance components: {', '.join(missing_components)}")
2753 | except Exception as e:
2754 | analyzer_validation["checks"]["performance_components"] = {}
2755 | analyzer_validation["errors"].append(f"Performance component check error: {str(e)}")
2756 |
2757 | # Check 4: Priority configuration
2758 | try:
2759 | priority_info = self.analysis_engine.analysis_priorities.get(analysis_type, {})
2760 | analyzer_validation["checks"]["priority_configured"] = bool(priority_info)
2761 |
2762 | if not priority_info:
2763 | analyzer_validation["warnings"].append("No priority configuration found")
2764 | else:
2765 | required_fields = ["priority", "cost_impact", "execution_time_estimate"]
2766 | missing_fields = [field for field in required_fields if field not in priority_info]
2767 | if missing_fields:
2768 | analyzer_validation["warnings"].append(f"Missing priority fields: {', '.join(missing_fields)}")
2769 | except Exception as e:
2770 | analyzer_validation["checks"]["priority_configured"] = False
2771 | analyzer_validation["errors"].append(f"Priority check error: {str(e)}")
2772 |
2773 | # Check 5: Parameter validation capability
2774 | try:
2775 | test_params = {"region": "us-east-1", "lookback_days": 30}
2776 | validation_result = analyzer.validate_parameters(**test_params)
2777 | analyzer_validation["checks"]["parameter_validation"] = validation_result.get("valid", False)
2778 |
2779 | if not validation_result.get("valid", False):
2780 | analyzer_validation["errors"].append("Parameter validation failed")
2781 | except Exception as e:
2782 | analyzer_validation["checks"]["parameter_validation"] = False
2783 | analyzer_validation["errors"].append(f"Parameter validation check error: {str(e)}")
2784 |
2785 | # Determine analyzer status
2786 | if analyzer_validation["errors"]:
2787 | analyzer_validation["status"] = "invalid"
2788 | elif analyzer_validation["warnings"]:
2789 | analyzer_validation["status"] = "partial"
2790 | else:
2791 | analyzer_validation["status"] = "valid"
2792 |
2793 | validation_results["analyzer_validations"][analysis_type] = analyzer_validation
2794 |
2795 | # Run integration tests
2796 | validation_results["integration_tests"] = self._run_analyzer_integration_tests()
2797 |
2798 | # Determine overall status
2799 | analyzer_statuses = [val["status"] for val in validation_results["analyzer_validations"].values()]
2800 |
2801 | if all(status == "valid" for status in analyzer_statuses):
2802 | if validation_results["integration_tests"].get("all_tests_passed", False):
2803 | validation_results["overall_status"] = "valid"
2804 | else:
2805 | validation_results["overall_status"] = "partial"
2806 | validation_results["recommendations"].append("Some integration tests failed")
2807 | elif any(status == "invalid" for status in analyzer_statuses):
2808 | validation_results["overall_status"] = "invalid"
2809 | validation_results["recommendations"].append("Critical analyzer validation failures detected")
2810 | else:
2811 | validation_results["overall_status"] = "partial"
2812 | validation_results["recommendations"].append("Some analyzers have warnings or missing components")
2813 |
2814 | # Add specific recommendations
2815 | for analysis_type, analyzer_val in validation_results["analyzer_validations"].items():
2816 | if analyzer_val["status"] == "invalid":
2817 | validation_results["recommendations"].append(f"Fix critical issues with {analysis_type} analyzer")
2818 | elif analyzer_val["warnings"]:
2819 | validation_results["recommendations"].append(f"Address warnings for {analysis_type} analyzer")
2820 |
2821 | return validation_results
2822 |
2823 | except Exception as e:
2824 | logger.error(f"Error during analyzer integration validation: {str(e)}")
2825 | validation_results["overall_status"] = "error"
2826 | validation_results["error"] = str(e)
2827 | return validation_results
2828 |
2829 | def _run_analyzer_integration_tests(self) -> Dict[str, Any]:
2830 | """
2831 | Run integration tests for analyzer functionality.
2832 |
2833 | Returns:
2834 | Dictionary containing integration test results
2835 | """
2836 | test_results = {
2837 | "tests_run": 0,
2838 | "tests_passed": 0,
2839 | "tests_failed": 0,
2840 | "test_details": {},
2841 | "all_tests_passed": False
2842 | }
2843 |
2844 | try:
2845 | # Test 1: Analyzer registry functionality
2846 | test_results["tests_run"] += 1
2847 | try:
2848 | analyzer_list = self.analysis_engine.analyzer_registry.list_analyzers()
2849 | if len(analyzer_list) > 0:
2850 | test_results["tests_passed"] += 1
2851 | test_results["test_details"]["registry_functionality"] = {
2852 | "status": "passed",
2853 | "message": f"Registry contains {len(analyzer_list)} analyzers"
2854 | }
2855 | else:
2856 | test_results["tests_failed"] += 1
2857 | test_results["test_details"]["registry_functionality"] = {
2858 | "status": "failed",
2859 | "message": "No analyzers registered"
2860 | }
2861 | except Exception as e:
2862 | test_results["tests_failed"] += 1
2863 | test_results["test_details"]["registry_functionality"] = {
2864 | "status": "failed",
2865 | "message": f"Registry test error: {str(e)}"
2866 | }
2867 |
2868 | # Test 2: Priority system functionality
2869 | test_results["tests_run"] += 1
2870 | try:
2871 | available_analyses = self.analysis_engine.get_available_analyses()
2872 | if len(available_analyses) > 0 and all("priority" in analysis for analysis in available_analyses):
2873 | test_results["tests_passed"] += 1
2874 | test_results["test_details"]["priority_system"] = {
2875 | "status": "passed",
2876 | "message": f"Priority system working for {len(available_analyses)} analyses"
2877 | }
2878 | else:
2879 | test_results["tests_failed"] += 1
2880 | test_results["test_details"]["priority_system"] = {
2881 | "status": "failed",
2882 | "message": "Priority system not properly configured"
2883 | }
2884 | except Exception as e:
2885 | test_results["tests_failed"] += 1
2886 | test_results["test_details"]["priority_system"] = {
2887 | "status": "failed",
2888 | "message": f"Priority system test error: {str(e)}"
2889 | }
2890 |
2891 | # Test 3: Task creation functionality
2892 | test_results["tests_run"] += 1
2893 | try:
2894 | analyzer_types = self.analysis_engine.analyzer_registry.list_analyzers()
2895 | if analyzer_types:
2896 | tasks = self.analysis_engine.create_parallel_analysis_tasks(
2897 | analyzer_types[:2], # Test with first 2 analyzers
2898 | region="us-east-1",
2899 | lookback_days=7
2900 | )
2901 | if len(tasks) > 0:
2902 | test_results["tests_passed"] += 1
2903 | test_results["test_details"]["task_creation"] = {
2904 | "status": "passed",
2905 | "message": f"Successfully created {len(tasks)} parallel tasks"
2906 | }
2907 | else:
2908 | test_results["tests_failed"] += 1
2909 | test_results["test_details"]["task_creation"] = {
2910 | "status": "failed",
2911 | "message": "No tasks created"
2912 | }
2913 | else:
2914 | test_results["tests_failed"] += 1
2915 | test_results["test_details"]["task_creation"] = {
2916 | "status": "failed",
2917 | "message": "No analyzers available for task creation"
2918 | }
2919 | except Exception as e:
2920 | test_results["tests_failed"] += 1
2921 | test_results["test_details"]["task_creation"] = {
2922 | "status": "failed",
2923 | "message": f"Task creation test error: {str(e)}"
2924 | }
2925 |
2926 | # Test 4: Cross-analyzer insight generation (mock test)
2927 | test_results["tests_run"] += 1
2928 | try:
2929 | # Create mock results for testing
2930 | mock_results = {
2931 | "task1": {
2932 | "status": "success",
2933 | "analysis_type": "general_spend",
2934 | "data": {"total_storage_cost": 1000},
2935 | "recommendations": [{"potential_savings": 100}]
2936 | },
2937 | "task2": {
2938 | "status": "success",
2939 | "analysis_type": "storage_class",
2940 | "data": {"optimization_opportunities": 5},
2941 | "recommendations": [{"potential_savings": 200}]
2942 | }
2943 | }
2944 |
2945 | insights = self.generate_cross_analyzer_insights(mock_results)
2946 | if len(insights) > 0:
2947 | test_results["tests_passed"] += 1
2948 | test_results["test_details"]["cross_analyzer_insights"] = {
2949 | "status": "passed",
2950 | "message": f"Generated {len(insights)} cross-analyzer insights"
2951 | }
2952 | else:
2953 | test_results["tests_failed"] += 1
2954 | test_results["test_details"]["cross_analyzer_insights"] = {
2955 | "status": "failed",
2956 | "message": "No cross-analyzer insights generated"
2957 | }
2958 | except Exception as e:
2959 | test_results["tests_failed"] += 1
2960 | test_results["test_details"]["cross_analyzer_insights"] = {
2961 | "status": "failed",
2962 | "message": f"Cross-analyzer insights test error: {str(e)}"
2963 | }
2964 |
2965 | # Determine overall test status
2966 | test_results["all_tests_passed"] = test_results["tests_failed"] == 0
2967 |
2968 | return test_results
2969 |
2970 | except Exception as e:
2971 | logger.error(f"Error running analyzer integration tests: {str(e)}")
2972 | return {
2973 | "tests_run": 0,
2974 | "tests_passed": 0,
2975 | "tests_failed": 1,
2976 | "test_details": {"error": {"status": "failed", "message": str(e)}},
2977 | "all_tests_passed": False
2978 | }
2979 |
2980 | def validate_integration(self) -> Dict[str, Any]:
2981 | """
2982 | Validate that all components of the parallel execution framework are properly integrated.
2983 |
2984 | Returns:
2985 | Dictionary containing validation results
2986 | """
2987 | validation_results = {
2988 | "validation_timestamp": datetime.now().isoformat(),
2989 | "overall_status": "unknown",
2990 | "component_validations": {},
2991 | "integration_tests": {},
2992 | "recommendations": []
2993 | }
2994 |
2995 | try:
2996 | # Validate ServiceOrchestrator integration
2997 | try:
2998 |                 self.service_orchestrator.get_session_info()  # raises if the orchestrator session is unavailable
2999 | validation_results["component_validations"]["service_orchestrator"] = {
3000 | "status": "valid",
3001 | "session_active": True,
3002 | "message": "ServiceOrchestrator integration successful"
3003 | }
3004 | except Exception as e:
3005 | validation_results["component_validations"]["service_orchestrator"] = {
3006 | "status": "invalid",
3007 | "error": str(e),
3008 | "message": "ServiceOrchestrator integration failed"
3009 | }
3010 | validation_results["recommendations"].append("Check ServiceOrchestrator initialization")
3011 |
3012 | # Validate ParallelExecutor integration
3013 | try:
3014 | executor_status = self.service_orchestrator.parallel_executor.get_status()
3015 | if executor_status.get("executor_alive", False):
3016 | validation_results["component_validations"]["parallel_executor"] = {
3017 | "status": "valid",
3018 | "max_workers": executor_status.get("max_workers", 0),
3019 | "message": "ParallelExecutor integration successful"
3020 | }
3021 | else:
3022 | validation_results["component_validations"]["parallel_executor"] = {
3023 | "status": "invalid",
3024 | "message": "ParallelExecutor is not active"
3025 | }
3026 | validation_results["recommendations"].append("Restart ParallelExecutor")
3027 | except Exception as e:
3028 | validation_results["component_validations"]["parallel_executor"] = {
3029 | "status": "invalid",
3030 | "error": str(e),
3031 | "message": "ParallelExecutor integration failed"
3032 | }
3033 | validation_results["recommendations"].append("Check ParallelExecutor configuration")
3034 |
3035 | # Validate SessionManager integration
3036 | try:
3037 | session_info = self.session_manager.get_session_info(self.session_id)
3038 | if session_info.get("error") is None:
3039 | validation_results["component_validations"]["session_manager"] = {
3040 | "status": "valid",
3041 | "session_id": self.session_id,
3042 | "message": "SessionManager integration successful"
3043 | }
3044 | else:
3045 | validation_results["component_validations"]["session_manager"] = {
3046 | "status": "invalid",
3047 | "error": session_info.get("error"),
3048 | "message": "SessionManager session invalid"
3049 | }
3050 | validation_results["recommendations"].append("Recreate session or check SessionManager")
3051 | except Exception as e:
3052 | validation_results["component_validations"]["session_manager"] = {
3053 | "status": "invalid",
3054 | "error": str(e),
3055 | "message": "SessionManager integration failed"
3056 | }
3057 | validation_results["recommendations"].append("Check SessionManager initialization")
3058 |
3059 | # Validate S3AnalysisEngine integration
3060 | try:
3061 | engine_status = self.analysis_engine.get_engine_status()
3062 | analyzer_count = len(engine_status.get("registered_analyzers", {}))
3063 | if analyzer_count > 0:
3064 | validation_results["component_validations"]["analysis_engine"] = {
3065 | "status": "valid",
3066 | "registered_analyzers": analyzer_count,
3067 | "message": "S3AnalysisEngine integration successful"
3068 | }
3069 | else:
3070 | validation_results["component_validations"]["analysis_engine"] = {
3071 | "status": "invalid",
3072 | "message": "No analyzers registered in S3AnalysisEngine"
3073 | }
3074 | validation_results["recommendations"].append("Check analyzer registration")
3075 | except Exception as e:
3076 | validation_results["component_validations"]["analysis_engine"] = {
3077 | "status": "invalid",
3078 | "error": str(e),
3079 | "message": "S3AnalysisEngine integration failed"
3080 | }
3081 | validation_results["recommendations"].append("Check S3AnalysisEngine initialization")
3082 |
3083 | # Run integration tests
3084 | validation_results["integration_tests"] = self._run_integration_tests()
3085 |
3086 | # Determine overall status
3087 | all_valid = all(
3088 | comp.get("status") == "valid"
3089 | for comp in validation_results["component_validations"].values()
3090 | )
3091 |
3092 | if all_valid and validation_results["integration_tests"].get("all_tests_passed", False):
3093 | validation_results["overall_status"] = "valid"
3094 | elif all_valid:
3095 | validation_results["overall_status"] = "partial"
3096 | validation_results["recommendations"].append("Some integration tests failed")
3097 | else:
3098 | validation_results["overall_status"] = "invalid"
3099 | validation_results["recommendations"].append("Critical component validation failures")
3100 |
3101 | return validation_results
3102 |
3103 | except Exception as e:
3104 | logger.error(f"Error during integration validation: {str(e)}")
3105 | validation_results["overall_status"] = "error"
3106 | validation_results["error"] = str(e)
3107 | return validation_results
3108 |
3109 | def _run_integration_tests(self) -> Dict[str, Any]:
3110 | """
3111 | Run integration tests to verify parallel execution framework functionality.
3112 |
3113 | Returns:
3114 | Dictionary containing test results
3115 | """
3116 | test_results = {
3117 | "test_timestamp": datetime.now().isoformat(),
3118 | "tests_run": 0,
3119 | "tests_passed": 0,
3120 | "tests_failed": 0,
3121 | "test_details": {},
3122 | "all_tests_passed": False
3123 | }
3124 |
3125 | try:
3126 | # Test 1: Session data storage and retrieval
3127 | test_results["tests_run"] += 1
3128 | try:
3129 | test_data = [{"test_key": "test_value", "timestamp": datetime.now().isoformat()}]
3130 | success = self.session_manager.store_data(self.session_id, "integration_test", test_data)
3131 |
3132 | if success:
3133 | query_result = self.service_orchestrator.query_session_data(
3134 | "SELECT * FROM integration_test LIMIT 1"
3135 | )
3136 | if query_result and len(query_result) > 0:
3137 | test_results["tests_passed"] += 1
3138 | test_results["test_details"]["session_storage"] = {
3139 | "status": "passed",
3140 | "message": "Session storage and retrieval working"
3141 | }
3142 | else:
3143 | test_results["tests_failed"] += 1
3144 | test_results["test_details"]["session_storage"] = {
3145 | "status": "failed",
3146 | "message": "Data stored but query failed"
3147 | }
3148 | else:
3149 | test_results["tests_failed"] += 1
3150 | test_results["test_details"]["session_storage"] = {
3151 | "status": "failed",
3152 | "message": "Failed to store test data"
3153 | }
3154 | except Exception as e:
3155 | test_results["tests_failed"] += 1
3156 | test_results["test_details"]["session_storage"] = {
3157 | "status": "failed",
3158 | "error": str(e)
3159 | }
3160 |
3161 | # Test 2: Analyzer registry functionality
3162 | test_results["tests_run"] += 1
3163 | try:
3164 | available_analyzers = self.analysis_engine.analyzer_registry.list_analyzers()
3165 | if len(available_analyzers) >= 6: # Should have all 6 S3 analyzers
3166 | test_results["tests_passed"] += 1
3167 | test_results["test_details"]["analyzer_registry"] = {
3168 | "status": "passed",
3169 | "analyzer_count": len(available_analyzers),
3170 | "analyzers": available_analyzers
3171 | }
3172 | else:
3173 | test_results["tests_failed"] += 1
3174 | test_results["test_details"]["analyzer_registry"] = {
3175 | "status": "failed",
3176 | "message": f"Expected 6 analyzers, found {len(available_analyzers)}"
3177 | }
3178 | except Exception as e:
3179 | test_results["tests_failed"] += 1
3180 | test_results["test_details"]["analyzer_registry"] = {
3181 | "status": "failed",
3182 | "error": str(e)
3183 | }
3184 |
3185 | # Test 3: Task prioritization
3186 | test_results["tests_run"] += 1
3187 | try:
3188 | test_analyses = ["general_spend", "storage_class", "governance"]
3189 | priority_order = self.analysis_engine._calculate_advanced_task_priority(test_analyses)
3190 |
3191 | if len(priority_order) == len(test_analyses) and all("priority_score" in item for item in priority_order):
3192 | test_results["tests_passed"] += 1
3193 | test_results["test_details"]["task_prioritization"] = {
3194 | "status": "passed",
3195 | "priority_order": [item["analysis_type"] for item in priority_order]
3196 | }
3197 | else:
3198 | test_results["tests_failed"] += 1
3199 | test_results["test_details"]["task_prioritization"] = {
3200 | "status": "failed",
3201 | "message": "Task prioritization failed"
3202 | }
3203 | except Exception as e:
3204 | test_results["tests_failed"] += 1
3205 | test_results["test_details"]["task_prioritization"] = {
3206 | "status": "failed",
3207 | "error": str(e)
3208 | }
3209 |
3210 | # Determine overall test status
3211 | test_results["all_tests_passed"] = test_results["tests_failed"] == 0
3212 |
3213 | return test_results
3214 |
3215 | except Exception as e:
3216 | logger.error(f"Error running integration tests: {str(e)}")
3217 | test_results["error"] = str(e)
3218 | return test_results
3219 | # =============================================================================
3220 | # MCP Wrapper Functions
3221 | # =============================================================================
3222 |
3223 | async def run_s3_comprehensive_analysis(arguments: Dict[str, Any]) -> List[Any]:
3224 | """Run comprehensive S3 cost optimization analysis."""
3225 | try:
3226 | region = arguments.get('region')
3227 | orchestrator = S3OptimizationOrchestrator(region=region)
3228 |
3229 | # Execute comprehensive analysis
3230 | result = await orchestrator.execute_comprehensive_analysis(**arguments)
3231 |
3232 | # Add documentation links
3233 | result = add_documentation_links(result, "s3")
3234 |
3235 | return [{
3236 | "type": "text",
3237 | "text": json.dumps(result, indent=2, default=str)
3238 | }]
3239 |
3240 | except Exception as e:
3241 | return [{
3242 | "type": "text",
3243 | "text": json.dumps({
3244 | "status": "error",
3245 | "error_code": getattr(e, 'code', 'UnknownError'),
3246 | "message": str(e),
3247 | "context": "comprehensive_analysis"
3248 | }, indent=2)
3249 | }]
3250 |
3251 | async def run_s3_general_spend_analysis(arguments: Dict[str, Any]) -> List[Any]:
3252 | """Analyze overall S3 spending patterns and usage."""
3253 | try:
3254 | region = arguments.get('region')
3255 | orchestrator = S3OptimizationOrchestrator(region=region)
3256 |
3257 | # Execute general spend analysis
3258 | result = await orchestrator.execute_analysis("general_spend", **arguments)
3259 |
3260 | # Add documentation links
3261 | result = add_documentation_links(result, "s3")
3262 |
3263 | return [{
3264 | "type": "text",
3265 | "text": json.dumps(result, indent=2, default=str)
3266 | }]
3267 |
3268 | except Exception as e:
3269 | return [{
3270 | "type": "text",
3271 | "text": json.dumps({
3272 | "status": "error",
3273 | "error_code": getattr(e, 'code', 'UnknownError'),
3274 | "message": str(e),
3275 | "context": "run_s3_general_spend_analysis"
3276 | }, indent=2)
3277 | }]
3278 |
3279 | async def run_s3_storage_class_selection(arguments: Dict[str, Any]) -> List[Any]:
3280 | """Provide guidance on choosing the most cost-effective storage class."""
3281 | try:
3282 | region = arguments.get('region')
3283 | orchestrator = S3OptimizationOrchestrator(region=region)
3284 |
3285 | # Execute storage class analysis (covers both selection and validation)
3286 | result = await orchestrator.execute_analysis("storage_class", **arguments)
3287 |
3288 | # Add documentation links
3289 | result = add_documentation_links(result, "s3")
3290 |
3291 | return [{
3292 | "type": "text",
3293 | "text": json.dumps(result, indent=2, default=str)
3294 | }]
3295 |
3296 | except Exception as e:
3297 | return [{
3298 | "type": "text",
3299 | "text": json.dumps({
3300 | "status": "error",
3301 | "error_code": getattr(e, 'code', 'UnknownError'),
3302 | "message": str(e),
3303 | "context": "storage_class_selection"
3304 | }, indent=2)
3305 | }]
3306 |
3307 | async def run_s3_storage_class_validation(arguments: Dict[str, Any]) -> List[Any]:
3308 | """Validate that existing data is stored in the most appropriate storage class."""
3309 | try:
3310 | region = arguments.get('region')
3311 | orchestrator = S3OptimizationOrchestrator(region=region)
3312 |
3313 | # Execute storage class analysis (covers both selection and validation)
3314 | result = await orchestrator.execute_analysis("storage_class", **arguments)
3315 |
3316 | # Add documentation links
3317 | result = add_documentation_links(result, "s3")
3318 |
3319 | return [{
3320 | "type": "text",
3321 | "text": json.dumps(result, indent=2, default=str)
3322 | }]
3323 |
3324 | except Exception as e:
3325 | return [{
3326 | "type": "text",
3327 | "text": json.dumps({
3328 | "status": "error",
3329 | "error_code": getattr(e, 'code', 'UnknownError'),
3330 | "message": str(e),
3331 | "context": "storage_class_validation"
3332 | }, indent=2)
3333 | }]
3334 |
3335 | async def run_s3_archive_optimization(arguments: Dict[str, Any]) -> List[Any]:
3336 | """Identify and optimize long-term archive data storage."""
3337 | try:
3338 | region = arguments.get('region')
3339 | orchestrator = S3OptimizationOrchestrator(region=region)
3340 |
3341 | # Execute archive optimization
3342 | result = await orchestrator.execute_analysis("archive_optimization", **arguments)
3343 |
3344 | # Add documentation links
3345 | result = add_documentation_links(result, "s3")
3346 |
3347 | return [{
3348 | "type": "text",
3349 | "text": json.dumps(result, indent=2, default=str)
3350 | }]
3351 |
3352 | except Exception as e:
3353 | return [{
3354 | "type": "text",
3355 | "text": json.dumps({
3356 | "status": "error",
3357 | "error_code": getattr(e, 'code', 'UnknownError'),
3358 | "message": str(e),
3359 | "context": "archive_optimization"
3360 | }, indent=2)
3361 | }]
3362 |
3363 | async def run_s3_api_cost_minimization(arguments: Dict[str, Any]) -> List[Any]:
3364 | """Minimize S3 API request charges through access pattern optimization."""
3365 | try:
3366 | region = arguments.get('region')
3367 | orchestrator = S3OptimizationOrchestrator(region=region)
3368 |
3369 | # Execute API cost analysis
3370 | result = await orchestrator.execute_analysis("api_cost", **arguments)
3371 |
3372 | # Add documentation links
3373 | result = add_documentation_links(result, "s3")
3374 |
3375 | return [{
3376 | "type": "text",
3377 | "text": json.dumps(result, indent=2, default=str)
3378 | }]
3379 |
3380 | except Exception as e:
3381 | return [{
3382 | "type": "text",
3383 | "text": json.dumps({
3384 | "status": "error",
3385 | "error_code": getattr(e, 'code', 'UnknownError'),
3386 | "message": str(e),
3387 | "context": "api_cost_minimization"
3388 | }, indent=2)
3389 | }]
3390 |
3391 | async def run_s3_multipart_cleanup(arguments: Dict[str, Any]) -> List[Any]:
3392 | """Identify and clean up incomplete multipart uploads."""
3393 | try:
3394 | region = arguments.get('region')
3395 | orchestrator = S3OptimizationOrchestrator(region=region)
3396 |
3397 | # Execute multipart cleanup analysis
3398 | result = await orchestrator.execute_analysis("multipart_cleanup", **arguments)
3399 |
3400 | # Add documentation links
3401 | result = add_documentation_links(result, "s3")
3402 |
3403 | return [{
3404 | "type": "text",
3405 | "text": json.dumps(result, indent=2, default=str)
3406 | }]
3407 |
3408 | except Exception as e:
3409 | return [{
3410 | "type": "text",
3411 | "text": json.dumps({
3412 | "status": "error",
3413 | "error_code": getattr(e, 'code', 'UnknownError'),
3414 | "message": str(e),
3415 | "context": "multipart_cleanup"
3416 | }, indent=2)
3417 | }]
3418 |
3419 | async def run_s3_governance_check(arguments: Dict[str, Any]) -> List[Any]:
3420 |     """Check S3 cost controls and governance policy compliance."""
3421 | try:
3422 | region = arguments.get('region')
3423 | orchestrator = S3OptimizationOrchestrator(region=region)
3424 |
3425 | # Execute governance check
3426 | result = await orchestrator.execute_analysis("governance", **arguments)
3427 |
3428 | # Add documentation links
3429 | result = add_documentation_links(result, "s3")
3430 |
3431 | return [{
3432 | "type": "text",
3433 | "text": json.dumps(result, indent=2, default=str)
3434 | }]
3435 |
3436 | except Exception as e:
3437 | return [{
3438 | "type": "text",
3439 | "text": json.dumps({
3440 | "status": "error",
3441 | "error_code": getattr(e, 'code', 'UnknownError'),
3442 | "message": str(e),
3443 | "context": "run_s3_governance_check"
3444 | }, indent=2)
3445 | }]
3446 |
3447 | async def run_s3_comprehensive_optimization_tool(arguments: Dict[str, Any]) -> List[Any]:
3448 | """Run comprehensive S3 optimization with unified tool."""
3449 | try:
3450 | from .s3_comprehensive_optimization_tool import S3ComprehensiveOptimizationTool
3451 |
3452 | region = arguments.get('region')
3453 | tool = S3ComprehensiveOptimizationTool(region=region)
3454 |
3455 | # Execute comprehensive optimization
3456 | result = await tool.execute_comprehensive_optimization(**arguments)
3457 |
3458 | # Add documentation links
3459 | result = add_documentation_links(result, "s3")
3460 |
3461 | return [{
3462 | "type": "text",
3463 | "text": json.dumps(result, indent=2, default=str)
3464 | }]
3465 |
3466 | except Exception as e:
3467 | return [{
3468 | "type": "text",
3469 | "text": json.dumps({
3470 | "status": "error",
3471 | "error_code": getattr(e, 'code', 'UnknownError'),
3472 | "message": str(e),
3473 | "context": "run_s3_comprehensive_optimization_tool.execution"
3474 | }, indent=2)
3475 | }]
3476 |
3477 | async def run_s3_quick_analysis(arguments: Dict[str, Any]) -> List[Any]:
3478 | """Run a quick S3 analysis focusing on the most impactful optimizations."""
3479 | try:
3480 | region = arguments.get('region')
3481 | orchestrator = S3OptimizationOrchestrator(region=region)
3482 |
3483 | # Execute a subset of high-impact analyses with short timeout
3484 | quick_analyses = ["general_spend", "multipart_cleanup", "governance"]
3485 |
3486 | results = {}
3487 | for analysis_type in quick_analyses:
3488 | try:
3489 |                 result = await orchestrator.execute_analysis(
3490 |                     analysis_type,
3491 |                     timeout_seconds=10,  # Quick timeout; takes precedence over any caller-supplied value
3492 |                     **{k: v for k, v in arguments.items() if k != "timeout_seconds"}
3493 |                 )
3494 | results[analysis_type] = result
3495 | except Exception as e:
3496 | results[analysis_type] = {
3497 | "status": "error",
3498 | "message": str(e)
3499 | }
3500 |
3501 | # Aggregate quick results
3502 | quick_result = {
3503 | "status": "success",
3504 | "analysis_type": "quick_analysis",
3505 | "results": results,
3506 | "message": f"Quick analysis completed for {len(quick_analyses)} analyses"
3507 | }
3508 |
3509 | # Add documentation links
3510 | quick_result = add_documentation_links(quick_result, "s3")
3511 |
3512 | return [{
3513 | "type": "text",
3514 | "text": json.dumps(quick_result, indent=2, default=str)
3515 | }]
3516 |
3517 | except Exception as e:
3518 | return [{
3519 | "type": "text",
3520 | "text": json.dumps({
3521 | "status": "error",
3522 | "error_code": getattr(e, 'code', 'UnknownError'),
3523 | "message": str(e),
3524 | "context": "quick_analysis"
3525 | }, indent=2)
3526 | }]
3527 |
3528 | async def run_s3_bucket_analysis(arguments: Dict[str, Any]) -> List[Any]:
3529 | """Analyze specific S3 buckets for optimization opportunities."""
3530 | try:
3531 | region = arguments.get('region')
3532 | bucket_names = arguments.get('bucket_names', [])
3533 |
3534 | if not bucket_names:
3535 | return [{
3536 | "type": "text",
3537 | "text": json.dumps({
3538 | "status": "error",
3539 | "message": "bucket_names parameter is required",
3540 | "context": "bucket_analysis"
3541 | }, indent=2)
3542 | }]
3543 |
3544 | orchestrator = S3OptimizationOrchestrator(region=region)
3545 |
3546 | # Execute comprehensive analysis for specific buckets
3547 | result = await orchestrator.execute_comprehensive_analysis(**arguments)
3548 |
3549 | # Add documentation links
3550 | result = add_documentation_links(result, "s3")
3551 |
3552 | return [{
3553 | "type": "text",
3554 | "text": json.dumps(result, indent=2, default=str)
3555 | }]
3556 |
3557 | except Exception as e:
3558 | return [{
3559 | "type": "text",
3560 | "text": json.dumps({
3561 | "status": "error",
3562 | "error_code": getattr(e, 'code', 'UnknownError'),
3563 | "message": str(e),
3564 | "context": "bucket_analysis"
3565 | }, indent=2)
3566 | }]
```
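
The validation helpers in `S3OptimizationOrchestrator` (`validate_integration`, `validate_analyzer_integration`) return plain dictionaries keyed by `overall_status`, per-component details, and `recommendations`, so they can serve as a pre-flight health check before scheduling analyses. A minimal sketch of such a check, assuming the module path `playbooks/s3/s3_optimization_orchestrator.py` and that AWS credentials are already configured:

```python
# Hypothetical pre-flight health check (illustrative sketch, not part of the repo).
from playbooks.s3.s3_optimization_orchestrator import S3OptimizationOrchestrator


def check_s3_orchestrator_health(region: str = "us-east-1") -> bool:
    """Return True when both component and analyzer validation report 'valid'."""
    orchestrator = S3OptimizationOrchestrator(region=region)

    component_report = orchestrator.validate_integration()
    analyzer_report = orchestrator.validate_analyzer_integration()

    for label, report in (("components", component_report), ("analyzers", analyzer_report)):
        print(f"{label}: {report['overall_status']}")
        for recommendation in report.get("recommendations", []):
            print(f"  - {recommendation}")

    return (component_report["overall_status"] == "valid"
            and analyzer_report["overall_status"] == "valid")


if __name__ == "__main__":
    check_s3_orchestrator_health()
```

Both methods degrade to `overall_status: "error"` with the exception message attached instead of raising, so the check itself stays exception-safe.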
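
Each MCP wrapper above takes a single `arguments` dictionary and returns MCP-style text content (`[{"type": "text", "text": <JSON string>}]`), with failures serialized into the JSON payload (`status`, `error_code`, `message`, `context`) rather than raised, so routing them from a tool dispatcher is a straightforward lookup. A short sketch; the tool names and the direct `asyncio.run` call are assumptions for illustration, and the actual registration presumably lives in `mcp_server_with_runbooks.py` / `mcp_runbooks.json`:

```python
# Hypothetical dispatch table for the async S3 wrappers (illustrative only).
import asyncio
from typing import Any, Awaitable, Callable, Dict, List

from playbooks.s3.s3_optimization_orchestrator import (
    run_s3_comprehensive_analysis,
    run_s3_multipart_cleanup,
    run_s3_quick_analysis,
)

# Tool names here are made up for the example; every wrapper takes one arguments
# dict and returns a list of {"type": "text", "text": ...} content entries.
S3_TOOL_HANDLERS: Dict[str, Callable[[Dict[str, Any]], Awaitable[List[Any]]]] = {
    "s3_comprehensive_analysis": run_s3_comprehensive_analysis,
    "s3_multipart_cleanup": run_s3_multipart_cleanup,
    "s3_quick_analysis": run_s3_quick_analysis,
}


async def handle_s3_tool(tool_name: str, arguments: Dict[str, Any]) -> List[Any]:
    handler = S3_TOOL_HANDLERS.get(tool_name)
    if handler is None:
        raise ValueError(f"Unknown S3 tool: {tool_name}")
    return await handler(arguments)


if __name__ == "__main__":
    # Errors come back as JSON text with status/error_code/message, so the caller
    # only needs to render the first content entry.
    content = asyncio.run(handle_s3_tool("s3_quick_analysis", {"region": "us-east-1"}))
    print(content[0]["text"])
```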