This is page 3 of 14. Use http://codebase.md/aws-samples/sample-cfm-tips-mcp?lines=false&page={x} to view the full context.
# Directory Structure
```
├── .gitignore
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── diagnose_cost_optimization_hub_v2.py
├── LICENSE
├── logging_config.py
├── mcp_runbooks.json
├── mcp_server_with_runbooks.py
├── playbooks
│ ├── __init__.py
│ ├── aws_lambda
│ │ ├── __init__.py
│ │ └── lambda_optimization.py
│ ├── cloudtrail
│ │ ├── __init__.py
│ │ └── cloudtrail_optimization.py
│ ├── cloudtrail_optimization.py
│ ├── cloudwatch
│ │ ├── __init__.py
│ │ ├── aggregation_queries.py
│ │ ├── alarms_and_dashboards_analyzer.py
│ │ ├── analysis_engine.py
│ │ ├── base_analyzer.py
│ │ ├── cloudwatch_optimization_analyzer.py
│ │ ├── cloudwatch_optimization_tool.py
│ │ ├── cloudwatch_optimization.py
│ │ ├── cost_controller.py
│ │ ├── general_spend_analyzer.py
│ │ ├── logs_optimization_analyzer.py
│ │ ├── metrics_optimization_analyzer.py
│ │ ├── optimization_orchestrator.py
│ │ └── result_processor.py
│ ├── comprehensive_optimization.py
│ ├── ebs
│ │ ├── __init__.py
│ │ └── ebs_optimization.py
│ ├── ebs_optimization.py
│ ├── ec2
│ │ ├── __init__.py
│ │ └── ec2_optimization.py
│ ├── ec2_optimization.py
│ ├── lambda_optimization.py
│ ├── rds
│ │ ├── __init__.py
│ │ └── rds_optimization.py
│ ├── rds_optimization.py
│ └── s3
│ ├── __init__.py
│ ├── analyzers
│ │ ├── __init__.py
│ │ ├── api_cost_analyzer.py
│ │ ├── archive_optimization_analyzer.py
│ │ ├── general_spend_analyzer.py
│ │ ├── governance_analyzer.py
│ │ ├── multipart_cleanup_analyzer.py
│ │ └── storage_class_analyzer.py
│ ├── base_analyzer.py
│ ├── s3_aggregation_queries.py
│ ├── s3_analysis_engine.py
│ ├── s3_comprehensive_optimization_tool.py
│ ├── s3_optimization_orchestrator.py
│ └── s3_optimization.py
├── README.md
├── requirements.txt
├── runbook_functions_extended.py
├── runbook_functions.py
├── RUNBOOKS_GUIDE.md
├── services
│ ├── __init__.py
│ ├── cloudwatch_pricing.py
│ ├── cloudwatch_service_vended_log.py
│ ├── cloudwatch_service.py
│ ├── compute_optimizer.py
│ ├── cost_explorer.py
│ ├── optimization_hub.py
│ ├── performance_insights.py
│ ├── pricing.py
│ ├── s3_pricing.py
│ ├── s3_service.py
│ ├── storage_lens_service.py
│ └── trusted_advisor.py
├── setup.py
├── test_runbooks.py
├── tests
│ ├── __init__.py
│ ├── conftest.py
│ ├── integration
│ │ ├── __init__.py
│ │ ├── cloudwatch
│ │ │ └── test_cloudwatch_integration.py
│ │ ├── test_cloudwatch_comprehensive_tool_integration.py
│ │ ├── test_cloudwatch_orchestrator_integration.py
│ │ ├── test_integration_suite.py
│ │ └── test_orchestrator_integration.py
│ ├── legacy
│ │ ├── example_output_with_docs.py
│ │ ├── example_wellarchitected_output.py
│ │ ├── test_aws_session_management.py
│ │ ├── test_cloudwatch_orchestrator_pagination.py
│ │ ├── test_cloudwatch_pagination_integration.py
│ │ ├── test_cloudwatch_performance_optimizations.py
│ │ ├── test_cloudwatch_result_processor.py
│ │ ├── test_cloudwatch_timeout_issue.py
│ │ ├── test_documentation_links.py
│ │ ├── test_metrics_pagination_count.py
│ │ ├── test_orchestrator_integration.py
│ │ ├── test_pricing_cache_fix_moved.py
│ │ ├── test_pricing_cache_fix.py
│ │ ├── test_runbook_integration.py
│ │ ├── test_runbooks.py
│ │ ├── test_setup_verification.py
│ │ └── test_stack_trace_fix.py
│ ├── performance
│ │ ├── __init__.py
│ │ ├── cloudwatch
│ │ │ └── test_cloudwatch_performance.py
│ │ ├── test_cloudwatch_parallel_execution.py
│ │ ├── test_parallel_execution.py
│ │ └── test_performance_suite.py
│ ├── pytest-cloudwatch.ini
│ ├── pytest.ini
│ ├── README.md
│ ├── requirements-test.txt
│ ├── run_cloudwatch_tests.py
│ ├── run_tests.py
│ ├── test_setup_verification.py
│ ├── test_suite_main.py
│ └── unit
│ ├── __init__.py
│ ├── analyzers
│ │ ├── __init__.py
│ │ ├── conftest_cloudwatch.py
│ │ ├── test_alarms_and_dashboards_analyzer.py
│ │ ├── test_base_analyzer.py
│ │ ├── test_cloudwatch_base_analyzer.py
│ │ ├── test_cloudwatch_cost_constraints.py
│ │ ├── test_cloudwatch_general_spend_analyzer.py
│ │ ├── test_general_spend_analyzer.py
│ │ ├── test_logs_optimization_analyzer.py
│ │ └── test_metrics_optimization_analyzer.py
│ ├── cloudwatch
│ │ ├── test_cache_control.py
│ │ ├── test_cloudwatch_api_mocking.py
│ │ ├── test_cloudwatch_metrics_pagination.py
│ │ ├── test_cloudwatch_pagination_architecture.py
│ │ ├── test_cloudwatch_pagination_comprehensive_fixed.py
│ │ ├── test_cloudwatch_pagination_comprehensive.py
│ │ ├── test_cloudwatch_pagination_fixed.py
│ │ ├── test_cloudwatch_pagination_real_format.py
│ │ ├── test_cloudwatch_pagination_simple.py
│ │ ├── test_cloudwatch_query_pagination.py
│ │ ├── test_cloudwatch_unit_suite.py
│ │ ├── test_general_spend_tips_refactor.py
│ │ ├── test_import_error.py
│ │ ├── test_mcp_pagination_bug.py
│ │ └── test_mcp_surface_pagination.py
│ ├── s3
│ │ └── live
│ │ ├── test_bucket_listing.py
│ │ ├── test_s3_governance_bucket_discovery.py
│ │ └── test_top_buckets.py
│ ├── services
│ │ ├── __init__.py
│ │ ├── test_cloudwatch_cost_controller.py
│ │ ├── test_cloudwatch_query_service.py
│ │ ├── test_cloudwatch_service.py
│ │ ├── test_cost_control_routing.py
│ │ └── test_s3_service.py
│ └── test_unit_suite.py
└── utils
├── __init__.py
├── aws_client_factory.py
├── cache_decorator.py
├── cleanup_manager.py
├── cloudwatch_cache.py
├── documentation_links.py
├── error_handler.py
├── intelligent_cache.py
├── logging_config.py
├── memory_manager.py
├── parallel_executor.py
├── performance_monitor.py
├── progressive_timeout.py
├── service_orchestrator.py
└── session_manager.py
```
# Files
--------------------------------------------------------------------------------
/services/compute_optimizer.py:
--------------------------------------------------------------------------------
```python
"""
AWS Compute Optimizer service module.
This module provides functions for interacting with the AWS Compute Optimizer API.
"""
import logging
from typing import Dict, Optional, Any
import boto3
from botocore.exceptions import ClientError
logger = logging.getLogger(__name__)
def get_ec2_recommendations(
region: Optional[str] = None
) -> Dict[str, Any]:
"""
Get EC2 instance recommendations from AWS Compute Optimizer.
Args:
region: AWS region (optional)
Returns:
Dictionary containing the EC2 recommendations
"""
try:
# Create Compute Optimizer client
if region:
client = boto3.client('compute-optimizer', region_name=region)
else:
client = boto3.client('compute-optimizer')
# Initialize variables for pagination
all_recommendations = []
next_token = None
# Use pagination to retrieve all results
while True:
# Prepare parameters for the API call
params = {}
if next_token:
params['nextToken'] = next_token
# Make the API call
response = client.get_ec2_instance_recommendations(**params)
# Add recommendations from this page to our collection
if 'instanceRecommendations' in response:
all_recommendations.extend(response['instanceRecommendations'])
# Check if there are more pages
if 'nextToken' in response:
next_token = response['nextToken']
else:
break
# Create our final result with all recommendations
result = {
"instanceRecommendations": all_recommendations
}
# Extract recommendation count
recommendation_count = len(all_recommendations)
return {
"status": "success",
"data": result,
"message": f"Retrieved {recommendation_count} EC2 instance recommendations"
}
except ClientError as e:
logger.error(f"Error in Compute Optimizer EC2 API: {str(e)}")
return {
"status": "error",
"message": f"Compute Optimizer EC2 API error: {str(e)}",
"error_code": e.response['Error']['Code'] if 'Error' in e.response else "Unknown"
}
except Exception as e:
logger.error(f"Unexpected error in Compute Optimizer EC2 service: {str(e)}")
return {
"status": "error",
"message": f"Unexpected error: {str(e)}"
}
def get_asg_recommendations(
region: Optional[str] = None
) -> Dict[str, Any]:
"""
Get Auto Scaling Group recommendations from AWS Compute Optimizer.
Args:
region: AWS region (optional)
Returns:
Dictionary containing the ASG recommendations
"""
try:
# Create Compute Optimizer client
if region:
client = boto3.client('compute-optimizer', region_name=region)
else:
client = boto3.client('compute-optimizer')
# Initialize variables for pagination
all_recommendations = []
next_token = None
# Use pagination to retrieve all results
while True:
# Prepare parameters for the API call
params = {}
if next_token:
params['nextToken'] = next_token
# Make the API call
response = client.get_auto_scaling_group_recommendations(**params)
# Add recommendations from this page to our collection
if 'autoScalingGroupRecommendations' in response:
all_recommendations.extend(response['autoScalingGroupRecommendations'])
# Check if there are more pages
if 'nextToken' in response:
next_token = response['nextToken']
else:
break
# Create our final result with all recommendations
result = {
"autoScalingGroupRecommendations": all_recommendations
}
# Extract recommendation count
recommendation_count = len(all_recommendations)
return {
"status": "success",
"data": result,
"message": f"Retrieved {recommendation_count} Auto Scaling Group recommendations"
}
except ClientError as e:
logger.error(f"Error in Compute Optimizer ASG API: {str(e)}")
return {
"status": "error",
"message": f"Compute Optimizer ASG API error: {str(e)}",
"error_code": e.response['Error']['Code'] if 'Error' in e.response else "Unknown"
}
except Exception as e:
logger.error(f"Unexpected error in Compute Optimizer ASG service: {str(e)}")
return {
"status": "error",
"message": f"Unexpected error: {str(e)}"
}
def get_ebs_recommendations(
region: Optional[str] = None
) -> Dict[str, Any]:
"""
Get EBS volume recommendations from AWS Compute Optimizer.
Args:
region: AWS region (optional)
Returns:
Dictionary containing the EBS recommendations
"""
try:
# Create Compute Optimizer client
if region:
client = boto3.client('compute-optimizer', region_name=region)
else:
client = boto3.client('compute-optimizer')
# Initialize variables for pagination
all_recommendations = []
next_token = None
# Use pagination to retrieve all results
while True:
# Prepare parameters for the API call
params = {}
if next_token:
params['nextToken'] = next_token
# Make the API call
response = client.get_ebs_volume_recommendations(**params)
# Add recommendations from this page to our collection
if 'volumeRecommendations' in response:
all_recommendations.extend(response['volumeRecommendations'])
# Check if there are more pages
if 'nextToken' in response:
next_token = response['nextToken']
else:
break
# Create our final result with all recommendations
result = {
"volumeRecommendations": all_recommendations
}
# Extract recommendation count
recommendation_count = len(all_recommendations)
return {
"status": "success",
"data": result,
"message": f"Retrieved {recommendation_count} EBS volume recommendations"
}
except ClientError as e:
logger.error(f"Error in Compute Optimizer EBS API: {str(e)}")
return {
"status": "error",
"message": f"Compute Optimizer EBS API error: {str(e)}",
"error_code": e.response['Error']['Code'] if 'Error' in e.response else "Unknown"
}
except Exception as e:
logger.error(f"Unexpected error in Compute Optimizer EBS service: {str(e)}")
return {
"status": "error",
"message": f"Unexpected error: {str(e)}"
}
def get_lambda_recommendations(
region: Optional[str] = None
) -> Dict[str, Any]:
"""
Get Lambda function recommendations from AWS Compute Optimizer.
Args:
region: AWS region (optional)
Returns:
Dictionary containing the Lambda recommendations
"""
try:
# Create Compute Optimizer client
if region:
client = boto3.client('compute-optimizer', region_name=region)
else:
client = boto3.client('compute-optimizer')
# Initialize variables for pagination
all_recommendations = []
next_token = None
# Use pagination to retrieve all results
while True:
# Prepare parameters for the API call
params = {}
if next_token:
params['nextToken'] = next_token
# Make the API call
response = client.get_lambda_function_recommendations(**params)
# Add recommendations from this page to our collection
if 'lambdaFunctionRecommendations' in response:
all_recommendations.extend(response['lambdaFunctionRecommendations'])
# Check if there are more pages
if 'nextToken' in response:
next_token = response['nextToken']
else:
break
# Create our final result with all recommendations
result = {
"lambdaFunctionRecommendations": all_recommendations
}
# Extract recommendation count
recommendation_count = len(all_recommendations)
return {
"status": "success",
"data": result,
"message": f"Retrieved {recommendation_count} Lambda function recommendations"
}
except ClientError as e:
logger.error(f"Error in Compute Optimizer Lambda API: {str(e)}")
return {
"status": "error",
"message": f"Compute Optimizer Lambda API error: {str(e)}",
"error_code": e.response['Error']['Code'] if 'Error' in e.response else "Unknown"
}
except Exception as e:
logger.error(f"Unexpected error in Compute Optimizer Lambda service: {str(e)}")
return {
"status": "error",
"message": f"Unexpected error: {str(e)}"
}
def get_ecs_recommendations(
region: Optional[str] = None
) -> Dict[str, Any]:
"""
Get ECS service recommendations from AWS Compute Optimizer.
Args:
region: AWS region (optional)
Returns:
Dictionary containing the ECS recommendations
"""
try:
# Create Compute Optimizer client
if region:
client = boto3.client('compute-optimizer', region_name=region)
else:
client = boto3.client('compute-optimizer')
# Initialize variables for pagination
all_recommendations = []
next_token = None
# Use pagination to retrieve all results
while True:
# Prepare parameters for the API call
params = {}
if next_token:
params['nextToken'] = next_token
# Make the API call
response = client.get_ecs_service_recommendations(**params)
# Add recommendations from this page to our collection
if 'ecsServiceRecommendations' in response:
all_recommendations.extend(response['ecsServiceRecommendations'])
# Check if there are more pages
if 'nextToken' in response:
next_token = response['nextToken']
else:
break
# Create our final result with all recommendations
result = {
"ecsServiceRecommendations": all_recommendations
}
# Extract recommendation count
recommendation_count = len(all_recommendations)
return {
"status": "success",
"data": result,
"message": f"Retrieved {recommendation_count} ECS service recommendations"
}
except ClientError as e:
logger.error(f"Error in Compute Optimizer ECS API: {str(e)}")
return {
"status": "error",
"message": f"Compute Optimizer ECS API error: {str(e)}",
"error_code": e.response['Error']['Code'] if 'Error' in e.response else "Unknown"
}
except Exception as e:
logger.error(f"Unexpected error in Compute Optimizer ECS service: {str(e)}")
return {
"status": "error",
"message": f"Unexpected error: {str(e)}"
}
# As per boto3 documentation, the get_recommendation_summaries method doesn't support resourceType as a parameter.
# Instead, it retrieves summaries for all resource types.
def get_recommendation_summaries(
resource_type: str = "Ec2Instance",
region: Optional[str] = None
) -> Dict[str, Any]:
"""
Get recommendation summaries from AWS Compute Optimizer.
Args:
resource_type: Type of resource (Ec2Instance, AutoScalingGroup, EbsVolume, LambdaFunction, EcsService)
region: AWS region (optional)
Returns:
Dictionary containing the recommendation summaries
"""
try:
# Create Compute Optimizer client
if region:
client = boto3.client('compute-optimizer', region_name=region)
else:
client = boto3.client('compute-optimizer')
# Initialize variables for pagination
all_summaries = []
next_token = None
# Use pagination to retrieve all results
        while True:
            # The GetRecommendationSummaries API does not accept a resourceType
            # parameter, so request every summary and filter by type below.
            params = {}
            if next_token:
                params['nextToken'] = next_token
            # Make the API call
            response = client.get_recommendation_summaries(**params)
            # Keep only summaries for the requested resource type
            for summary in response.get('recommendationSummaries', []):
                if summary.get('recommendationResourceType') == resource_type:
                    all_summaries.append(summary)
# Check if there are more pages
if 'nextToken' in response:
next_token = response['nextToken']
else:
break
# Create our final result with all summaries
result = {
"recommendationSummaries": all_summaries
}
# Extract summary count
summary_count = len(all_summaries)
return {
"status": "success",
"data": result,
"message": f"Retrieved {summary_count} recommendation summaries for {resource_type}"
}
except ClientError as e:
logger.error(f"Error getting recommendation summaries: {str(e)}")
return {
"status": "error",
"message": f"Error getting recommendation summaries: {str(e)}",
"error_code": e.response['Error']['Code'] if 'Error' in e.response else "Unknown"
}
except Exception as e:
logger.error(f"Unexpected error getting recommendation summaries: {str(e)}")
return {
"status": "error",
"message": f"Unexpected error: {str(e)}"
}
```
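All six functions above repeat the same manual `nextToken` loop. A minimal sketch of how that loop could be factored into one shared helper; `_collect_pages` is a hypothetical name, not part of the module:

```python
def _collect_pages(api_call, result_key):
    """Collect every item from a nextToken-paginated Compute Optimizer call."""
    items, token = [], None
    while True:
        params = {'nextToken': token} if token else {}
        response = api_call(**params)
        items.extend(response.get(result_key, []))
        token = response.get('nextToken')
        if not token:
            return items

# e.g. inside get_ec2_recommendations:
#   all_recommendations = _collect_pages(
#       client.get_ec2_instance_recommendations, 'instanceRecommendations')
```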
--------------------------------------------------------------------------------
/tests/unit/cloudwatch/test_cloudwatch_pagination_real_format.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Unit tests for CloudWatch pagination functionality using real API format.
Tests pagination logic with actual CloudWatch response structures.
"""
import pytest
import sys
import os
from unittest.mock import patch, MagicMock
import json
# Add the project root to the path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../../..'))
@pytest.mark.skip(reason="Tests need refactoring to match actual API structure")
class TestCloudWatchPaginationRealFormat:
"""Unit tests for CloudWatch pagination with real API format."""
def create_real_cloudwatch_response(self, page=1, total_metrics=25):
"""Create a response matching the actual CloudWatch API format."""
items_per_page = 10
start_idx = (page - 1) * items_per_page
end_idx = min(start_idx + items_per_page, total_metrics)
metrics = [
{
"Namespace": f"AWS/Test{i}",
"MetricName": f"TestMetric{i}",
"Dimensions": [{"Name": "TestDim", "Value": f"value{i}"}],
"estimated_monthly_cost": 0.3
}
for i in range(start_idx, end_idx)
]
total_pages = (total_metrics + items_per_page - 1) // items_per_page
return {
"status": "success",
"analysis_type": "general_spend",
"data": {
"configuration_analysis": {
"metrics": {
"metrics": metrics,
"total_count": total_metrics,
"pagination": {
"current_page": page,
"page_size": items_per_page,
"total_items": total_metrics,
"total_pages": total_pages,
"has_next_page": page < total_pages,
"has_previous_page": page > 1
}
},
"log_groups": {
"log_groups": [],
"total_count": 0,
"pagination": {
"current_page": page,
"page_size": items_per_page,
"total_items": 0,
"total_pages": 0,
"has_next_page": False,
"has_previous_page": False
}
},
"alarms": {
"alarms": [],
"total_count": 0
},
"dashboards": {
"dashboards": [],
"total_count": 0
}
}
}
}
@pytest.mark.asyncio
async def test_cloudwatch_general_spend_analysis_real_format(self):
"""Test CloudWatch general spend analysis with real API format."""
with patch('playbooks.cloudwatch.optimization_orchestrator.CloudWatchOptimizationOrchestrator') as mock_orchestrator:
async def mock_execute_analysis(*args, **kwargs):
page = kwargs.get('page', 1)
return self.create_real_cloudwatch_response(page, 25)
mock_orchestrator.return_value.execute_analysis.side_effect = mock_execute_analysis
from runbook_functions import run_cloudwatch_general_spend_analysis
# Test page 1
result_p1 = await run_cloudwatch_general_spend_analysis({
'region': 'us-east-1',
'page': 1,
'lookback_days': 30
})
response_p1 = json.loads(result_p1[0].text)
# Verify structure matches real API
assert "configuration_analysis" in response_p1["data"]
assert "metrics" in response_p1["data"]["configuration_analysis"]
metrics_data = response_p1["data"]["configuration_analysis"]["metrics"]
assert len(metrics_data["metrics"]) == 10
assert metrics_data["total_count"] == 25
assert metrics_data["pagination"]["current_page"] == 1
assert metrics_data["pagination"]["total_pages"] == 3
assert metrics_data["pagination"]["has_next_page"] is True
# Verify metric structure
first_metric = metrics_data["metrics"][0]
assert "Namespace" in first_metric
assert "MetricName" in first_metric
assert "Dimensions" in first_metric
assert "estimated_monthly_cost" in first_metric
# Test page 3 (last page with remainder)
result_p3 = await run_cloudwatch_general_spend_analysis({
'region': 'us-east-1',
'page': 3,
'lookback_days': 30
})
response_p3 = json.loads(result_p3[0].text)
metrics_data_p3 = response_p3["data"]["configuration_analysis"]["metrics"]
assert len(metrics_data_p3["metrics"]) == 5 # 25 % 10 = 5 remainder
assert metrics_data_p3["pagination"]["has_next_page"] is False
@pytest.mark.asyncio
async def test_cloudwatch_metrics_optimization_real_format(self):
"""Test CloudWatch metrics optimization with real API format."""
with patch('playbooks.cloudwatch.optimization_orchestrator.CloudWatchOptimizationOrchestrator') as mock_orchestrator:
async def mock_execute_analysis(*args, **kwargs):
page = kwargs.get('page', 1)
response = self.create_real_cloudwatch_response(page, 15)
response["analysis_type"] = "metrics_optimization"
return response
mock_orchestrator.return_value.execute_analysis.side_effect = mock_execute_analysis
from runbook_functions import run_cloudwatch_metrics_optimization
result = await run_cloudwatch_metrics_optimization({
'region': 'us-east-1',
'page': 1,
'lookback_days': 30
})
response = json.loads(result[0].text)
assert response["analysis_type"] == "metrics_optimization"
metrics_data = response["data"]["configuration_analysis"]["metrics"]
assert len(metrics_data["metrics"]) == 10
assert metrics_data["total_count"] == 15
assert metrics_data["pagination"]["total_pages"] == 2
@pytest.mark.asyncio
async def test_cloudwatch_logs_optimization_real_format(self):
"""Test CloudWatch logs optimization with real API format."""
def create_logs_response(page=1, total_logs=12):
items_per_page = 10
start_idx = (page - 1) * items_per_page
end_idx = min(start_idx + items_per_page, total_logs)
log_groups = [
{
"logGroupName": f"/aws/lambda/test-function-{i}",
"creationTime": 1719500926627 + i,
"storedBytes": 1000000 + (i * 100000),
"estimated_monthly_cost": 0.1 + (i * 0.01)
}
for i in range(start_idx, end_idx)
]
total_pages = (total_logs + items_per_page - 1) // items_per_page
return {
"status": "success",
"analysis_type": "logs_optimization",
"data": {
"configuration_analysis": {
"log_groups": {
"log_groups": log_groups,
"total_count": total_logs,
"pagination": {
"current_page": page,
"page_size": items_per_page,
"total_items": total_logs,
"total_pages": total_pages,
"has_next_page": page < total_pages,
"has_previous_page": page > 1
}
},
"metrics": {"metrics": [], "total_count": 0},
"alarms": {"alarms": [], "total_count": 0},
"dashboards": {"dashboards": [], "total_count": 0}
}
}
}
with patch('playbooks.cloudwatch.optimization_orchestrator.CloudWatchOptimizationOrchestrator') as mock_orchestrator:
async def mock_execute_analysis(*args, **kwargs):
page = kwargs.get('page', 1)
return create_logs_response(page, 12)
mock_orchestrator.return_value.execute_analysis.side_effect = mock_execute_analysis
from runbook_functions import run_cloudwatch_logs_optimization
result = await run_cloudwatch_logs_optimization({
'region': 'us-east-1',
'page': 1,
'lookback_days': 30
})
response = json.loads(result[0].text)
log_groups_data = response["data"]["configuration_analysis"]["log_groups"]
assert len(log_groups_data["log_groups"]) == 10
assert log_groups_data["total_count"] == 12
assert log_groups_data["pagination"]["total_pages"] == 2
# Verify log group structure
first_log = log_groups_data["log_groups"][0]
assert "logGroupName" in first_log
assert "creationTime" in first_log
assert "storedBytes" in first_log
assert "estimated_monthly_cost" in first_log
@pytest.mark.asyncio
async def test_query_cloudwatch_analysis_results_real_format(self):
"""Test CloudWatch query results with real API format."""
def create_query_response(page=1, total_rows=18):
items_per_page = 10
start_idx = (page - 1) * items_per_page
end_idx = min(start_idx + items_per_page, total_rows)
rows = [
{
"metric_name": f"TestMetric{i}",
"namespace": f"AWS/Test{i}",
"cost": 0.3 + (i * 0.1),
"dimensions_count": 2
}
for i in range(start_idx, end_idx)
]
total_pages = (total_rows + items_per_page - 1) // items_per_page
return {
"status": "success",
"query_results": {
"rows": rows,
"pagination": {
"current_page": page,
"page_size": items_per_page,
"total_items": total_rows,
"total_pages": total_pages,
"has_next_page": page < total_pages,
"has_previous_page": page > 1
},
"query_metadata": {
"sql_query": "SELECT * FROM metrics",
"execution_time_ms": 125.5,
"rows_examined": total_rows
}
}
}
with patch('playbooks.cloudwatch.optimization_orchestrator.CloudWatchOptimizationOrchestrator') as mock_orchestrator:
def mock_get_analysis_results(*args, **kwargs):
# Return 18 total rows for pagination testing
return [
{
"metric_name": f"TestMetric{i}",
"namespace": f"AWS/Test{i}",
"cost": 0.3 + (i * 0.1),
"dimensions_count": 2
}
for i in range(18)
]
mock_orchestrator.return_value.get_analysis_results.side_effect = mock_get_analysis_results
from runbook_functions import query_cloudwatch_analysis_results
result = await query_cloudwatch_analysis_results({
'query': 'SELECT * FROM metrics WHERE cost > 0.5',
'page': 1
})
response = json.loads(result[0].text)
# The actual response structure from query function
assert "results" in response
assert "pagination" in response
assert len(response["results"]) == 10
assert response["pagination"]["total_items"] == 18
assert response["pagination"]["total_pages"] == 2
# Test page 2
result_p2 = await query_cloudwatch_analysis_results({
'query': 'SELECT * FROM metrics WHERE cost > 0.5',
'page': 2
})
response_p2 = json.loads(result_p2[0].text)
assert len(response_p2["results"]) == 8 # 18 % 10 = 8 remainder
assert response_p2["pagination"]["has_next_page"] is False
@pytest.mark.asyncio
async def test_pagination_edge_cases_real_format(self):
"""Test pagination edge cases with real CloudWatch format."""
# Test empty results
with patch('playbooks.cloudwatch.optimization_orchestrator.CloudWatchOptimizationOrchestrator') as mock_orchestrator:
async def mock_execute_analysis(*args, **kwargs):
return self.create_real_cloudwatch_response(1, 0)
mock_orchestrator.return_value.execute_analysis.side_effect = mock_execute_analysis
from runbook_functions import run_cloudwatch_general_spend_analysis
result = await run_cloudwatch_general_spend_analysis({
'region': 'us-east-1',
'page': 1
})
response = json.loads(result[0].text)
metrics_data = response["data"]["configuration_analysis"]["metrics"]
assert len(metrics_data["metrics"]) == 0
assert metrics_data["total_count"] == 0
assert metrics_data["pagination"]["total_pages"] == 0
assert metrics_data["pagination"]["has_next_page"] is False
# Test exactly one page
with patch('playbooks.cloudwatch.optimization_orchestrator.CloudWatchOptimizationOrchestrator') as mock_orchestrator:
async def mock_execute_analysis(*args, **kwargs):
return self.create_real_cloudwatch_response(1, 10)
mock_orchestrator.return_value.execute_analysis.side_effect = mock_execute_analysis
from runbook_functions import run_cloudwatch_metrics_optimization
result = await run_cloudwatch_metrics_optimization({
'region': 'us-east-1',
'page': 1
})
response = json.loads(result[0].text)
metrics_data = response["data"]["configuration_analysis"]["metrics"]
assert len(metrics_data["metrics"]) == 10
assert metrics_data["pagination"]["total_pages"] == 1
assert metrics_data["pagination"]["has_next_page"] is False
assert metrics_data["pagination"]["has_previous_page"] is False
if __name__ == '__main__':
pytest.main([__file__, '-v'])
```
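The fixtures above all derive `total_pages` by ceiling division and slice ten items per page. A standalone sketch of that arithmetic, shown only to make the expected page counts explicit (`paginate` is a hypothetical helper, not part of the suite):

```python
def paginate(items, page, page_size=10):
    """Return one page of items plus the pagination block the tests assert on."""
    total_pages = (len(items) + page_size - 1) // page_size  # ceiling division
    start = (page - 1) * page_size
    return {
        "items": items[start:start + page_size],
        "pagination": {
            "current_page": page,
            "page_size": page_size,
            "total_items": len(items),
            "total_pages": total_pages,
            "has_next_page": page < total_pages,
            "has_previous_page": page > 1,
        },
    }

# paginate(list(range(25)), page=3) -> 5 items, has_next_page False
```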
--------------------------------------------------------------------------------
/tests/unit/services/test_cloudwatch_query_service.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Unit tests for CloudWatch query and utility services.
Tests SQL query functionality, cost estimation, performance statistics,
and cache management operations using mocked interfaces.
"""
import pytest
import json
from unittest.mock import Mock, patch, MagicMock
@pytest.mark.unit
@pytest.mark.cloudwatch
class TestCloudWatchQueryService:
"""Test CloudWatch SQL query functionality."""
def test_query_analysis_results_basic(self):
"""Test basic SQL query execution."""
# Test data
query = "SELECT * FROM analysis_results WHERE analysis_type = 'general_spend'"
expected_result = {
"status": "success",
"results": [{"analysis_type": "general_spend", "cost": 100.0}],
"query": query,
"row_count": 1
}
# Mock the function behavior
mock_function = Mock(return_value=expected_result)
# Test the function call
result = mock_function(query=query)
assert result["status"] == "success"
assert result["row_count"] == 1
assert len(result["results"]) == 1
mock_function.assert_called_once_with(query=query)
def test_query_analysis_results_invalid_sql(self):
"""Test handling of invalid SQL queries."""
invalid_query = "INVALID SQL SYNTAX"
expected_result = {
"status": "error",
"error_message": "SQL syntax error",
"query": invalid_query
}
# Mock the function behavior
mock_function = Mock(return_value=expected_result)
# Test the function call
result = mock_function(query=invalid_query)
assert result["status"] == "error"
assert "error_message" in result
mock_function.assert_called_once_with(query=invalid_query)
def test_query_analysis_results_empty_results(self):
"""Test query with no matching results."""
query = "SELECT * FROM analysis_results WHERE analysis_type = 'nonexistent'"
expected_result = {
"status": "success",
"results": [],
"query": query,
"row_count": 0
}
# Mock the function behavior
mock_function = Mock(return_value=expected_result)
# Test the function call
result = mock_function(query=query)
assert result["status"] == "success"
assert result["row_count"] == 0
assert len(result["results"]) == 0
@pytest.mark.unit
@pytest.mark.cloudwatch
class TestCloudWatchCostPreferences:
"""Test CloudWatch cost preference validation."""
def test_validate_cost_preferences_default(self):
"""Test default cost preferences validation."""
expected_result = {
"status": "success",
"preferences": {
"allow_cost_explorer": False,
"allow_aws_config": False,
"allow_cloudtrail": False,
"allow_minimal_cost_metrics": False
},
"functionality_coverage": {
"general_spend_analysis": 60,
"metrics_optimization": 80,
"logs_optimization": 90,
"alarms_and_dashboards": 95
},
"estimated_cost": 0.0
}
# Mock the function behavior
mock_function = Mock(return_value=expected_result)
# Test the function call
result = mock_function()
assert result["status"] == "success"
assert result["estimated_cost"] == 0.0
assert all(not pref for pref in result["preferences"].values())
mock_function.assert_called_once()
def test_validate_cost_preferences_custom(self):
"""Test custom cost preferences validation."""
preferences = {
"allow_cost_explorer": True,
"allow_minimal_cost_metrics": True,
"lookback_days": 30
}
expected_result = {
"status": "success",
"preferences": preferences,
"functionality_coverage": {
"general_spend_analysis": 85,
"metrics_optimization": 90,
"logs_optimization": 90,
"alarms_and_dashboards": 95
},
"estimated_cost": 2.50
}
# Mock the function behavior
mock_function = Mock(return_value=expected_result)
# Test the function call
result = mock_function(**preferences)
assert result["status"] == "success"
assert result["estimated_cost"] > 0
assert result["preferences"]["allow_cost_explorer"] is True
mock_function.assert_called_once_with(**preferences)
def test_validate_cost_preferences_invalid_params(self):
"""Test validation with invalid parameters."""
invalid_preferences = {
"allow_cost_explorer": "invalid_boolean",
"lookback_days": -1
}
expected_result = {
"status": "error",
"error_message": "Invalid parameter values",
"validation_errors": [
"allow_cost_explorer must be boolean",
"lookback_days must be positive integer"
]
}
# Mock the function behavior
mock_function = Mock(return_value=expected_result)
# Test the function call
result = mock_function(**invalid_preferences)
assert result["status"] == "error"
assert "validation_errors" in result
mock_function.assert_called_once_with(**invalid_preferences)
@pytest.mark.unit
@pytest.mark.cloudwatch
class TestCloudWatchCostEstimation:
"""Test CloudWatch cost estimation functionality."""
def test_cost_estimate_basic_analysis(self):
"""Test cost estimation for basic analysis."""
params = {
"allow_cost_explorer": False,
"allow_minimal_cost_metrics": True,
"lookback_days": 30
}
expected_result = {
"status": "success",
"total_estimated_cost": 1.25,
"cost_breakdown": {
"cloudwatch_api_calls": 0.50,
"cost_explorer_queries": 0.00,
"minimal_cost_metrics": 0.75
},
"analysis_scope": "basic",
"lookback_days": 30
}
# Mock the function behavior
mock_function = Mock(return_value=expected_result)
# Test the function call
result = mock_function(**params)
assert result["status"] == "success"
assert result["total_estimated_cost"] == 1.25
assert result["cost_breakdown"]["cost_explorer_queries"] == 0.00
mock_function.assert_called_once_with(**params)
def test_cost_estimate_comprehensive_analysis(self):
"""Test cost estimation for comprehensive analysis."""
params = {
"allow_cost_explorer": True,
"allow_aws_config": True,
"allow_cloudtrail": True,
"lookback_days": 90,
"analysis_types": ["general_spend", "metrics", "logs", "alarms"]
}
expected_result = {
"status": "success",
"total_estimated_cost": 15.75,
"cost_breakdown": {
"cloudwatch_api_calls": 2.00,
"cost_explorer_queries": 8.00,
"aws_config_queries": 3.25,
"cloudtrail_queries": 2.50
},
"analysis_scope": "comprehensive",
"lookback_days": 90
}
# Mock the function behavior
mock_function = Mock(return_value=expected_result)
# Test the function call
result = mock_function(**params)
assert result["status"] == "success"
assert result["total_estimated_cost"] == 15.75
assert result["analysis_scope"] == "comprehensive"
mock_function.assert_called_once_with(**params)
def test_cost_estimate_zero_cost_scenario(self):
"""Test cost estimation for zero-cost scenario."""
params = {
"allow_cost_explorer": False,
"allow_aws_config": False,
"allow_cloudtrail": False,
"allow_minimal_cost_metrics": False
}
expected_result = {
"status": "success",
"total_estimated_cost": 0.00,
"cost_breakdown": {},
"analysis_scope": "free_tier_only",
"warning": "Limited functionality with current cost preferences"
}
# Mock the function behavior
mock_function = Mock(return_value=expected_result)
# Test the function call
result = mock_function(**params)
assert result["status"] == "success"
assert result["total_estimated_cost"] == 0.00
assert "warning" in result
mock_function.assert_called_once_with(**params)
@pytest.mark.unit
@pytest.mark.cloudwatch
class TestCloudWatchPerformanceAndCache:
"""Test CloudWatch performance statistics and cache management."""
def test_get_performance_statistics(self):
"""Test performance statistics retrieval."""
expected_result = {
"status": "success",
"cache_performance": {
"pricing_cache_hit_rate": 85.5,
"metadata_cache_hit_rate": 92.3,
"analysis_cache_hit_rate": 78.1
},
"memory_usage": {
"total_memory_mb": 256.7,
"cache_memory_mb": 128.3,
"analysis_memory_mb": 89.2
},
"execution_metrics": {
"avg_analysis_time_seconds": 12.5,
"total_analyses_completed": 47,
"parallel_execution_efficiency": 88.2
},
"timestamp": "2024-01-15T10:30:00Z"
}
# Mock the function behavior
mock_function = Mock(return_value=expected_result)
# Test the function call
result = mock_function()
assert result["status"] == "success"
assert "cache_performance" in result
assert "memory_usage" in result
assert "execution_metrics" in result
assert result["cache_performance"]["pricing_cache_hit_rate"] > 80
mock_function.assert_called_once()
def test_warm_caches(self):
"""Test cache warming functionality."""
cache_types = ["pricing", "metadata"]
expected_result = {
"status": "success",
"warmed_caches": cache_types,
"warming_results": {
"pricing": {"status": "success", "entries_loaded": 1250},
"metadata": {"status": "success", "entries_loaded": 890}
},
"total_warming_time_seconds": 3.2
}
# Mock the function behavior
mock_function = Mock(return_value=expected_result)
# Test the function call
result = mock_function(cache_types=cache_types)
assert result["status"] == "success"
assert result["warmed_caches"] == cache_types
assert all(cache["status"] == "success" for cache in result["warming_results"].values())
mock_function.assert_called_once_with(cache_types=cache_types)
def test_warm_caches_all_types(self):
"""Test warming all cache types."""
expected_result = {
"status": "success",
"warmed_caches": ["pricing", "metadata", "analysis_results"],
"warming_results": {
"pricing": {"status": "success", "entries_loaded": 1250},
"metadata": {"status": "success", "entries_loaded": 890},
"analysis_results": {"status": "success", "entries_loaded": 156}
},
"total_warming_time_seconds": 5.8
}
# Mock the function behavior
mock_function = Mock(return_value=expected_result)
# Test the function call
result = mock_function() # No cache_types specified = all
assert result["status"] == "success"
assert len(result["warmed_caches"]) == 3
mock_function.assert_called_once()
def test_clear_caches(self):
"""Test cache clearing functionality."""
expected_result = {
"status": "success",
"cleared_caches": ["pricing", "metadata", "analysis_results"],
"clearing_results": {
"pricing": {"status": "success", "entries_cleared": 1250},
"metadata": {"status": "success", "entries_cleared": 890},
"analysis_results": {"status": "success", "entries_cleared": 156}
},
"memory_freed_mb": 89.3,
"total_clearing_time_seconds": 0.8
}
# Mock the function behavior
mock_function = Mock(return_value=expected_result)
# Test the function call
result = mock_function()
assert result["status"] == "success"
assert len(result["cleared_caches"]) == 3
assert result["memory_freed_mb"] > 0
assert all(cache["status"] == "success" for cache in result["clearing_results"].values())
mock_function.assert_called_once()
def test_clear_caches_error_handling(self):
"""Test cache clearing error handling."""
expected_result = {
"status": "partial_success",
"cleared_caches": ["pricing", "metadata"],
"clearing_results": {
"pricing": {"status": "success", "entries_cleared": 1250},
"metadata": {"status": "success", "entries_cleared": 890},
"analysis_results": {"status": "error", "error": "Cache locked by active analysis"}
},
"warnings": ["Could not clear analysis_results cache - in use"],
"memory_freed_mb": 45.2
}
# Mock the function behavior
mock_function = Mock(return_value=expected_result)
# Test the function call
result = mock_function()
assert result["status"] == "partial_success"
assert "warnings" in result
assert len(result["cleared_caches"]) == 2
mock_function.assert_called_once()
@pytest.mark.unit
@pytest.mark.cloudwatch
class TestCloudWatchUtilityIntegration:
"""Test integration between utility functions."""
def test_performance_stats_after_cache_operations(self):
"""Test performance statistics after cache operations."""
# Mock cache warming
warm_mock = Mock(return_value={"status": "success"})
warm_result = warm_mock()
assert warm_result["status"] == "success"
# Mock performance stats
stats_mock = Mock(return_value={
"status": "success",
"cache_performance": {"pricing_cache_hit_rate": 95.0}
})
stats_result = stats_mock()
assert stats_result["cache_performance"]["pricing_cache_hit_rate"] == 95.0
def test_cost_estimation_with_validation(self):
"""Test cost estimation combined with preference validation."""
preferences = {"allow_cost_explorer": True, "lookback_days": 30}
# Mock preference validation
validate_mock = Mock(return_value={"status": "success", "preferences": preferences})
validate_result = validate_mock(**preferences)
assert validate_result["status"] == "success"
# Mock cost estimation
estimate_mock = Mock(return_value={"status": "success", "total_estimated_cost": 5.25})
estimate_result = estimate_mock(**preferences)
assert estimate_result["total_estimated_cost"] == 5.25
```
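The tests above all follow the same pattern: build a canned result, wrap it in `Mock`, and assert on the returned fields. A hedged sketch, not part of the suite, of how the cost-estimation cases could be table-driven with `pytest.mark.parametrize`, reusing the figures shown above:

```python
import pytest
from unittest.mock import Mock

COST_CASES = [
    ({"allow_cost_explorer": False, "allow_minimal_cost_metrics": False}, 0.00),
    ({"allow_cost_explorer": False, "allow_minimal_cost_metrics": True}, 1.25),
    ({"allow_cost_explorer": True, "lookback_days": 90}, 15.75),
]

@pytest.mark.parametrize("params,expected_cost", COST_CASES)
def test_cost_estimate_parametrized(params, expected_cost):
    # The estimator is mocked, exactly as in the classes above.
    mock_estimate = Mock(return_value={"status": "success",
                                       "total_estimated_cost": expected_cost})
    result = mock_estimate(**params)
    assert result["status"] == "success"
    assert result["total_estimated_cost"] == expected_cost
    mock_estimate.assert_called_once_with(**params)
```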
--------------------------------------------------------------------------------
/utils/cloudwatch_cache.py:
--------------------------------------------------------------------------------
```python
"""
CloudWatch-specific intelligent cache implementation.
Provides specialized caching for CloudWatch metadata, pricing data, and analysis results
with CloudWatch-specific optimization patterns and cache warming strategies.
"""
import logging
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
from utils.intelligent_cache import IntelligentCache
logger = logging.getLogger(__name__)
class CloudWatchMetadataCache(IntelligentCache):
"""Specialized cache for CloudWatch metadata with optimized TTL and warming strategies."""
def __init__(self):
# CloudWatch metadata changes less frequently than other data
super().__init__(
max_size_mb=30,
max_entries=3000,
default_ttl_seconds=1800, # 30 minutes
cleanup_interval_minutes=10
)
# Register CloudWatch-specific warming functions
self.register_warming_function("alarms_metadata", self._warm_alarms_metadata)
self.register_warming_function("dashboards_metadata", self._warm_dashboards_metadata)
self.register_warming_function("log_groups_metadata", self._warm_log_groups_metadata)
self.register_warming_function("metrics_metadata", self._warm_metrics_metadata)
logger.info("CloudWatchMetadataCache initialized")
def _warm_alarms_metadata(self, cache, region: str = "us-east-1"):
"""Warm cache with common alarm metadata patterns."""
try:
logger.info(f"Warming CloudWatch alarms metadata cache for region: {region}")
# Common alarm metadata patterns
patterns = [
f"alarms_list_{region}",
f"alarm_states_{region}",
f"alarm_actions_{region}",
f"alarm_history_{region}",
f"composite_alarms_{region}"
]
for pattern in patterns:
cache_key = ["cloudwatch_metadata", "alarms", pattern]
self.put(cache_key, {
"metadata_type": "alarms",
"pattern": pattern,
"region": region,
"warmed_at": datetime.now().isoformat(),
"cache_version": "1.0"
}, ttl_seconds=2400) # 40 minutes for metadata
logger.info(f"Warmed {len(patterns)} alarm metadata patterns")
except Exception as e:
logger.error(f"Error warming alarms metadata cache: {str(e)}")
def _warm_dashboards_metadata(self, cache, region: str = "us-east-1"):
"""Warm cache with common dashboard metadata patterns."""
try:
logger.info(f"Warming CloudWatch dashboards metadata cache for region: {region}")
patterns = [
f"dashboards_list_{region}",
f"dashboard_widgets_{region}",
f"dashboard_metrics_{region}",
f"dashboard_permissions_{region}"
]
for pattern in patterns:
cache_key = ["cloudwatch_metadata", "dashboards", pattern]
self.put(cache_key, {
"metadata_type": "dashboards",
"pattern": pattern,
"region": region,
"warmed_at": datetime.now().isoformat(),
"cache_version": "1.0"
}, ttl_seconds=2400)
logger.info(f"Warmed {len(patterns)} dashboard metadata patterns")
except Exception as e:
logger.error(f"Error warming dashboards metadata cache: {str(e)}")
def _warm_log_groups_metadata(self, cache, region: str = "us-east-1"):
"""Warm cache with common log groups metadata patterns."""
try:
logger.info(f"Warming CloudWatch log groups metadata cache for region: {region}")
patterns = [
f"log_groups_list_{region}",
f"log_group_retention_{region}",
f"log_group_sizes_{region}",
f"log_streams_{region}",
f"log_group_metrics_{region}"
]
for pattern in patterns:
cache_key = ["cloudwatch_metadata", "log_groups", pattern]
self.put(cache_key, {
"metadata_type": "log_groups",
"pattern": pattern,
"region": region,
"warmed_at": datetime.now().isoformat(),
"cache_version": "1.0"
}, ttl_seconds=1800) # 30 minutes for log groups (change more frequently)
logger.info(f"Warmed {len(patterns)} log groups metadata patterns")
except Exception as e:
logger.error(f"Error warming log groups metadata cache: {str(e)}")
def _warm_metrics_metadata(self, cache, region: str = "us-east-1"):
"""Warm cache with common metrics metadata patterns."""
try:
logger.info(f"Warming CloudWatch metrics metadata cache for region: {region}")
patterns = [
f"metrics_list_{region}",
f"custom_metrics_{region}",
f"metrics_namespaces_{region}",
f"metrics_dimensions_{region}",
f"metrics_statistics_{region}"
]
for pattern in patterns:
cache_key = ["cloudwatch_metadata", "metrics", pattern]
self.put(cache_key, {
"metadata_type": "metrics",
"pattern": pattern,
"region": region,
"warmed_at": datetime.now().isoformat(),
"cache_version": "1.0"
}, ttl_seconds=3600) # 1 hour for metrics metadata
logger.info(f"Warmed {len(patterns)} metrics metadata patterns")
except Exception as e:
logger.error(f"Error warming metrics metadata cache: {str(e)}")
def get_alarm_metadata(self, region: str, alarm_name: str = None) -> Optional[Dict[str, Any]]:
"""Get cached alarm metadata with intelligent key generation."""
if alarm_name:
cache_key = ["cloudwatch_metadata", "alarms", f"alarm_{alarm_name}_{region}"]
else:
cache_key = ["cloudwatch_metadata", "alarms", f"alarms_list_{region}"]
return self.get(cache_key)
def put_alarm_metadata(self, region: str, data: Dict[str, Any], alarm_name: str = None, ttl_seconds: int = None):
"""Cache alarm metadata with intelligent key generation."""
if alarm_name:
cache_key = ["cloudwatch_metadata", "alarms", f"alarm_{alarm_name}_{region}"]
else:
cache_key = ["cloudwatch_metadata", "alarms", f"alarms_list_{region}"]
self.put(cache_key, data, ttl_seconds=ttl_seconds or 2400,
tags={"type": "alarm_metadata", "region": region})
def get_dashboard_metadata(self, region: str, dashboard_name: str = None) -> Optional[Dict[str, Any]]:
"""Get cached dashboard metadata with intelligent key generation."""
if dashboard_name:
cache_key = ["cloudwatch_metadata", "dashboards", f"dashboard_{dashboard_name}_{region}"]
else:
cache_key = ["cloudwatch_metadata", "dashboards", f"dashboards_list_{region}"]
return self.get(cache_key)
def put_dashboard_metadata(self, region: str, data: Dict[str, Any], dashboard_name: str = None, ttl_seconds: int = None):
"""Cache dashboard metadata with intelligent key generation."""
if dashboard_name:
cache_key = ["cloudwatch_metadata", "dashboards", f"dashboard_{dashboard_name}_{region}"]
else:
cache_key = ["cloudwatch_metadata", "dashboards", f"dashboards_list_{region}"]
self.put(cache_key, data, ttl_seconds=ttl_seconds or 2400,
tags={"type": "dashboard_metadata", "region": region})
def get_log_group_metadata(self, region: str, log_group_name: str = None) -> Optional[Dict[str, Any]]:
"""Get cached log group metadata with intelligent key generation."""
if log_group_name:
cache_key = ["cloudwatch_metadata", "log_groups", f"log_group_{log_group_name}_{region}"]
else:
cache_key = ["cloudwatch_metadata", "log_groups", f"log_groups_list_{region}"]
return self.get(cache_key)
def put_log_group_metadata(self, region: str, data: Dict[str, Any], log_group_name: str = None, ttl_seconds: int = None):
"""Cache log group metadata with intelligent key generation."""
if log_group_name:
cache_key = ["cloudwatch_metadata", "log_groups", f"log_group_{log_group_name}_{region}"]
else:
cache_key = ["cloudwatch_metadata", "log_groups", f"log_groups_list_{region}"]
self.put(cache_key, data, ttl_seconds=ttl_seconds or 1800,
tags={"type": "log_group_metadata", "region": region})
def get_metrics_metadata(self, region: str, namespace: str = None) -> Optional[Dict[str, Any]]:
"""Get cached metrics metadata with intelligent key generation."""
if namespace:
cache_key = ["cloudwatch_metadata", "metrics", f"metrics_{namespace}_{region}"]
else:
cache_key = ["cloudwatch_metadata", "metrics", f"metrics_list_{region}"]
return self.get(cache_key)
def put_metrics_metadata(self, region: str, data: Dict[str, Any], namespace: str = None, ttl_seconds: int = None):
"""Cache metrics metadata with intelligent key generation."""
if namespace:
cache_key = ["cloudwatch_metadata", "metrics", f"metrics_{namespace}_{region}"]
else:
cache_key = ["cloudwatch_metadata", "metrics", f"metrics_list_{region}"]
self.put(cache_key, data, ttl_seconds=ttl_seconds or 3600,
tags={"type": "metrics_metadata", "region": region})
def invalidate_region_metadata(self, region: str) -> int:
"""Invalidate all metadata for a specific region."""
return self.invalidate_by_tags({"region": region})
def invalidate_metadata_type(self, metadata_type: str) -> int:
"""Invalidate all metadata of a specific type."""
return self.invalidate_by_tags({"type": f"{metadata_type}_metadata"})
class CloudWatchAnalysisCache(IntelligentCache):
"""Specialized cache for CloudWatch analysis results with analysis-specific optimizations."""
def __init__(self):
# Analysis results can be large but are valuable to cache longer
super().__init__(
max_size_mb=150,
max_entries=1500,
default_ttl_seconds=3600, # 1 hour
cleanup_interval_minutes=15
)
# Register analysis-specific warming functions
self.register_warming_function("common_analysis_patterns", self._warm_common_patterns)
self.register_warming_function("cost_analysis_templates", self._warm_cost_templates)
logger.info("CloudWatchAnalysisCache initialized")
def _warm_common_patterns(self, cache, region: str = "us-east-1"):
"""Warm cache with common analysis patterns."""
try:
logger.info(f"Warming common CloudWatch analysis patterns for region: {region}")
patterns = [
f"general_spend_pattern_{region}",
f"logs_optimization_pattern_{region}",
f"metrics_optimization_pattern_{region}",
f"alarms_dashboards_pattern_{region}",
f"comprehensive_pattern_{region}"
]
for pattern in patterns:
cache_key = ["cloudwatch_analysis", "patterns", pattern]
self.put(cache_key, {
"pattern_type": "analysis",
"pattern": pattern,
"region": region,
"warmed_at": datetime.now().isoformat(),
"cache_version": "1.0"
}, ttl_seconds=7200) # 2 hours for patterns
logger.info(f"Warmed {len(patterns)} analysis patterns")
except Exception as e:
logger.error(f"Error warming analysis patterns cache: {str(e)}")
def _warm_cost_templates(self, cache, region: str = "us-east-1"):
"""Warm cache with cost analysis templates."""
try:
logger.info(f"Warming cost analysis templates for region: {region}")
templates = [
f"cost_breakdown_template_{region}",
f"savings_opportunities_template_{region}",
f"cost_trends_template_{region}",
f"optimization_recommendations_template_{region}"
]
for template in templates:
cache_key = ["cloudwatch_analysis", "cost_templates", template]
self.put(cache_key, {
"template_type": "cost_analysis",
"template": template,
"region": region,
"warmed_at": datetime.now().isoformat(),
"cache_version": "1.0"
}, ttl_seconds=5400) # 1.5 hours for cost templates
logger.info(f"Warmed {len(templates)} cost analysis templates")
except Exception as e:
logger.error(f"Error warming cost templates cache: {str(e)}")
def get_analysis_result(self, analysis_type: str, region: str, parameters_hash: str) -> Optional[Dict[str, Any]]:
"""Get cached analysis result with intelligent key generation."""
cache_key = ["cloudwatch_analysis", analysis_type, f"{region}_{parameters_hash}"]
return self.get(cache_key)
def put_analysis_result(self, analysis_type: str, region: str, parameters_hash: str,
result: Dict[str, Any], ttl_seconds: int = None):
"""Cache analysis result with intelligent key generation."""
cache_key = ["cloudwatch_analysis", analysis_type, f"{region}_{parameters_hash}"]
# Different TTL based on analysis type
if not ttl_seconds:
ttl_mapping = {
'general_spend': 3600, # 1 hour
'logs_optimization': 1800, # 30 minutes
'metrics_optimization': 1800, # 30 minutes
'alarms_and_dashboards': 2400, # 40 minutes
'comprehensive': 3600 # 1 hour
}
ttl_seconds = ttl_mapping.get(analysis_type, 1800)
self.put(cache_key, result, ttl_seconds=ttl_seconds,
tags={"type": "analysis_result", "analysis_type": analysis_type, "region": region})
def invalidate_analysis_type(self, analysis_type: str) -> int:
"""Invalidate all cached results for a specific analysis type."""
return self.invalidate_by_tags({"analysis_type": analysis_type})
def invalidate_region_analyses(self, region: str) -> int:
"""Invalidate all cached analyses for a specific region."""
return self.invalidate_by_tags({"region": region})
# Global cache instances
_cloudwatch_metadata_cache = None
_cloudwatch_analysis_cache = None
def get_cloudwatch_metadata_cache() -> CloudWatchMetadataCache:
"""Get the global CloudWatch metadata cache instance."""
global _cloudwatch_metadata_cache
if _cloudwatch_metadata_cache is None:
_cloudwatch_metadata_cache = CloudWatchMetadataCache()
return _cloudwatch_metadata_cache
def get_cloudwatch_analysis_cache() -> CloudWatchAnalysisCache:
"""Get the global CloudWatch analysis cache instance."""
global _cloudwatch_analysis_cache
if _cloudwatch_analysis_cache is None:
_cloudwatch_analysis_cache = CloudWatchAnalysisCache()
return _cloudwatch_analysis_cache
```
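A brief usage sketch for the caches defined above, assuming the repository root is importable; the region and payload values are illustrative:

```python
from utils.cloudwatch_cache import get_cloudwatch_metadata_cache

cache = get_cloudwatch_metadata_cache()

# Store a region-wide alarm listing (alarm metadata defaults to a 40-minute TTL).
cache.put_alarm_metadata("us-east-1", {"alarms": ["HighCPU", "DiskFull"]})

# Subsequent lookups are served from the cache until the entry expires or is evicted.
alarms = cache.get_alarm_metadata("us-east-1")

# Tag-based invalidation drops every metadata entry recorded for the region.
cache.invalidate_region_metadata("us-east-1")
```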
--------------------------------------------------------------------------------
/playbooks/cloudtrail/cloudtrail_optimization.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
CloudTrail Optimization Playbook
This playbook checks for multiple management event trails in AWS CloudTrail,
which could represent a cost optimization opportunity: multiple trails capturing
the same management events can lead to unnecessary costs.
Includes both core optimization functions and MCP runbook functions.
"""
import asyncio
import json
import boto3
import logging
import time
from datetime import datetime
from botocore.exceptions import ClientError
from typing import Dict, List, Any, Optional
from mcp.types import TextContent
from utils.error_handler import ResponseFormatter, handle_aws_error
from utils.documentation_links import add_documentation_links
from utils.memory_manager import get_memory_manager
from utils.performance_monitor import get_performance_monitor
# Configure logging
logger = logging.getLogger(__name__)
class CloudTrailOptimization:
"""
CloudTrail optimization playbook to identify cost-saving opportunities
related to duplicate management event trails.
"""
def __init__(self, region=None):
"""
Initialize the CloudTrail optimization playbook.
Args:
region (str, optional): AWS region to analyze. If None, uses the default region.
"""
self.region = region
self.client = boto3.client('cloudtrail', region_name=region) if region else boto3.client('cloudtrail')
# Initialize performance optimization components
self.memory_manager = get_memory_manager()
self.performance_monitor = get_performance_monitor()
def analyze_trails(self):
"""
Analyze CloudTrail trails to identify multiple management event trails.
Returns:
dict: Analysis results including optimization recommendations.
"""
# Start memory tracking
memory_tracker = self.memory_manager.start_memory_tracking("cloudtrail_analysis")
try:
# Get all trails using pagination
trails = []
next_token = None
while True:
# Prepare pagination parameters
params = {}
if next_token:
params['NextToken'] = next_token
# Make the API call
response = self.client.list_trails(**params)
trails.extend(response.get('Trails', []))
# Check if there are more results
if 'NextToken' in response:
next_token = response['NextToken']
else:
break
# Get detailed information for each trail
management_event_trails = []
for trail in trails:
trail_arn = trail.get('TrailARN')
trail_name = trail.get('Name')
# Get trail status and configuration
trail_info = self.client.get_trail(Name=trail_arn)
trail_status = self.client.get_trail_status(Name=trail_arn)
# Check if the trail is logging management events
event_selectors = self.client.get_event_selectors(TrailName=trail_arn)
has_management_events = False
for selector in event_selectors.get('EventSelectors', []):
# Only include if management events are explicitly enabled
if selector.get('IncludeManagementEvents') is True:
has_management_events = True
break
# Only include trails that actually have management events enabled
if has_management_events:
management_event_trails.append({
'name': trail_name,
'arn': trail_arn,
'is_multi_region': trail_info.get('Trail', {}).get('IsMultiRegionTrail', False),
'is_organization': trail_info.get('Trail', {}).get('IsOrganizationTrail', False),
'logging_enabled': trail_status.get('IsLogging', False),
'region': self.region or 'default'
})
# Analyze results
result = {
'status': 'success',
'analysis_type': 'CloudTrail Optimization',
'timestamp': datetime.now().isoformat(),
'region': self.region or 'default',
'data': {
                    'total_trails': len(trails),
'management_event_trails': len(management_event_trails),
'trails_details': management_event_trails
},
'recommendations': []
}
# Generate recommendations based on findings
if len(management_event_trails) > 1:
# Multiple management event trails found - potential optimization opportunity
estimated_savings = (len(management_event_trails) - 1) * 2 # $2 per trail per month after the first one
result['message'] = f"Found {len(management_event_trails)} trails capturing management events. Consider consolidation."
result['recommendations'] = [
"Consolidate multiple management event trails into a single trail to reduce costs",
f"Potential monthly savings: ${estimated_savings:.2f}",
"Ensure the consolidated trail captures all required events and regions",
"Consider using CloudTrail Lake for more cost-effective querying of events"
]
result['optimization_opportunity'] = True
result['estimated_monthly_savings'] = estimated_savings
else:
result['message'] = "No duplicate management event trails found."
result['optimization_opportunity'] = False
result['estimated_monthly_savings'] = 0
# Stop memory tracking and add stats to result
memory_stats = self.memory_manager.stop_memory_tracking("cloudtrail_analysis")
if memory_stats:
result['memory_usage'] = memory_stats
return result
except ClientError as e:
logger.error(f"Error analyzing CloudTrail trails: {e}")
self.memory_manager.stop_memory_tracking("cloudtrail_analysis")
return {
'status': 'error',
'message': f"Failed to analyze CloudTrail trails: {str(e)}",
'error': str(e)
}
def generate_report(self, format='json'):
"""
Generate a CloudTrail optimization report showing only trails with management events.
Args:
format (str): Output format ('json' or 'markdown')
Returns:
dict or str: Report in the specified format
"""
analysis_result = self.analyze_trails()
if format.lower() == 'markdown':
# Generate markdown report
md_report = f"# CloudTrail Optimization Report - Management Events Only\n\n"
md_report += f"**Region**: {analysis_result.get('region', 'All regions')}\n"
md_report += f"**Analysis Date**: {analysis_result.get('timestamp')}\n\n"
# Only show trails with management events enabled
management_trails = analysis_result.get('data', {}).get('trails_details', [])
md_report += f"## Summary\n"
md_report += f"- Trails with management events enabled: {len(management_trails)}\n"
if analysis_result.get('optimization_opportunity', False):
md_report += f"- Optimization opportunity: **YES**\n"
md_report += f"- Estimated monthly savings: **${analysis_result.get('estimated_monthly_savings', 0):.2f}**\n"
else:
md_report += f"- Optimization opportunity: No\n"
if management_trails:
md_report += f"\n## Management Event Trails ({len(management_trails)})\n"
for trail in management_trails:
md_report += f"\n### {trail.get('name')}\n"
md_report += f"- ARN: {trail.get('arn')}\n"
md_report += f"- Multi-region: {'Yes' if trail.get('is_multi_region') else 'No'}\n"
md_report += f"- Organization trail: {'Yes' if trail.get('is_organization') else 'No'}\n"
md_report += f"- Logging enabled: {'Yes' if trail.get('logging_enabled') else 'No'}\n"
if len(management_trails) > 1:
md_report += f"\n## Recommendations\n"
for rec in analysis_result.get('recommendations', []):
md_report += f"- {rec}\n"
else:
md_report += f"\n## Management Event Trails\nNo trails with management events enabled found.\n"
return md_report
else:
# Return JSON format with only management event trails
            filtered_result = analysis_result.copy()
            filtered_result.setdefault('data', {})['trails_shown'] = 'management_events_only'
return filtered_result
def run_cloudtrail_optimization(region=None):
"""
Run the CloudTrail optimization playbook.
Args:
region (str, optional): AWS region to analyze
Returns:
dict: Analysis results
"""
optimizer = CloudTrailOptimization(region=region)
return optimizer.analyze_trails()
def generate_cloudtrail_report(region=None, format='json'):
"""
Generate a CloudTrail optimization report.
Args:
region (str, optional): AWS region to analyze
format (str): Output format ('json' or 'markdown')
Returns:
dict or str: Report in the specified format
"""
optimizer = CloudTrailOptimization(region=region)
return optimizer.generate_report(format=format)
def get_management_trails(region=None):
"""
Get CloudTrail trails that have management events enabled.
Args:
region (str, optional): AWS region to analyze
Returns:
list: List of trails with management events enabled
"""
try:
client = boto3.client('cloudtrail', region_name=region) if region else boto3.client('cloudtrail')
# Get all trails using pagination
trails = []
next_token = None
while True:
# Prepare pagination parameters
params = {}
if next_token:
params['NextToken'] = next_token
# Make the API call
response = client.list_trails(**params)
trails.extend(response.get('Trails', []))
# Check if there are more results
if 'NextToken' in response:
next_token = response['NextToken']
else:
break
management_trails = []
for trail in trails:
trail_arn = trail.get('TrailARN')
trail_name = trail.get('Name')
try:
# Get trail configuration
trail_info = client.get_trail(Name=trail_arn)
# Check event selectors to see if management events are enabled
event_selectors = client.get_event_selectors(TrailName=trail_arn)
has_management_events = False
for selector in event_selectors.get('EventSelectors', []):
# Check if this selector explicitly includes management events
if selector.get('IncludeManagementEvents') is True:
has_management_events = True
break
# Only include trails that actually have management events enabled
if has_management_events:
management_trails.append({
'name': trail_name,
'arn': trail_arn,
'region': region or trail.get('HomeRegion', 'us-east-1'),
'is_multi_region': trail_info.get('Trail', {}).get('IsMultiRegionTrail', False),
'is_organization_trail': trail_info.get('Trail', {}).get('IsOrganizationTrail', False)
})
except ClientError as e:
logger.warning(f"Could not get details for trail {trail_name}: {e}")
continue
        logger.debug(f"Found {len(management_trails)} trails with management events enabled")
return management_trails
except ClientError as e:
logger.error(f"Error getting management trails: {e}")
return []
if __name__ == "__main__":
# Run the playbook directly if executed as a script
result = run_cloudtrail_optimization()
print(result)
# MCP Runbook Functions
# These functions provide MCP-compatible interfaces for the CloudTrail optimization playbook
@handle_aws_error
async def get_management_trails_mcp(arguments: Dict[str, Any]) -> List[TextContent]:
"""Get CloudTrail management trails."""
start_time = time.time()
try:
region = arguments.get("region", "us-east-1")
        trails = get_management_trails(region=region)
        result = {"management_trails": trails}
        # Add documentation links
        result = add_documentation_links(result, "cloudtrail")
        execution_time = time.time() - start_time
        return ResponseFormatter.to_text_content(
            ResponseFormatter.success_response(
                data=result,
                message=f"Retrieved {len(trails)} CloudTrail management trails",
analysis_type="cloudtrail_management_trails",
execution_time=execution_time
)
)
except Exception as e:
logger.error(f"Error getting CloudTrail management trails: {str(e)}")
raise
@handle_aws_error
async def run_cloudtrail_trails_analysis_mcp(arguments: Dict[str, Any]) -> List[TextContent]:
"""Run CloudTrail trails analysis."""
start_time = time.time()
try:
region = arguments.get("region", "us-east-1")
result = run_cloudtrail_optimization(region=region)
# Add documentation links
result = add_documentation_links(result, "cloudtrail")
execution_time = time.time() - start_time
return ResponseFormatter.to_text_content(
ResponseFormatter.success_response(
data=result,
message="CloudTrail trails analysis completed successfully",
analysis_type="cloudtrail_analysis",
execution_time=execution_time
)
)
except Exception as e:
logger.error(f"Error in CloudTrail trails analysis: {str(e)}")
raise
@handle_aws_error
async def generate_cloudtrail_report_mcp(arguments: Dict[str, Any]) -> List[TextContent]:
"""Generate CloudTrail optimization report."""
start_time = time.time()
try:
region = arguments.get("region", "us-east-1")
output_format = arguments.get("output_format", "json")
result = generate_cloudtrail_report(region=region, format=output_format)
# Add documentation links
result = add_documentation_links(result, "cloudtrail")
execution_time = time.time() - start_time
return ResponseFormatter.to_text_content(
ResponseFormatter.success_response(
data=result,
message="CloudTrail optimization report generated successfully",
analysis_type="cloudtrail_report",
execution_time=execution_time
)
)
except Exception as e:
logger.error(f"Error generating CloudTrail report: {str(e)}")
raise
```
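A minimal usage sketch for this playbook, assuming AWS credentials with the `cloudtrail:ListTrails`, `cloudtrail:GetTrail`, `cloudtrail:GetTrailStatus`, and `cloudtrail:GetEventSelectors` permissions; the region and the import path (the `playbooks/cloudtrail` package variant) are illustrative:

```python
import asyncio

from playbooks.cloudtrail.cloudtrail_optimization import (
    generate_cloudtrail_report,
    run_cloudtrail_optimization,
    run_cloudtrail_trails_analysis_mcp,
)

# Direct invocation: returns a dict with findings and recommendations.
analysis = run_cloudtrail_optimization(region="us-east-1")
print(analysis.get("message"))

# Markdown report suitable for sharing.
report_md = generate_cloudtrail_report(region="us-east-1", format="markdown")

# MCP-style invocation: async wrapper that returns TextContent items.
mcp_response = asyncio.run(run_cloudtrail_trails_analysis_mcp({"region": "us-east-1"}))
```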
--------------------------------------------------------------------------------
/utils/service_orchestrator.py:
--------------------------------------------------------------------------------
```python
"""
Service Orchestrator for CFM Tips MCP Server
Coordinates parallel execution of AWS service calls with session storage.
"""
import logging
import time
from typing import Dict, List, Any, Optional, Callable
from datetime import datetime
from .session_manager import get_session_manager
from .parallel_executor import get_parallel_executor, create_task, TaskResult
logger = logging.getLogger(__name__)
class ServiceOrchestrator:
"""Orchestrates AWS service calls with parallel execution and session storage."""
def __init__(self, session_id: Optional[str] = None):
self.session_manager = get_session_manager()
self.parallel_executor = get_parallel_executor()
self.session_id = session_id or self.session_manager.create_session()
logger.info(f"ServiceOrchestrator initialized with session {self.session_id}")
def execute_parallel_analysis(self,
service_calls: List[Dict[str, Any]],
store_results: bool = True,
timeout: float = 60.0) -> Dict[str, Any]:
"""
Execute multiple AWS service calls in parallel and optionally store results.
Args:
service_calls: List of service call definitions
store_results: Whether to store results in session database
timeout: Maximum time to wait for all tasks
Returns:
Dictionary containing execution results and summary
"""
start_time = time.time()
# Create parallel tasks
tasks = []
for i, call_def in enumerate(service_calls):
task_id = f"{call_def['service']}_{call_def['operation']}_{i}_{int(time.time())}"
task = create_task(
task_id=task_id,
service=call_def['service'],
operation=call_def['operation'],
function=call_def['function'],
args=call_def.get('args', ()),
kwargs=call_def.get('kwargs', {}),
timeout=call_def.get('timeout', 30.0),
priority=call_def.get('priority', 1)
)
tasks.append(task)
# Submit tasks for parallel execution
task_ids = self.parallel_executor.submit_batch(tasks)
# Wait for completion
results = self.parallel_executor.wait_for_tasks(task_ids, timeout)
# Process results
execution_summary = {
'total_tasks': len(task_ids),
'successful': 0,
'failed': 0,
'timeout': 0,
'total_execution_time': time.time() - start_time,
'results': {}
}
stored_tables = []
for task_id, result in results.items():
execution_summary['results'][task_id] = {
'service': result.service,
'operation': result.operation,
'status': result.status,
'execution_time': result.execution_time,
'error': result.error
}
# Update counters
if result.status == 'success':
execution_summary['successful'] += 1
# Store successful results in session database
if store_results and result.data:
# Create a safe table name by removing special characters
safe_service = ''.join(c for c in result.service if c.isalnum() or c == '_')
safe_operation = ''.join(c for c in result.operation if c.isalnum() or c == '_')
table_name = f"{safe_service}_{safe_operation}_{int(result.timestamp.timestamp())}"
# Convert result data to list of dictionaries if needed
data_to_store = self._prepare_data_for_storage(result.data, result.service, result.operation)
if data_to_store:
success = self.session_manager.store_data(
self.session_id,
table_name,
data_to_store
)
if success:
stored_tables.append(table_name)
execution_summary['results'][task_id]['stored_table'] = table_name
else:
logger.warning(f"Failed to store data for task {task_id}")
elif result.status == 'timeout':
execution_summary['timeout'] += 1
else:
execution_summary['failed'] += 1
execution_summary['stored_tables'] = stored_tables
logger.info(f"Parallel analysis completed: {execution_summary['successful']}/{execution_summary['total_tasks']} successful")
return execution_summary
def _prepare_data_for_storage(self, data: Any, service: str, operation: str) -> List[Dict[str, Any]]:
"""Prepare data for storage in session database with consistent 'value' column format."""
try:
import json
# Always store data in a consistent format with 'value' column containing JSON
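            # Example (illustrative): {"data": {"idle_instances": [{"InstanceId": "i-1"}]}} from the "ec2" service
            # would be stored as one row per instance, e.g.
            # {"value": "{\"InstanceId\": \"i-1\"}", "service": "ec2", "operation": "...", "item_type": "idle_instances"}.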
if isinstance(data, dict):
if 'data' in data and isinstance(data['data'], dict):
# Check if data contains lists of items (like underutilized_instances)
data_dict = data['data']
result = []
# Look for common list fields that should be stored as individual records
list_fields = ['underutilized_instances', 'unused_volumes', 'idle_instances', 'underutilized_functions']
for field in list_fields:
if field in data_dict and isinstance(data_dict[field], list):
# Store each item in the list as a separate record
for item in data_dict[field]:
result.append({
'value': json.dumps(item),
'service': service,
'operation': operation,
'item_type': field
})
break # Only process the first matching field
# If no list fields found, store the entire data dict
if not result:
result.append({
'value': json.dumps(data_dict),
'service': service,
'operation': operation
})
return result
elif 'data' in data and isinstance(data['data'], list):
# Standard service response format - store each item as JSON in value column
result = []
for item in data['data']:
result.append({
'value': json.dumps(item),
'service': service,
'operation': operation
})
return result
else:
# Store entire dictionary as JSON
return [{
'value': json.dumps(data),
'service': service,
'operation': operation
}]
elif isinstance(data, list):
# Store each list item as JSON in value column
result = []
for i, item in enumerate(data):
result.append({
'value': json.dumps(item),
'service': service,
'operation': operation,
'index': i
})
return result
else:
# Single value - store as JSON
return [{
'value': json.dumps(data),
'service': service,
'operation': operation
}]
except Exception as e:
logger.error(f"Error preparing data for storage: {e}")
return []
def query_session_data(self, query: str, params: Optional[tuple] = None) -> List[Dict[str, Any]]:
"""Execute SQL query on session data."""
try:
return self.session_manager.execute_query(self.session_id, query, params)
except Exception as e:
logger.error(f"Error executing session query: {e}")
return []
def get_stored_tables(self) -> List[str]:
"""Get list of tables stored in the session."""
try:
results = self.query_session_data(
"SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'"
)
return [row['name'] for row in results]
except Exception as e:
logger.error(f"Error getting stored tables: {e}")
return []
def _fix_wildcard_query(self, query_sql: str) -> str:
"""Fix wildcard table names in SQL queries by replacing with actual table names."""
try:
# Get all table names in the session
stored_tables = self.get_stored_tables()
# Common wildcard patterns to replace
wildcard_patterns = [
'ec2_underutilized_instances_*',
'ebs_underutilized_volumes_*',
'ebs_unused_volumes_*',
'rds_underutilized_instances_*',
'rds_idle_instances_*',
'lambda_underutilized_functions_*',
'lambda_unused_functions_*'
]
fixed_query = query_sql
for pattern in wildcard_patterns:
if pattern in fixed_query:
# Find matching table names
prefix = pattern.replace('_*', '_')
matching_tables = [table for table in stored_tables if table.startswith(prefix)]
if matching_tables:
# Use the first matching table (most recent)
actual_table = matching_tables[0]
fixed_query = fixed_query.replace(pattern, f'"{actual_table}"')
logger.debug(f"Replaced {pattern} with {actual_table}")
else:
# If no matching table, create a dummy query that returns empty results
logger.warning(f"No matching table found for pattern {pattern}")
# Replace the entire FROM clause with a dummy table
fixed_query = fixed_query.replace(
f"FROM {pattern}",
"FROM (SELECT NULL as value WHERE 1=0)"
)
return fixed_query
except Exception as e:
logger.error(f"Error fixing wildcard query: {e}")
return query_sql
    def aggregate_results(self, aggregation_queries: List[Dict[str, str]]) -> Dict[str, Any]:
"""Execute aggregation queries across stored data."""
aggregated_results = {}
for query_def in aggregation_queries:
query_name = query_def['name']
query_sql = query_def['query']
try:
# Fix wildcard patterns in the query
fixed_query = self._fix_wildcard_query(query_sql)
results = self.query_session_data(fixed_query)
aggregated_results[query_name] = {
'status': 'success',
'data': results,
'row_count': len(results)
}
except Exception as e:
aggregated_results[query_name] = {
'status': 'error',
'error': str(e),
'data': []
}
logger.error(f"Error executing aggregation query '{query_name}': {e}")
return aggregated_results
def create_comprehensive_report(self,
service_calls: List[Dict[str, Any]],
aggregation_queries: Optional[List[Dict[str, str]]] = None) -> Dict[str, Any]:
"""Create a comprehensive report with parallel execution and aggregation."""
# Execute parallel analysis
execution_results = self.execute_parallel_analysis(service_calls)
# Execute aggregation queries if provided
aggregated_data = {}
if aggregation_queries:
aggregated_data = self.aggregate_results(aggregation_queries)
# Create comprehensive report
report = {
'report_metadata': {
'session_id': self.session_id,
'generated_at': datetime.now().isoformat(),
'execution_summary': execution_results,
'stored_tables': self.get_stored_tables()
},
'service_results': execution_results['results'],
'aggregated_analysis': aggregated_data,
'recommendations': self._generate_recommendations(execution_results, aggregated_data)
}
return report
def _generate_recommendations(self,
execution_results: Dict[str, Any],
aggregated_data: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Generate recommendations based on analysis results."""
recommendations = []
# Analyze execution results for patterns
successful_services = []
failed_services = []
for task_id, result in execution_results['results'].items():
if result['status'] == 'success':
successful_services.append(result['service'])
else:
failed_services.append(result['service'])
# Generate recommendations based on successful analyses
if successful_services:
recommendations.append({
'type': 'analysis_success',
'priority': 'info',
'title': 'Successful Service Analysis',
'description': f"Successfully analyzed {len(set(successful_services))} AWS services",
'services': list(set(successful_services))
})
# Generate recommendations for failed analyses
if failed_services:
recommendations.append({
'type': 'analysis_failure',
'priority': 'warning',
'title': 'Failed Service Analysis',
'description': f"Failed to analyze {len(set(failed_services))} AWS services",
'services': list(set(failed_services)),
'action': 'Review service permissions and configuration'
})
# Add aggregation-based recommendations
for query_name, query_result in aggregated_data.items():
if query_result['status'] == 'success' and query_result['row_count'] > 0:
recommendations.append({
'type': 'data_insight',
'priority': 'medium',
'title': f'Data Available: {query_name}',
'description': f"Found {query_result['row_count']} records for analysis",
'query': query_name
})
return recommendations
def cleanup_session(self):
"""Clean up the current session."""
try:
self.session_manager.close_session(self.session_id)
logger.info(f"Cleaned up session {self.session_id}")
except Exception as e:
logger.error(f"Error cleaning up session {self.session_id}: {e}")
def get_session_info(self) -> Dict[str, Any]:
"""Get information about the current session."""
return self.session_manager.get_session_info(self.session_id)
```
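A minimal sketch of how `execute_parallel_analysis` might be driven, inferred from the keys it reads from each service-call definition (`service`, `operation`, `function`, and the optional `args`, `kwargs`, `timeout`, `priority`); `list_idle_instances` is a hypothetical stand-in for a real collector function:

```python
from utils.service_orchestrator import ServiceOrchestrator

def list_idle_instances(region: str):
    # Hypothetical collector; a real implementation would query EC2/CloudWatch.
    return {"data": {"idle_instances": [{"InstanceId": "i-0123456789abcdef0"}]}}

orchestrator = ServiceOrchestrator()
summary = orchestrator.execute_parallel_analysis(
    service_calls=[
        {
            "service": "ec2",
            "operation": "idle_instances",
            "function": list_idle_instances,
            "kwargs": {"region": "us-east-1"},
            "timeout": 30.0,
            "priority": 1,
        }
    ],
    store_results=True,
    timeout=60.0,
)
print(summary["successful"], summary["stored_tables"])
orchestrator.cleanup_session()
```

Each successful result is written to a per-session table whose rows carry the JSON payload in the `value` column, which is what the aggregation queries later read.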
--------------------------------------------------------------------------------
/playbooks/s3/base_analyzer.py:
--------------------------------------------------------------------------------
```python
"""
Base Analyzer Interface for S3 Optimization
Abstract base class defining the interface for all S3 optimization analyzers.
"""
import logging
from abc import ABC, abstractmethod
from typing import Dict, List, Any, Optional
from datetime import datetime
logger = logging.getLogger(__name__)
class BaseAnalyzer(ABC):
"""Base class for all S3 analyzers."""
def __init__(self, s3_service=None, pricing_service=None, storage_lens_service=None,
performance_monitor=None, memory_manager=None):
"""
Initialize BaseAnalyzer with performance optimization components.
Args:
s3_service: S3Service instance for AWS S3 operations
pricing_service: S3Pricing instance for cost calculations
storage_lens_service: StorageLensService instance for Storage Lens data
performance_monitor: Performance monitoring instance
memory_manager: Memory management instance
"""
self.s3_service = s3_service
self.pricing_service = pricing_service
self.storage_lens_service = storage_lens_service
self.performance_monitor = performance_monitor
self.memory_manager = memory_manager
self.logger = logging.getLogger(self.__class__.__name__)
# Analysis metadata
self.analysis_type = self.__class__.__name__.replace('Analyzer', '').lower()
self.version = "1.0.0"
self.last_execution = None
self.execution_count = 0
@abstractmethod
async def analyze(self, **kwargs) -> Dict[str, Any]:
"""
Execute the analysis.
Args:
**kwargs: Analysis-specific parameters
Returns:
Dictionary containing analysis results
"""
pass
@abstractmethod
def get_recommendations(self, analysis_results: Dict[str, Any]) -> List[Dict[str, Any]]:
"""
Generate recommendations from analysis results.
Args:
analysis_results: Results from the analyze method
Returns:
List of recommendation dictionaries
"""
pass
def validate_parameters(self, **kwargs) -> Dict[str, Any]:
"""
Validate input parameters for the analysis.
Args:
**kwargs: Parameters to validate
Returns:
Dictionary with validation results
"""
validation_result = {
"valid": True,
"errors": [],
"warnings": []
}
# Common parameter validation
region = kwargs.get('region')
if region and not isinstance(region, str):
validation_result["errors"].append("Region must be a string")
validation_result["valid"] = False
lookback_days = kwargs.get('lookback_days', 30)
if not isinstance(lookback_days, int) or lookback_days <= 0:
validation_result["errors"].append("lookback_days must be a positive integer")
validation_result["valid"] = False
elif lookback_days > 365:
validation_result["warnings"].append("lookback_days > 365 may result in large datasets")
bucket_names = kwargs.get('bucket_names')
if bucket_names is not None:
if not isinstance(bucket_names, list):
validation_result["errors"].append("bucket_names must be a list")
validation_result["valid"] = False
elif not all(isinstance(name, str) for name in bucket_names):
validation_result["errors"].append("All bucket names must be strings")
validation_result["valid"] = False
timeout_seconds = kwargs.get('timeout_seconds', 60)
if not isinstance(timeout_seconds, (int, float)) or timeout_seconds <= 0:
validation_result["errors"].append("timeout_seconds must be a positive number")
validation_result["valid"] = False
return validation_result
def prepare_analysis_context(self, **kwargs) -> Dict[str, Any]:
"""
Prepare analysis context with common parameters.
Args:
**kwargs: Analysis parameters
Returns:
Dictionary containing analysis context
"""
context = {
"analysis_type": self.analysis_type,
"analyzer_version": self.version,
"region": kwargs.get('region'),
"session_id": kwargs.get('session_id'),
"lookback_days": kwargs.get('lookback_days', 30),
"include_cost_analysis": kwargs.get('include_cost_analysis', True),
"bucket_names": kwargs.get('bucket_names'),
"timeout_seconds": kwargs.get('timeout_seconds', 60),
"started_at": datetime.now().isoformat(),
"execution_id": f"{self.analysis_type}_{int(datetime.now().timestamp())}"
}
return context
def handle_analysis_error(self, error: Exception, context: Dict[str, Any]) -> Dict[str, Any]:
"""
Handle analysis errors with consistent error reporting.
Args:
error: Exception that occurred
context: Analysis context
Returns:
Dictionary containing error information
"""
error_message = str(error)
self.logger.error(f"Analysis error in {self.analysis_type}: {error_message}")
return {
"status": "error",
"analysis_type": self.analysis_type,
"error_message": error_message,
"error_type": error.__class__.__name__,
"context": context,
"timestamp": datetime.now().isoformat(),
"recommendations": [{
"type": "error_resolution",
"priority": "high",
"title": f"Analysis Error: {self.analysis_type}",
"description": f"Analysis failed with error: {error_message}",
"action_items": [
"Check AWS credentials and permissions",
"Verify network connectivity",
"Review input parameters",
"Check service quotas and limits"
]
}]
}
def create_recommendation(self,
rec_type: str,
priority: str,
title: str,
description: str,
potential_savings: Optional[float] = None,
implementation_effort: str = "medium",
affected_resources: Optional[List[str]] = None,
action_items: Optional[List[str]] = None) -> Dict[str, Any]:
"""
Create a standardized recommendation dictionary.
Args:
rec_type: Type of recommendation
priority: Priority level (high, medium, low)
title: Recommendation title
description: Detailed description
potential_savings: Estimated cost savings
implementation_effort: Implementation effort level
affected_resources: List of affected resources
action_items: List of action items
Returns:
Standardized recommendation dictionary
"""
recommendation = {
"type": rec_type,
"priority": priority,
"title": title,
"description": description,
"implementation_effort": implementation_effort,
"analyzer": self.analysis_type,
"created_at": datetime.now().isoformat()
}
if potential_savings is not None:
recommendation["potential_savings"] = potential_savings
recommendation["potential_savings_formatted"] = f"${potential_savings:.2f}"
if affected_resources:
recommendation["affected_resources"] = affected_resources
recommendation["resource_count"] = len(affected_resources)
if action_items:
recommendation["action_items"] = action_items
return recommendation
def log_analysis_start(self, context: Dict[str, Any]):
"""Log analysis start with context and performance monitoring."""
self.execution_count += 1
self.last_execution = datetime.now()
self.logger.info(f"Starting {self.analysis_type} analysis (execution #{self.execution_count})")
self.logger.debug(f"Analysis context: {context}")
# Record performance metrics if available
if self.performance_monitor:
self.performance_monitor.record_metric(
f"analyzer_{self.analysis_type}_started",
1,
tags={"analyzer": self.analysis_type, "execution": str(self.execution_count)}
)
def log_analysis_complete(self, context: Dict[str, Any], result: Dict[str, Any]):
"""Log analysis completion with results summary and performance metrics."""
status = result.get('status', 'unknown')
execution_time = result.get('execution_time', 0)
recommendation_count = len(result.get('recommendations', []))
self.logger.info(
f"Completed {self.analysis_type} analysis - "
f"Status: {status}, Time: {execution_time:.2f}s, "
f"Recommendations: {recommendation_count}"
)
# Record performance metrics if available
if self.performance_monitor:
self.performance_monitor.record_metric(
f"analyzer_{self.analysis_type}_completed",
1,
tags={
"analyzer": self.analysis_type,
"status": status,
"execution": str(self.execution_count)
}
)
self.performance_monitor.record_metric(
f"analyzer_{self.analysis_type}_execution_time",
execution_time,
tags={"analyzer": self.analysis_type}
)
self.performance_monitor.record_metric(
f"analyzer_{self.analysis_type}_recommendations",
recommendation_count,
tags={"analyzer": self.analysis_type}
)
def get_analyzer_info(self) -> Dict[str, Any]:
"""Get information about this analyzer."""
return {
"analysis_type": self.analysis_type,
"class_name": self.__class__.__name__,
"version": self.version,
"execution_count": self.execution_count,
"last_execution": self.last_execution.isoformat() if self.last_execution else None,
"services": {
"s3_service": self.s3_service is not None,
"pricing_service": self.pricing_service is not None,
"storage_lens_service": self.storage_lens_service is not None
}
}
async def execute_with_error_handling(self, **kwargs) -> Dict[str, Any]:
"""
Execute analysis with comprehensive error handling and performance monitoring.
Args:
**kwargs: Analysis parameters
Returns:
Dictionary containing analysis results or error information
"""
context = self.prepare_analysis_context(**kwargs)
# Start memory tracking if memory manager is available
memory_tracker_name = None
if self.memory_manager:
memory_tracker_name = f"analyzer_{self.analysis_type}_{int(datetime.now().timestamp())}"
self.memory_manager.start_memory_tracking(memory_tracker_name)
try:
# Validate parameters
validation = self.validate_parameters(**kwargs)
if not validation["valid"]:
error_result = {
"status": "error",
"analysis_type": self.analysis_type,
"error_message": "Parameter validation failed",
"validation_errors": validation["errors"],
"context": context,
"timestamp": datetime.now().isoformat()
}
# Stop memory tracking
if memory_tracker_name and self.memory_manager:
memory_stats = self.memory_manager.stop_memory_tracking(memory_tracker_name)
if memory_stats:
error_result["memory_usage"] = memory_stats
return error_result
# Log warnings if any
for warning in validation.get("warnings", []):
self.logger.warning(f"Parameter warning: {warning}")
# Log analysis start
self.log_analysis_start(context)
# Register large object for memory management if available
if self.memory_manager:
try:
self.memory_manager.register_large_object(
f"analysis_context_{self.analysis_type}_{int(datetime.now().timestamp())}",
context,
size_mb=0.1, # Small object
cleanup_callback=lambda: self.logger.debug(f"Cleaned up {self.analysis_type} analysis context")
)
except Exception as e:
self.logger.warning(f"Could not register large object with memory manager: {str(e)}")
# Execute analysis
result = await self.analyze(**kwargs)
# Ensure result has required fields
if "status" not in result:
result["status"] = "success"
if "analysis_type" not in result:
result["analysis_type"] = self.analysis_type
if "timestamp" not in result:
result["timestamp"] = datetime.now().isoformat()
# Generate recommendations if not present
if "recommendations" not in result:
result["recommendations"] = self.get_recommendations(result)
# Add memory usage statistics
if memory_tracker_name and self.memory_manager:
memory_stats = self.memory_manager.stop_memory_tracking(memory_tracker_name)
if memory_stats:
result["memory_usage"] = memory_stats
# Log completion
self.log_analysis_complete(context, result)
return result
except Exception as e:
            # Stop memory tracking on error and attach stats if available
            memory_stats = None
            if memory_tracker_name and self.memory_manager:
                memory_stats = self.memory_manager.stop_memory_tracking(memory_tracker_name)
            error_result = self.handle_analysis_error(e, context)
            if memory_stats:
                error_result["memory_usage"] = memory_stats
return error_result
class AnalyzerRegistry:
"""Registry for managing analyzer instances."""
def __init__(self):
self._analyzers: Dict[str, BaseAnalyzer] = {}
self.logger = logging.getLogger(__name__)
def register(self, analyzer: BaseAnalyzer):
"""Register an analyzer instance."""
analysis_type = analyzer.analysis_type
self._analyzers[analysis_type] = analyzer
self.logger.info(f"Registered analyzer: {analysis_type}")
def get(self, analysis_type: str) -> Optional[BaseAnalyzer]:
"""Get an analyzer by type."""
return self._analyzers.get(analysis_type)
def list_analyzers(self) -> List[str]:
"""List all registered analyzer types."""
return list(self._analyzers.keys())
def get_analyzer_info(self) -> Dict[str, Any]:
"""Get information about all registered analyzers."""
return {
analysis_type: analyzer.get_analyzer_info()
for analysis_type, analyzer in self._analyzers.items()
}
# Global analyzer registry
_analyzer_registry = AnalyzerRegistry()
def get_analyzer_registry() -> AnalyzerRegistry:
"""Get the global analyzer registry."""
return _analyzer_registry
```
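A sketch of a concrete analyzer built on this interface, assuming an `S3Service`-like object whose `list_buckets()` coroutine returns the `{"status": ..., "data": {"Buckets": [...]}}` shape used elsewhere in this repository; the class name and thresholds are illustrative only:

```python
from typing import Any, Dict, List

from playbooks.s3.base_analyzer import BaseAnalyzer, get_analyzer_registry

class BucketInventoryAnalyzer(BaseAnalyzer):
    """Toy analyzer: counts buckets reported by the injected S3 service."""

    async def analyze(self, **kwargs) -> Dict[str, Any]:
        listing = await self.s3_service.list_buckets()
        buckets = listing.get("data", {}).get("Buckets", [])
        return {"status": "success", "data": {"bucket_count": len(buckets)}}

    def get_recommendations(self, analysis_results: Dict[str, Any]) -> List[Dict[str, Any]]:
        count = analysis_results.get("data", {}).get("bucket_count", 0)
        if count == 0:
            return []
        return [self.create_recommendation(
            rec_type="inventory",
            priority="low",
            title="Review bucket inventory",
            description=f"Account has {count} buckets; check for unused ones.",
        )]

# Registration with the global registry; s3_service is assumed to be constructed elsewhere.
# get_analyzer_registry().register(BucketInventoryAnalyzer(s3_service=s3_service))
```

In practice the analyzer would be invoked through `execute_with_error_handling(...)` so parameter validation, memory tracking, and error formatting are applied automatically.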
--------------------------------------------------------------------------------
/tests/conftest.py:
--------------------------------------------------------------------------------
```python
"""
Pytest configuration and fixtures for S3 optimization testing.
This module provides common fixtures and configuration for all tests,
including mocked AWS services, test data factories, and performance monitoring.
"""
import pytest
import asyncio
import logging
from unittest.mock import Mock, MagicMock, patch
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
import boto3
from moto import mock_aws
import json
# Configure logging for tests
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
@pytest.fixture(scope="session")
def event_loop():
"""Create an instance of the default event loop for the test session."""
loop = asyncio.get_event_loop_policy().new_event_loop()
yield loop
loop.close()
@pytest.fixture
def mock_aws_credentials():
"""Mock AWS credentials for testing."""
with patch.dict('os.environ', {
'AWS_ACCESS_KEY_ID': 'testing',
'AWS_SECRET_ACCESS_KEY': 'testing',
'AWS_SECURITY_TOKEN': 'testing',
'AWS_SESSION_TOKEN': 'testing',
'AWS_DEFAULT_REGION': 'us-east-1'
}):
yield
@pytest.fixture
def mock_s3_client(mock_aws_credentials):
"""Mock S3 client with moto."""
with mock_aws():
yield boto3.client('s3', region_name='us-east-1')
@pytest.fixture
def mock_cloudwatch_client(mock_aws_credentials):
"""Mock CloudWatch client with moto."""
with mock_aws():
yield boto3.client('cloudwatch', region_name='us-east-1')
@pytest.fixture
def mock_ce_client(mock_aws_credentials):
"""Mock Cost Explorer client with moto."""
with mock_aws():
yield boto3.client('ce', region_name='us-east-1')
@pytest.fixture
def mock_s3control_client(mock_aws_credentials):
"""Mock S3 Control client with moto."""
with mock_aws():
yield boto3.client('s3control', region_name='us-east-1')
@pytest.fixture
def sample_buckets():
"""Sample bucket data for testing."""
return [
{
"Name": "test-bucket-1",
"CreationDate": datetime.now() - timedelta(days=30),
"Region": "us-east-1"
},
{
"Name": "test-bucket-2",
"CreationDate": datetime.now() - timedelta(days=60),
"Region": "us-west-2"
},
{
"Name": "test-bucket-3",
"CreationDate": datetime.now() - timedelta(days=90),
"Region": "eu-west-1"
}
]
@pytest.fixture
def sample_cost_explorer_data():
"""Sample Cost Explorer response data."""
return {
"ResultsByTime": [
{
"TimePeriod": {
"Start": "2024-01-01",
"End": "2024-01-02"
},
"Groups": [
{
"Keys": ["S3-Storage-Standard"],
"Metrics": {
"UnblendedCost": {"Amount": "10.50", "Unit": "USD"},
"UsageQuantity": {"Amount": "1000", "Unit": "GB-Mo"}
}
},
{
"Keys": ["S3-Storage-StandardIA"],
"Metrics": {
"UnblendedCost": {"Amount": "5.25", "Unit": "USD"},
"UsageQuantity": {"Amount": "500", "Unit": "GB-Mo"}
}
}
]
}
]
}
@pytest.fixture
def sample_storage_lens_config():
"""Sample Storage Lens configuration."""
return {
"StorageLensConfiguration": {
"Id": "test-config",
"AccountLevel": {
"StorageMetrics": {"IsEnabled": True},
"BucketLevel": {
"ActivityMetrics": {"IsEnabled": True},
"CostOptimizationMetrics": {"IsEnabled": True},
"DetailedStatusCodesMetrics": {"IsEnabled": False},
"AdvancedCostOptimizationMetrics": {"IsEnabled": False},
"AdvancedDataProtectionMetrics": {"IsEnabled": False}
}
},
"IsEnabled": True,
"DataExport": {
"S3BucketDestination": {
"Bucket": "storage-lens-export-bucket",
"Prefix": "exports/",
"Format": "CSV"
}
}
}
}
@pytest.fixture
def mock_s3_service():
"""Mock S3Service instance."""
service = Mock()
service.region = "us-east-1"
service._operation_call_count = {}
# Mock async methods
async def mock_list_buckets():
return {
"status": "success",
"data": {
"Buckets": [
{"Name": "test-bucket-1", "CreationDate": datetime.now(), "Region": "us-east-1"},
{"Name": "test-bucket-2", "CreationDate": datetime.now(), "Region": "us-west-2"}
]
}
}
async def mock_get_bucket_metrics(bucket_name, days=30):
return {
"status": "success",
"data": {
"SizeBytes": 1000000000,
"SizeGB": 1.0,
"ObjectCount": 1000,
"AllRequests": 5000,
"GetRequests": 4000,
"PutRequests": 1000
}
}
service.list_buckets = mock_list_buckets
service.get_bucket_metrics = mock_get_bucket_metrics
service.get_operation_stats = Mock(return_value={"list_buckets": 1, "get_bucket_location": 2})
return service
@pytest.fixture
def mock_storage_lens_service():
"""Mock StorageLensService instance."""
service = Mock()
service.account_id = "123456789012"
service.region = "us-east-1"
# Mock async methods
async def mock_get_storage_metrics(config_id="default-account-dashboard"):
return {
"status": "success",
"data": {
"ConfigurationId": config_id,
"AccountId": "123456789012",
"IsEnabled": True,
"CostOptimizationMetrics": True,
"DetailedStatusCodesMetrics": False
}
}
async def mock_get_cost_optimization_metrics(config_id="default-account-dashboard"):
return {
"status": "success",
"data": {
"ConfigurationId": config_id,
"CostOptimizationMetricsEnabled": True,
"MultipartUploadTrackingAvailable": True
}
}
service.get_storage_metrics = mock_get_storage_metrics
service.get_cost_optimization_metrics = mock_get_cost_optimization_metrics
return service
@pytest.fixture
def mock_pricing_service():
"""Mock S3Pricing service instance."""
service = Mock()
service.region = "us-east-1"
def mock_get_storage_pricing():
return {
"status": "success",
"storage_pricing": {
"STANDARD": 0.023,
"STANDARD_IA": 0.0125,
"ONEZONE_IA": 0.01,
"GLACIER": 0.004,
"DEEP_ARCHIVE": 0.00099
}
}
def mock_estimate_request_costs(requests):
total_cost = sum(count * 0.0004 for count in requests.values())
return {
"status": "success",
"total_cost": total_cost,
"breakdown": {req_type: count * 0.0004 for req_type, count in requests.items()}
}
service.get_storage_pricing = mock_get_storage_pricing
service.estimate_request_costs = mock_estimate_request_costs
return service
@pytest.fixture
def mock_performance_monitor():
"""Mock performance monitor."""
monitor = Mock()
monitor.start_analysis_monitoring = Mock(return_value="test_session_123")
monitor.end_analysis_monitoring = Mock()
monitor.record_metric = Mock()
monitor.record_cache_hit = Mock()
monitor.record_cache_miss = Mock()
return monitor
@pytest.fixture
def mock_memory_manager():
"""Mock memory manager."""
manager = Mock()
manager.start_memory_tracking = Mock(return_value="test_tracker_123")
manager.stop_memory_tracking = Mock(return_value={"peak_memory_mb": 50.0, "avg_memory_mb": 30.0})
manager.register_large_object = Mock()
manager.add_cache_reference = Mock()
manager.set_performance_monitor = Mock()
return manager
@pytest.fixture
def mock_timeout_handler():
"""Mock timeout handler."""
handler = Mock()
handler.get_timeout_for_analysis = Mock(return_value=60.0)
handler.record_execution_time = Mock()
handler.get_complexity_level = Mock(return_value="medium")
handler.set_performance_monitor = Mock()
return handler
@pytest.fixture
def mock_cache():
"""Mock cache instance."""
cache = Mock()
cache.get = Mock(return_value=None) # Cache miss by default
cache.put = Mock()
cache.invalidate = Mock()
cache.set_performance_monitor = Mock()
return cache
@pytest.fixture
def mock_service_orchestrator():
"""Mock ServiceOrchestrator instance."""
orchestrator = Mock()
orchestrator.session_id = "test_session_123"
orchestrator.execute_parallel_analysis = Mock(return_value={
"status": "success",
"successful": 5,
"total_tasks": 6,
"results": {
"general_spend": {"status": "success", "data": {}},
"storage_class": {"status": "success", "data": {}},
"archive_optimization": {"status": "success", "data": {}},
"api_cost": {"status": "success", "data": {}},
"multipart_cleanup": {"status": "success", "data": {}}
},
"stored_tables": ["general_spend_results", "storage_class_results"]
})
orchestrator.query_session_data = Mock(return_value=[])
orchestrator.get_stored_tables = Mock(return_value=["test_table_1", "test_table_2"])
return orchestrator
@pytest.fixture
def cost_constraint_validator():
"""Fixture for validating no-cost constraints."""
class CostConstraintValidator:
def __init__(self):
self.forbidden_operations = {
'list_objects', 'list_objects_v2', 'head_object',
'get_object', 'get_object_attributes', 'select_object_content'
}
self.allowed_operations = {
'list_buckets', 'get_bucket_location', 'get_bucket_lifecycle_configuration',
'get_bucket_versioning', 'get_bucket_tagging', 'list_multipart_uploads'
}
self.operation_calls = []
def validate_operation(self, operation_name: str):
"""Validate that an operation doesn't incur costs."""
self.operation_calls.append(operation_name)
if operation_name in self.forbidden_operations:
raise ValueError(f"FORBIDDEN: Operation {operation_name} would incur costs")
return operation_name in self.allowed_operations
def get_operation_summary(self):
"""Get summary of operations called."""
return {
"total_operations": len(self.operation_calls),
"unique_operations": len(set(self.operation_calls)),
"operations": list(set(self.operation_calls)),
"forbidden_called": [op for op in self.operation_calls if op in self.forbidden_operations],
"allowed_called": [op for op in self.operation_calls if op in self.allowed_operations]
}
return CostConstraintValidator()
@pytest.fixture
def performance_test_data():
"""Test data for performance testing."""
return {
"small_dataset": {
"bucket_count": 5,
"object_count_per_bucket": 100,
"expected_max_time": 10.0
},
"medium_dataset": {
"bucket_count": 50,
"object_count_per_bucket": 1000,
"expected_max_time": 30.0
},
"large_dataset": {
"bucket_count": 500,
"object_count_per_bucket": 10000,
"expected_max_time": 120.0
}
}
@pytest.fixture
def timeout_test_scenarios():
"""Test scenarios for timeout handling."""
return {
"quick_analysis": {
"analysis_type": "general_spend",
"timeout_seconds": 5.0,
"expected_behavior": "complete"
},
"medium_analysis": {
"analysis_type": "comprehensive",
"timeout_seconds": 30.0,
"expected_behavior": "complete"
},
"timeout_scenario": {
"analysis_type": "comprehensive",
"timeout_seconds": 1.0,
"expected_behavior": "timeout"
}
}
class TestDataFactory:
"""Factory for creating test data."""
@staticmethod
def create_bucket_data(count: int = 3) -> List[Dict[str, Any]]:
"""Create sample bucket data."""
buckets = []
for i in range(count):
buckets.append({
"Name": f"test-bucket-{i+1}",
"CreationDate": datetime.now() - timedelta(days=30*(i+1)),
"Region": ["us-east-1", "us-west-2", "eu-west-1"][i % 3]
})
return buckets
@staticmethod
def create_cost_data(days: int = 30) -> Dict[str, Any]:
"""Create sample cost data."""
results = []
for i in range(days):
date = (datetime.now() - timedelta(days=i)).strftime('%Y-%m-%d')
results.append({
"TimePeriod": {"Start": date, "End": date},
"Groups": [
{
"Keys": ["S3-Storage-Standard"],
"Metrics": {
"UnblendedCost": {"Amount": str(10.0 + i * 0.5), "Unit": "USD"},
"UsageQuantity": {"Amount": str(1000 + i * 10), "Unit": "GB-Mo"}
}
}
]
})
return {"ResultsByTime": results}
@staticmethod
def create_analysis_result(analysis_type: str, status: str = "success") -> Dict[str, Any]:
"""Create sample analysis result."""
return {
"status": status,
"analysis_type": analysis_type,
"data": {
"total_cost": 100.0,
"optimization_opportunities": 3,
"potential_savings": 25.0
},
"recommendations": [
{
"type": "cost_optimization",
"priority": "high",
"title": f"Optimize {analysis_type}",
"potential_savings": 25.0
}
],
"execution_time": 5.0,
"timestamp": datetime.now().isoformat()
}
@pytest.fixture
def test_data_factory():
"""Test data factory fixture."""
return TestDataFactory()
# Performance testing utilities
class PerformanceTracker:
"""Track performance metrics during tests."""
def __init__(self):
self.metrics = {}
self.start_times = {}
def start_timer(self, name: str):
"""Start timing an operation."""
self.start_times[name] = datetime.now()
def end_timer(self, name: str) -> float:
"""End timing and return duration."""
if name in self.start_times:
duration = (datetime.now() - self.start_times[name]).total_seconds()
self.metrics[name] = duration
return duration
return 0.0
def get_metrics(self) -> Dict[str, float]:
"""Get all recorded metrics."""
return self.metrics.copy()
def assert_performance(self, name: str, max_time: float):
"""Assert that an operation completed within time limit."""
if name in self.metrics:
assert self.metrics[name] <= max_time, f"Operation {name} took {self.metrics[name]:.2f}s, expected <= {max_time}s"
else:
raise ValueError(f"No timing data for operation: {name}")
@pytest.fixture
def performance_tracker():
"""Performance tracker fixture."""
return PerformanceTracker()
# Async test utilities
def async_test(coro):
    """Decorator to run an async test to completion in a fresh event loop."""
    def wrapper(*args, **kwargs):
        return asyncio.run(coro(*args, **kwargs))
    return wrapper
# Test markers
def pytest_configure(config):
    """Register custom markers so pytest does not warn about unknown marks."""
    for marker in ("unit", "integration", "performance", "no_cost_validation"):
        config.addinivalue_line("markers", f"{marker}: {marker} tests")
```
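An illustrative unit test (not part of the suite) showing how these fixtures compose; it assumes `pytest-asyncio` is installed, which the `@pytest.mark.asyncio` usage elsewhere in the tests implies:

```python
import pytest

@pytest.mark.unit
@pytest.mark.asyncio
async def test_mock_s3_service_lists_buckets(mock_s3_service, cost_constraint_validator):
    """The mocked S3 service returns buckets and only cost-free operations are recorded."""
    cost_constraint_validator.validate_operation("list_buckets")
    result = await mock_s3_service.list_buckets()
    assert result["status"] == "success"
    assert len(result["data"]["Buckets"]) == 2
    assert not cost_constraint_validator.get_operation_summary()["forbidden_called"]
```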
--------------------------------------------------------------------------------
/tests/unit/services/test_cloudwatch_service.py:
--------------------------------------------------------------------------------
```python
"""
Unit tests for CloudWatchService.
Tests cost-aware operations, error handling, fallback mechanisms,
and comprehensive CloudWatch API integration with strict cost constraint validation.
"""
import pytest
import asyncio
from unittest.mock import patch, MagicMock, AsyncMock
from datetime import datetime, timedelta, timezone
import json
from services.cloudwatch_service import (
CloudWatchService, CloudWatchServiceConfig, CloudWatchOperationResult,
create_cloudwatch_service
)
from playbooks.cloudwatch.cost_controller import CostPreferences
from botocore.exceptions import ClientError
@pytest.mark.unit
class TestCloudWatchServiceConfig:
"""Test CloudWatchServiceConfig dataclass."""
def test_default_config(self):
"""Test default configuration values."""
config = CloudWatchServiceConfig()
assert config.region is None
assert config.max_retries == 3
assert config.retry_delay == 1.0
assert config.timeout_seconds == 30.0
assert config.enable_cost_tracking is True
assert config.enable_fallback is True
assert config.cost_preferences is None
def test_custom_config(self):
"""Test custom configuration values."""
cost_prefs = CostPreferences(allow_cost_explorer=True)
config = CloudWatchServiceConfig(
region='us-west-2',
max_retries=5,
retry_delay=2.0,
timeout_seconds=60.0,
enable_cost_tracking=False,
enable_fallback=False,
cost_preferences=cost_prefs
)
assert config.region == 'us-west-2'
assert config.max_retries == 5
assert config.retry_delay == 2.0
assert config.timeout_seconds == 60.0
assert config.enable_cost_tracking is False
assert config.enable_fallback is False
assert config.cost_preferences == cost_prefs
@pytest.mark.unit
class TestCloudWatchOperationResult:
"""Test CloudWatchOperationResult dataclass."""
def test_default_result(self):
"""Test default operation result values."""
result = CloudWatchOperationResult(success=True)
assert result.success is True
assert result.data is None
assert result.error_message is None
assert result.operation_name == ""
assert result.cost_incurred is False
assert result.operation_type == "free"
assert result.execution_time == 0.0
assert result.fallback_used is False
assert result.primary_data_source == "cloudwatch_api"
assert result.api_calls_made == []
def test_custom_result(self):
"""Test custom operation result values."""
result = CloudWatchOperationResult(
success=False,
data={'test': 'data'},
error_message='Test error',
operation_name='test_operation',
cost_incurred=True,
operation_type='paid',
execution_time=1.5,
fallback_used=True,
primary_data_source='cost_explorer',
api_calls_made=['test_api_call']
)
assert result.success is False
assert result.data == {'test': 'data'}
assert result.error_message == 'Test error'
assert result.operation_name == 'test_operation'
assert result.cost_incurred is True
assert result.operation_type == 'paid'
assert result.execution_time == 1.5
assert result.fallback_used is True
assert result.primary_data_source == 'cost_explorer'
assert result.api_calls_made == ['test_api_call']
@pytest.mark.unit
class TestCloudWatchService:
"""Test CloudWatchService factory and lifecycle management."""
@pytest.fixture
def mock_boto3_clients(self):
"""Mock boto3 clients for testing."""
with patch('services.cloudwatch_service.boto3') as mock_boto3:
mock_cloudwatch = MagicMock()
mock_logs = MagicMock()
mock_ce = MagicMock()
mock_pricing = MagicMock()
mock_boto3.client.side_effect = lambda service, **kwargs: {
'cloudwatch': mock_cloudwatch,
'logs': mock_logs,
'ce': mock_ce,
'pricing': mock_pricing
}[service]
yield {
'cloudwatch': mock_cloudwatch,
'logs': mock_logs,
'ce': mock_ce,
'pricing': mock_pricing
}
@pytest.fixture
def cloudwatch_service(self, mock_boto3_clients):
"""Create CloudWatchService instance for testing."""
config = CloudWatchServiceConfig(region='us-east-1')
return CloudWatchService(config)
def test_initialization(self, cloudwatch_service):
"""Test CloudWatchService initialization."""
assert cloudwatch_service.region == 'us-east-1'
assert isinstance(cloudwatch_service.cost_preferences, CostPreferences)
assert cloudwatch_service._dao is not None
assert cloudwatch_service._pricing_dao is not None
def test_update_cost_preferences_dict(self, cloudwatch_service):
"""Test updating cost preferences with CostPreferences object."""
prefs = CostPreferences(
allow_cost_explorer=True,
allow_minimal_cost_metrics=True
)
cloudwatch_service.update_cost_preferences(prefs)
assert cloudwatch_service.cost_preferences.allow_cost_explorer is True
assert cloudwatch_service.cost_preferences.allow_minimal_cost_metrics is True
assert cloudwatch_service.cost_preferences.allow_aws_config is False
def test_get_general_spend_service(self, cloudwatch_service):
"""Test getting general spend service (lazy loading)."""
service1 = cloudwatch_service.getGeneralSpendService()
service2 = cloudwatch_service.getGeneralSpendService()
assert service1 is not None
assert service1 is service2 # Same instance (lazy loaded)
def test_get_metrics_service(self, cloudwatch_service):
"""Test getting metrics service (lazy loading)."""
service1 = cloudwatch_service.getMetricsService()
service2 = cloudwatch_service.getMetricsService()
assert service1 is not None
assert service1 is service2 # Same instance (lazy loaded)
def test_get_logs_service(self, cloudwatch_service):
"""Test getting logs service (lazy loading)."""
service1 = cloudwatch_service.getLogsService()
service2 = cloudwatch_service.getLogsService()
assert service1 is not None
assert service1 is service2 # Same instance (lazy loaded)
def test_get_alarms_service(self, cloudwatch_service):
"""Test getting alarms service (lazy loading)."""
service1 = cloudwatch_service.getAlarmsService()
service2 = cloudwatch_service.getAlarmsService()
assert service1 is not None
assert service1 is service2 # Same instance (lazy loaded)
def test_get_dashboards_service(self, cloudwatch_service):
"""Test getting dashboards service (lazy loading)."""
service1 = cloudwatch_service.getDashboardsService()
service2 = cloudwatch_service.getDashboardsService()
assert service1 is not None
assert service1 is service2 # Same instance (lazy loaded)
def test_update_cost_preferences_propagation(self, cloudwatch_service):
"""Test that cost preferences are propagated to all tip services."""
# Initialize all services
general = cloudwatch_service.getGeneralSpendService()
metrics = cloudwatch_service.getMetricsService()
logs = cloudwatch_service.getLogsService()
alarms = cloudwatch_service.getAlarmsService()
dashboards = cloudwatch_service.getDashboardsService()
# Update preferences
new_prefs = CostPreferences(allow_cost_explorer=True)
cloudwatch_service.update_cost_preferences(new_prefs)
# Verify all services have updated preferences
assert general.cost_preferences.allow_cost_explorer is True
assert metrics.cost_preferences.allow_cost_explorer is True
assert logs.cost_preferences.allow_cost_explorer is True
assert alarms.cost_preferences.allow_cost_explorer is True
assert dashboards.cost_preferences.allow_cost_explorer is True
def test_get_service_statistics(self, cloudwatch_service):
"""Test service statistics retrieval."""
stats = cloudwatch_service.get_service_statistics()
assert 'service_info' in stats
assert 'cache_statistics' in stats
assert 'cost_control_status' in stats
assert stats['service_info']['region'] == 'us-east-1'
assert 'initialized_services' in stats['service_info']
def test_clear_cache(self, cloudwatch_service):
"""Test cache clearing."""
# This should not raise an error
cloudwatch_service.clear_cache()
# Verify cache is cleared by checking stats
stats = cloudwatch_service.get_service_statistics()
cache_stats = stats['cache_statistics']
assert cache_stats['total_entries'] == 0
@pytest.mark.unit
@pytest.mark.asyncio
class TestCloudWatchServiceConvenienceFunctions:
"""Test convenience functions for CloudWatchService."""
@patch('services.cloudwatch_service.boto3')
async def test_create_cloudwatch_service_default(self, mock_boto3):
"""Test creating CloudWatchService with default parameters."""
# Mock all required clients
mock_cloudwatch = MagicMock()
mock_logs = MagicMock()
mock_pricing = MagicMock()
mock_boto3.client.side_effect = lambda service, **kwargs: {
'cloudwatch': mock_cloudwatch,
'logs': mock_logs,
'pricing': mock_pricing
}[service]
service = await create_cloudwatch_service()
assert isinstance(service, CloudWatchService)
assert service.region is None
assert isinstance(service.cost_preferences, CostPreferences)
@patch('services.cloudwatch_service.boto3')
async def test_create_cloudwatch_service_with_params(self, mock_boto3):
"""Test creating CloudWatchService with custom parameters."""
# Mock all required clients
mock_cloudwatch = MagicMock()
mock_logs = MagicMock()
mock_pricing = MagicMock()
mock_boto3.client.side_effect = lambda service, **kwargs: {
'cloudwatch': mock_cloudwatch,
'logs': mock_logs,
'pricing': mock_pricing
}[service]
cost_prefs = CostPreferences(
allow_cost_explorer=True,
allow_minimal_cost_metrics=True
)
service = await create_cloudwatch_service(
region='us-west-2',
cost_preferences=cost_prefs
)
assert service.region == 'us-west-2'
assert service.cost_preferences.allow_cost_explorer is True
assert service.cost_preferences.allow_minimal_cost_metrics is True
# TODO: Implement check_cloudwatch_service_connectivity function in cloudwatch_service.py
# @patch('services.cloudwatch_service.boto3')
# async def test_test_cloudwatch_service_connectivity_success(self, mock_boto3):
# """Test connectivity testing with successful operations."""
# # Mock successful responses for all operations
# mock_cloudwatch = MagicMock()
# mock_logs = MagicMock()
# mock_ce = MagicMock()
#
# mock_boto3.client.side_effect = lambda service, **kwargs: {
# 'cloudwatch': mock_cloudwatch,
# 'logs': mock_logs,
# 'ce': mock_ce
# }[service]
#
# # Mock paginator responses
# mock_paginator = MagicMock()
# mock_paginator.paginate.return_value = [{'Metrics': [], 'MetricAlarms': [], 'CompositeAlarms': [], 'DashboardEntries': [], 'logGroups': []}]
# mock_cloudwatch.get_paginator.return_value = mock_paginator
# mock_logs.get_paginator.return_value = mock_paginator
#
# service = CloudWatchService()
# result = await check_cloudwatch_service_connectivity(service)
#
# assert result['connectivity_test'] is True
# assert result['overall_success'] is True
# assert 'tests' in result
# assert len(result['tests']) == 4 # Four free operations tested
#
# for test_name, test_result in result['tests'].items():
# assert test_result['success'] is True
# assert test_result['execution_time'] >= 0
# assert test_result['error'] is None
#
# @patch('services.cloudwatch_service.boto3')
# async def test_test_cloudwatch_service_connectivity_failure(self, mock_boto3):
# """Test connectivity testing with failed operations."""
# # Mock failed responses
# mock_cloudwatch = MagicMock()
# mock_logs = MagicMock()
# mock_ce = MagicMock()
#
# mock_boto3.client.side_effect = lambda service, **kwargs: {
# 'cloudwatch': mock_cloudwatch,
# 'logs': mock_logs,
# 'ce': mock_ce
# }[service]
#
# # Mock paginator to raise exception
# mock_paginator = MagicMock()
# mock_paginator.paginate.side_effect = ClientError(
# error_response={'Error': {'Code': 'AccessDenied', 'Message': 'Access denied'}},
# operation_name='ListMetrics'
# )
# mock_cloudwatch.get_paginator.return_value = mock_paginator
# mock_logs.get_paginator.return_value = mock_paginator
#
# service = CloudWatchService()
# result = await check_cloudwatch_service_connectivity(service)
#
# assert result['connectivity_test'] is True
# assert result['overall_success'] is False
#
# for test_name, test_result in result['tests'].items():
# assert test_result['success'] is False
# assert 'Access denied' in test_result['error']
@pytest.mark.unit
class TestCloudWatchServiceCostConstraints:
"""Test cost constraint validation and enforcement."""
@pytest.fixture
def mock_boto3_clients(self):
"""Mock boto3 clients for testing."""
with patch('services.cloudwatch_service.boto3') as mock_boto3:
mock_cloudwatch = MagicMock()
mock_logs = MagicMock()
mock_ce = MagicMock()
mock_pricing = MagicMock()
mock_boto3.client.side_effect = lambda service, **kwargs: {
'cloudwatch': mock_cloudwatch,
'logs': mock_logs,
'ce': mock_ce,
'pricing': mock_pricing
}[service]
yield {
'cloudwatch': mock_cloudwatch,
'logs': mock_logs,
'ce': mock_ce,
'pricing': mock_pricing
}
@pytest.fixture
def cloudwatch_service(self, mock_boto3_clients):
"""Create CloudWatchService instance for testing."""
return CloudWatchService()
def test_cost_preferences_initialization(self, cloudwatch_service):
"""Test that cost preferences are properly initialized."""
assert isinstance(cloudwatch_service.cost_preferences, CostPreferences)
assert cloudwatch_service.cost_preferences.allow_cost_explorer is False
assert cloudwatch_service.cost_preferences.allow_minimal_cost_metrics is False
assert cloudwatch_service.cost_preferences.allow_aws_config is False
assert cloudwatch_service.cost_preferences.allow_cloudtrail is False
def test_cost_preferences_update(self, cloudwatch_service):
"""Test updating cost preferences."""
new_prefs = CostPreferences(
allow_cost_explorer=True,
allow_minimal_cost_metrics=True
)
cloudwatch_service.update_cost_preferences(new_prefs)
assert cloudwatch_service.cost_preferences.allow_cost_explorer is True
assert cloudwatch_service.cost_preferences.allow_minimal_cost_metrics is True
assert cloudwatch_service.cost_preferences.allow_aws_config is False
def test_cost_control_status_in_statistics(self, cloudwatch_service):
"""Test that cost control status is included in statistics."""
stats = cloudwatch_service.get_service_statistics()
assert 'cost_control_status' in stats
assert stats['cost_control_status']['cost_controller_active'] is True
assert stats['cost_control_status']['preferences_validated'] is True
if __name__ == "__main__":
pytest.main([__file__])
```
--------------------------------------------------------------------------------
/tests/unit/analyzers/test_metrics_optimization_analyzer.py:
--------------------------------------------------------------------------------
```python
"""
Unit tests for MetricsOptimizationAnalyzer
Tests the CloudWatch Metrics optimization analyzer functionality including
cost analysis, configuration analysis, and recommendation generation.
"""
import pytest
import asyncio
from unittest.mock import Mock, AsyncMock, patch
from datetime import datetime, timedelta
from playbooks.cloudwatch.metrics_optimization_analyzer import MetricsOptimizationAnalyzer
from services.cloudwatch_service import CloudWatchOperationResult
class TestMetricsOptimizationAnalyzer:
"""Test suite for MetricsOptimizationAnalyzer."""
@pytest.fixture
def mock_services(self):
"""Create mock services for testing."""
return {
'cost_explorer_service': Mock(),
'config_service': Mock(),
'metrics_service': Mock(),
'cloudwatch_service': Mock(),
'pricing_service': Mock(),
'performance_monitor': Mock(),
'memory_manager': Mock()
}
@pytest.fixture
def analyzer(self, mock_services):
"""Create MetricsOptimizationAnalyzer instance with mocked services."""
return MetricsOptimizationAnalyzer(**mock_services)
@pytest.fixture
def sample_metrics_data(self):
"""Sample metrics data for testing."""
return {
'metrics': [
{
'Namespace': 'AWS/EC2',
'MetricName': 'CPUUtilization',
'Dimensions': [{'Name': 'InstanceId', 'Value': 'i-1234567890abcdef0'}]
},
{
'Namespace': 'MyApp/Custom',
'MetricName': 'RequestCount',
'Dimensions': [
{'Name': 'Environment', 'Value': 'prod'},
{'Name': 'Service', 'Value': 'api'},
{'Name': 'Region', 'Value': 'us-east-1'},
{'Name': 'AZ', 'Value': 'us-east-1a'},
{'Name': 'Instance', 'Value': 'i-abc123'},
{'Name': 'Version', 'Value': 'v1.2.3'}
]
},
{
'Namespace': 'MyApp/Custom',
'MetricName': 'ErrorRate',
'Dimensions': [{'Name': 'Service', 'Value': 'api'}]
}
],
'total_count': 3
}
@pytest.mark.asyncio
async def test_analyze_basic_functionality(self, analyzer, mock_services):
"""Test basic analyze functionality."""
# Mock CloudWatch service responses
mock_services['cloudwatch_service'].list_metrics = AsyncMock(return_value=CloudWatchOperationResult(
success=True,
data={'metrics': [], 'total_count': 0}
))
result = await analyzer.analyze(region='us-east-1', lookback_days=30)
assert result['status'] == 'success'
assert result['analysis_type'] == 'metrics_optimization'
assert 'data' in result
assert 'recommendations' in result
assert result['cost_incurred'] == False # No paid operations by default
@pytest.mark.asyncio
async def test_analyze_with_cost_explorer(self, analyzer, mock_services):
"""Test analyze with Cost Explorer enabled."""
# Mock CloudWatch service responses
mock_services['cloudwatch_service'].list_metrics = AsyncMock(return_value=CloudWatchOperationResult(
success=True,
data={'metrics': [], 'total_count': 0}
))
# Mock Cost Explorer response
with patch('playbooks.cloudwatch.metrics_optimization_analyzer.get_cost_and_usage') as mock_cost_explorer:
mock_cost_explorer.return_value = {
'status': 'success',
'data': {
'ResultsByTime': [
{
'TimePeriod': {'Start': '2024-01-01'},
'Groups': [
{
'Keys': ['CloudWatch-Metrics'],
'Metrics': {
'BlendedCost': {'Amount': '10.50', 'Unit': 'USD'},
'UsageQuantity': {'Amount': '35', 'Unit': 'Count'}
}
}
]
}
]
}
}
result = await analyzer.analyze(
region='us-east-1',
lookback_days=30,
allow_cost_explorer=True
)
assert result['status'] == 'success'
assert result['cost_incurred'] == True
assert 'cost_explorer_metrics_analysis' in result['cost_incurring_operations']
assert result['primary_data_source'] == 'cost_explorer'
@pytest.mark.asyncio
async def test_analyze_metrics_configuration(self, analyzer, mock_services, sample_metrics_data):
"""Test metrics configuration analysis."""
# Mock CloudWatch service response
mock_services['cloudwatch_service'].list_metrics = AsyncMock(return_value=CloudWatchOperationResult(
success=True,
data=sample_metrics_data
))
result = await analyzer._analyze_metrics_configuration()
assert result['status'] == 'success'
assert 'metrics_configuration_analysis' in result['data']
config_data = result['data']['metrics_configuration_analysis']
assert 'metrics' in config_data
assert 'metrics_analysis' in config_data
metrics_analysis = config_data['metrics_analysis']
assert metrics_analysis['total_metrics'] == 3
assert metrics_analysis['custom_metrics_count'] == 2 # MyApp/Custom metrics
assert metrics_analysis['aws_metrics_count'] == 1 # AWS/EC2 metric
def test_analyze_metrics_metadata(self, analyzer, sample_metrics_data):
"""Test metrics metadata analysis."""
metrics = sample_metrics_data['metrics']
analysis = analyzer._analyze_metrics_metadata(metrics)
assert analysis['total_metrics'] == 3
assert analysis['custom_metrics_count'] == 2
assert analysis['aws_metrics_count'] == 1
assert 'MyApp/Custom' in analysis['custom_namespaces']
assert len(analysis['high_cardinality_metrics']) == 1 # RequestCount has 6 dimensions
# Check free tier analysis
free_tier = analysis['free_tier_analysis']
assert free_tier['free_tier_limit'] == 10
assert free_tier['within_free_tier'] == True # Only 2 custom metrics
def test_categorize_metrics_usage_type(self, analyzer):
"""Test metrics usage type categorization."""
processed_data = {'metrics_specific_costs': {}}
# Test custom metrics categorization
analyzer._categorize_metrics_usage_type('CloudWatch-CustomMetrics', 5.0, 10, processed_data)
assert processed_data['metrics_specific_costs']['custom_metrics'] == 5.0
# Test detailed monitoring categorization (needs 'metric' and 'detailed' in usage type)
analyzer._categorize_metrics_usage_type('CloudWatch-MetricDetailedMonitoring', 3.0, 5, processed_data)
assert processed_data['metrics_specific_costs']['detailed_monitoring'] == 3.0
# Test API requests categorization
analyzer._categorize_metrics_usage_type('CloudWatch-Requests', 1.0, 1000, processed_data)
assert processed_data['metrics_specific_costs']['api_requests'] == 1.0
@pytest.mark.asyncio
async def test_analyze_with_minimal_cost_metrics(self, analyzer, mock_services):
"""Test analyze with minimal cost metrics enabled."""
# Mock CloudWatch service responses
mock_services['cloudwatch_service'].list_metrics = AsyncMock(return_value=CloudWatchOperationResult(
success=True,
data={'metrics': [], 'total_count': 0}
))
mock_services['cloudwatch_service'].get_targeted_metric_statistics = AsyncMock(return_value=CloudWatchOperationResult(
success=True,
data={'datapoints': []}
))
result = await analyzer.analyze(
region='us-east-1',
lookback_days=30,
allow_minimal_cost_metrics=True
)
assert result['status'] == 'success'
assert result['cost_incurred'] == True
assert 'minimal_cost_metrics_analysis' in result['cost_incurring_operations']
@pytest.mark.asyncio
async def test_analyze_custom_metrics_patterns(self, analyzer, mock_services, sample_metrics_data):
"""Test custom metrics patterns analysis."""
# Mock CloudWatch service response
mock_services['cloudwatch_service'].list_metrics = AsyncMock(return_value=CloudWatchOperationResult(
success=True,
data=sample_metrics_data
))
result = await analyzer._analyze_custom_metrics_patterns(30)
assert result is not None
assert result['total_custom_metrics'] == 2
assert 'MyApp/Custom' in result['custom_namespaces']
assert len(result['high_cardinality_metrics']) == 1
assert len(result['optimization_opportunities']) == 1
# Check optimization opportunity
opportunity = result['optimization_opportunities'][0]
assert opportunity['type'] == 'reduce_cardinality'
assert 'MyApp/Custom/RequestCount' in opportunity['metric']
def test_generate_optimization_priorities(self, analyzer):
"""Test optimization priorities generation."""
optimization_analysis = {
'custom_metrics_optimization': {
'optimization_opportunities': [
{
'type': 'reduce_high_cardinality_metrics',
'potential_monthly_savings': 15.0,
'implementation_effort': 'medium'
}
]
},
'detailed_monitoring_optimization': {
'optimization_opportunities': [
{
'type': 'disable_detailed_monitoring',
'potential_monthly_savings': 25.0,
'implementation_effort': 'low'
}
]
}
}
priorities = analyzer._generate_optimization_priorities(optimization_analysis)
assert len(priorities) == 2
# Higher savings with lower effort should be first
assert priorities[0]['potential_monthly_savings'] == 25.0
assert priorities[0]['implementation_effort'] == 'low'
assert priorities[1]['potential_monthly_savings'] == 15.0
assert priorities[1]['implementation_effort'] == 'medium'
def test_get_recommendations_basic(self, analyzer):
"""Test basic recommendations generation."""
analysis_results = {
'data': {
'optimization_analysis': {
'custom_metrics_optimization': {
'optimization_opportunities': [
{
'type': 'reduce_high_cardinality_metrics',
'description': 'Reduce cardinality of 5 high-cardinality metrics',
'potential_monthly_savings': 15.0,
'implementation_effort': 'medium',
'affected_metrics': 5
}
]
}
},
'metrics_configuration_analysis': {
'metrics_analysis': {
'free_tier_analysis': {
'within_free_tier': False,
'custom_metrics_beyond_free_tier': 25
},
'high_cardinality_metrics': [
{'namespace': 'MyApp', 'metric_name': 'RequestCount'}
]
}
}
}
}
recommendations = analyzer.get_recommendations(analysis_results)
assert len(recommendations) >= 2
# Check for custom metrics optimization recommendation
custom_metrics_rec = next((r for r in recommendations if 'Custom Metrics Optimization' in r['title']), None)
assert custom_metrics_rec is not None
assert custom_metrics_rec['potential_savings'] == 15.0
assert custom_metrics_rec['cloudwatch_component'] == 'metrics'
# Check for free tier recommendation
free_tier_rec = next((r for r in recommendations if 'Free Tier' in r['title']), None)
assert free_tier_rec is not None
assert free_tier_rec['potential_savings'] == 25 * 0.30 # 25 metrics * $0.30
def test_get_recommendations_with_cost_preferences_disabled(self, analyzer):
"""Test recommendations when cost preferences are disabled."""
analyzer.cost_preferences = {
'allow_cost_explorer': False,
'allow_minimal_cost_metrics': False
}
analysis_results = {
'data': {
'optimization_analysis': {},
'metrics_configuration_analysis': {
'metrics_analysis': {
'free_tier_analysis': {'within_free_tier': True}
}
}
}
}
recommendations = analyzer.get_recommendations(analysis_results)
# Should include recommendations to enable cost analysis features
cost_explorer_rec = next((r for r in recommendations if 'Cost Explorer' in r['title']), None)
assert cost_explorer_rec is not None
assert cost_explorer_rec['type'] == 'governance'
minimal_cost_rec = next((r for r in recommendations if 'Minimal Cost Metrics' in r['title']), None)
assert minimal_cost_rec is not None
assert minimal_cost_rec['type'] == 'governance'
@pytest.mark.asyncio
async def test_error_handling(self, analyzer, mock_services):
"""Test error handling in analysis."""
# Mock CloudWatch service to raise an exception
mock_services['cloudwatch_service'].list_metrics = AsyncMock(side_effect=Exception("API Error"))
result = await analyzer.analyze(region='us-east-1')
# The analyzer handles errors gracefully and continues with partial results
assert result['status'] == 'success' # Still succeeds with partial data
assert result['fallback_used'] == True # But marks that fallback was used
assert result['cost_incurred'] == False
@pytest.mark.asyncio
async def test_parameter_validation(self, analyzer):
"""Test parameter validation."""
# Test invalid lookback_days
validation = analyzer.validate_parameters(lookback_days=-5)
assert not validation['valid']
assert any('positive integer' in error for error in validation['errors'])
# Test invalid region type
validation = analyzer.validate_parameters(region=123)
assert not validation['valid']
assert any('string' in error for error in validation['errors'])
# Test valid parameters
validation = analyzer.validate_parameters(region='us-east-1', lookback_days=30)
assert validation['valid']
assert len(validation['errors']) == 0
def test_analyzer_info(self, analyzer):
"""Test analyzer information retrieval."""
info = analyzer.get_analyzer_info()
assert info['analysis_type'] == 'metrics_optimization'
assert info['class_name'] == 'MetricsOptimizationAnalyzer'
assert info['version'] == '1.0.0'
assert 'services' in info
assert 'cost_optimization' in info
assert info['cost_optimization']['prioritizes_cost_explorer'] == True
assert info['cost_optimization']['minimizes_api_costs'] == True
@pytest.mark.asyncio
async def test_execute_with_error_handling(self, analyzer, mock_services):
"""Test execute_with_error_handling method."""
# Mock successful analysis
mock_services['cloudwatch_service'].list_metrics = AsyncMock(return_value=CloudWatchOperationResult(
success=True,
data={'metrics': [], 'total_count': 0}
))
result = await analyzer.execute_with_error_handling(region='us-east-1', lookback_days=30)
assert result['status'] == 'success'
assert result['analysis_type'] == 'metrics_optimization'
assert 'execution_time' in result
assert 'timestamp' in result
assert result['cost_incurred'] == False
assert result['primary_data_source'] == 'cloudwatch_config'
if __name__ == '__main__':
pytest.main([__file__])
```
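The free-tier assertions above reduce to simple arithmetic: the first 10 custom metrics fall within the free tier, and each custom metric beyond that is costed at $0.30 per month (see the `25 * 0.30` expectation). A minimal sketch of that calculation follows; the function name and return shape are illustrative, not the analyzer's actual API.

```python
# Illustrative helper mirroring the free-tier arithmetic asserted in the tests above.
# The 10-metric free tier and $0.30/metric-month rate come from the test expectations;
# names and the return shape are hypothetical.
FREE_TIER_LIMIT = 10
COST_PER_CUSTOM_METRIC = 0.30  # USD per custom metric per month


def estimate_custom_metric_overage(custom_metric_count: int) -> dict:
    beyond_free_tier = max(0, custom_metric_count - FREE_TIER_LIMIT)
    return {
        "within_free_tier": beyond_free_tier == 0,
        "custom_metrics_beyond_free_tier": beyond_free_tier,
        "potential_monthly_savings": round(beyond_free_tier * COST_PER_CUSTOM_METRIC, 2),
    }


# Example: 35 custom metrics -> 25 beyond the free tier -> 25 * $0.30 = $7.50/month,
# consistent with the free-tier recommendation expectation in test_get_recommendations_basic.
```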
--------------------------------------------------------------------------------
/playbooks/cloudwatch/cloudwatch_optimization.py:
--------------------------------------------------------------------------------
```python
"""
CloudWatch Optimization MCP Wrapper Functions
This module provides MCP-compatible wrapper functions for CloudWatch optimization analysis.
These functions follow the same pattern as other service optimization modules in the CFM Tips project.
"""
import asyncio
import json
import logging
import time
from typing import Dict, List, Any
from mcp.types import TextContent
from utils.logging_config import log_function_entry, log_function_exit
from utils.error_handler import handle_aws_error, ResponseFormatter
from utils.documentation_links import add_documentation_links
logger = logging.getLogger(__name__)
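# The analysis wrappers below share a common shape: read region/page/timeout_seconds from
# the MCP arguments, build CostPreferences from the allow_* flags (all default to False,
# so no cost-incurring AWS APIs are used unless explicitly enabled), run the analyzer
# under asyncio.wait_for, then format the result with ResponseFormatter and attach
# documentation links. The query/validate/estimate helpers at the end delegate to the
# corresponding functions in runbook_functions.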
@handle_aws_error
async def run_cloudwatch_general_spend_analysis_mcp(arguments: Dict[str, Any]) -> List[TextContent]:
"""Run CloudWatch general spend analysis to understand cost breakdown across logs, metrics, alarms, and dashboards."""
log_function_entry(logger, "run_cloudwatch_general_spend_analysis_mcp", arguments=arguments)
start_time = time.time()
try:
from playbooks.cloudwatch.cloudwatch_optimization_analyzer import CloudWatchOptimizationAnalyzer
from playbooks.cloudwatch.cost_controller import CostPreferences
region = arguments.get("region")
page = arguments.get("page", 1)
timeout_seconds = arguments.get("timeout_seconds", 120)
# Build cost preferences from arguments
cost_preferences = CostPreferences(
allow_cost_explorer=arguments.get("allow_cost_explorer", False),
allow_aws_config=arguments.get("allow_aws_config", False),
allow_cloudtrail=arguments.get("allow_cloudtrail", False),
allow_minimal_cost_metrics=arguments.get("allow_minimal_cost_metrics", False)
)
# Initialize analyzer
analyzer = CloudWatchOptimizationAnalyzer(region=region, cost_preferences=cost_preferences)
# Remove internal parameters from arguments
analysis_args = {k: v for k, v in arguments.items() if k not in ['page', 'timeout_seconds']}
# Execute analysis with timeout
result = await asyncio.wait_for(
analyzer.analyze_general_spend(page=page, **analysis_args),
timeout=timeout_seconds
)
# Format response
formatted_result = ResponseFormatter.success_response(
data=result,
message="CloudWatch general spend analysis completed successfully",
analysis_type="cloudwatch_general_spend"
)
# Add documentation links
formatted_result = add_documentation_links(formatted_result, "cloudwatch")
execution_time = time.time() - start_time
log_function_exit(logger, "run_cloudwatch_general_spend_analysis_mcp", "success", execution_time)
return ResponseFormatter.to_text_content(formatted_result)
except asyncio.TimeoutError:
error_message = f"CloudWatch general spend analysis timed out after {timeout_seconds} seconds"
logger.error(error_message)
return ResponseFormatter.to_text_content(ResponseFormatter.error_response(
error_message, "timeout_error", "cloudwatch_general_spend"
))
except Exception as e:
logger.error(f"CloudWatch general spend analysis failed: {str(e)}")
return ResponseFormatter.to_text_content(ResponseFormatter.error_response(
str(e), "analysis_error", "cloudwatch_general_spend"
))
@handle_aws_error
async def run_cloudwatch_metrics_optimization_mcp(arguments: Dict[str, Any]) -> List[TextContent]:
"""Run CloudWatch metrics optimization analysis to identify custom metrics cost optimization opportunities."""
log_function_entry(logger, "run_cloudwatch_metrics_optimization_mcp", arguments=arguments)
start_time = time.time()
try:
from playbooks.cloudwatch.cloudwatch_optimization_analyzer import CloudWatchOptimizationAnalyzer
from playbooks.cloudwatch.cost_controller import CostPreferences
region = arguments.get("region")
page = arguments.get("page", 1)
timeout_seconds = arguments.get("timeout_seconds", 120)
# Build cost preferences from arguments
cost_preferences = CostPreferences(
allow_cost_explorer=arguments.get("allow_cost_explorer", False),
allow_aws_config=arguments.get("allow_aws_config", False),
allow_cloudtrail=arguments.get("allow_cloudtrail", False),
allow_minimal_cost_metrics=arguments.get("allow_minimal_cost_metrics", False)
)
# Initialize analyzer
analyzer = CloudWatchOptimizationAnalyzer(region=region, cost_preferences=cost_preferences)
# Remove internal parameters from arguments
analysis_args = {k: v for k, v in arguments.items() if k not in ['page', 'timeout_seconds']}
# Execute analysis with timeout
result = await asyncio.wait_for(
analyzer.analyze_metrics_optimization(page=page, **analysis_args),
timeout=timeout_seconds
)
# Format response
formatted_result = ResponseFormatter.success_response(
data=result,
message="CloudWatch metrics optimization analysis completed successfully",
analysis_type="cloudwatch_metrics_optimization"
)
# Add documentation links
formatted_result = add_documentation_links(formatted_result, "cloudwatch")
execution_time = time.time() - start_time
log_function_exit(logger, "run_cloudwatch_metrics_optimization_mcp", "success", execution_time)
return ResponseFormatter.to_text_content(formatted_result)
except asyncio.TimeoutError:
error_message = f"CloudWatch metrics optimization analysis timed out after {timeout_seconds} seconds"
logger.error(error_message)
return ResponseFormatter.to_text_content(ResponseFormatter.error_response(
error_message, "timeout_error", "cloudwatch_metrics_optimization"
))
except Exception as e:
logger.error(f"CloudWatch metrics optimization analysis failed: {str(e)}")
return ResponseFormatter.to_text_content(ResponseFormatter.error_response(
str(e), "analysis_error", "cloudwatch_metrics_optimization"
))
@handle_aws_error
async def run_cloudwatch_logs_optimization_mcp(arguments: Dict[str, Any]) -> List[TextContent]:
"""Run CloudWatch logs optimization analysis to identify log retention and ingestion cost optimization opportunities."""
log_function_entry(logger, "run_cloudwatch_logs_optimization_mcp", arguments=arguments)
start_time = time.time()
try:
from playbooks.cloudwatch.cloudwatch_optimization_analyzer import CloudWatchOptimizationAnalyzer
from playbooks.cloudwatch.cost_controller import CostPreferences
region = arguments.get("region")
page = arguments.get("page", 1)
timeout_seconds = arguments.get("timeout_seconds", 120)
# Build cost preferences from arguments
cost_preferences = CostPreferences(
allow_cost_explorer=arguments.get("allow_cost_explorer", False),
allow_aws_config=arguments.get("allow_aws_config", False),
allow_cloudtrail=arguments.get("allow_cloudtrail", False),
allow_minimal_cost_metrics=arguments.get("allow_minimal_cost_metrics", False)
)
# Initialize analyzer
analyzer = CloudWatchOptimizationAnalyzer(region=region, cost_preferences=cost_preferences)
# Remove internal parameters from arguments
analysis_args = {k: v for k, v in arguments.items() if k not in ['page', 'timeout_seconds']}
# Execute analysis with timeout
result = await asyncio.wait_for(
analyzer.analyze_logs_optimization(page=page, **analysis_args),
timeout=timeout_seconds
)
# Format response
formatted_result = ResponseFormatter.success_response(
data=result,
message="CloudWatch logs optimization analysis completed successfully",
analysis_type="cloudwatch_logs_optimization"
)
# Add documentation links
formatted_result = add_documentation_links(formatted_result, "cloudwatch")
execution_time = time.time() - start_time
log_function_exit(logger, "run_cloudwatch_logs_optimization_mcp", "success", execution_time)
return ResponseFormatter.to_text_content(formatted_result)
except asyncio.TimeoutError:
error_message = f"CloudWatch logs optimization analysis timed out after {timeout_seconds} seconds"
logger.error(error_message)
return ResponseFormatter.to_text_content(ResponseFormatter.error_response(
error_message, "timeout_error", "cloudwatch_logs_optimization"
))
except Exception as e:
logger.error(f"CloudWatch logs optimization analysis failed: {str(e)}")
return ResponseFormatter.to_text_content(ResponseFormatter.error_response(
str(e), "analysis_error", "cloudwatch_logs_optimization"
))
@handle_aws_error
async def run_cloudwatch_alarms_and_dashboards_optimization_mcp(arguments: Dict[str, Any]) -> List[TextContent]:
"""Run CloudWatch alarms and dashboards optimization analysis to identify monitoring efficiency improvements."""
log_function_entry(logger, "run_cloudwatch_alarms_and_dashboards_optimization_mcp", arguments=arguments)
start_time = time.time()
try:
from playbooks.cloudwatch.cloudwatch_optimization_analyzer import CloudWatchOptimizationAnalyzer
from playbooks.cloudwatch.cost_controller import CostPreferences
region = arguments.get("region")
page = arguments.get("page", 1)
timeout_seconds = arguments.get("timeout_seconds", 120)
# Build cost preferences from arguments
cost_preferences = CostPreferences(
allow_cost_explorer=arguments.get("allow_cost_explorer", False),
allow_aws_config=arguments.get("allow_aws_config", False),
allow_cloudtrail=arguments.get("allow_cloudtrail", False),
allow_minimal_cost_metrics=arguments.get("allow_minimal_cost_metrics", False)
)
# Initialize analyzer
analyzer = CloudWatchOptimizationAnalyzer(region=region, cost_preferences=cost_preferences)
# Remove internal parameters from arguments
analysis_args = {k: v for k, v in arguments.items() if k not in ['page', 'timeout_seconds']}
# Execute analysis with timeout
result = await asyncio.wait_for(
analyzer.analyze_alarms_optimization(page=page, **analysis_args),
timeout=timeout_seconds
)
# Format response
formatted_result = ResponseFormatter.success_response(
data=result,
message="CloudWatch alarms and dashboards optimization analysis completed successfully",
analysis_type="cloudwatch_alarms_dashboards_optimization"
)
# Add documentation links
formatted_result = add_documentation_links(formatted_result, "cloudwatch")
execution_time = time.time() - start_time
log_function_exit(logger, "run_cloudwatch_alarms_and_dashboards_optimization_mcp", "success", execution_time)
return ResponseFormatter.to_text_content(formatted_result)
except asyncio.TimeoutError:
error_message = f"CloudWatch alarms and dashboards optimization analysis timed out after {timeout_seconds} seconds"
logger.error(error_message)
return ResponseFormatter.to_text_content(ResponseFormatter.error_response(
error_message, "timeout_error", "cloudwatch_alarms_dashboards_optimization"
))
except Exception as e:
logger.error(f"CloudWatch alarms and dashboards optimization analysis failed: {str(e)}")
return ResponseFormatter.to_text_content(ResponseFormatter.error_response(
str(e), "analysis_error", "cloudwatch_alarms_dashboards_optimization"
))
@handle_aws_error
async def run_cloudwatch_comprehensive_optimization_tool_mcp(arguments: Dict[str, Any]) -> List[TextContent]:
"""Run comprehensive CloudWatch optimization using the unified optimization tool with intelligent orchestration."""
log_function_entry(logger, "run_cloudwatch_comprehensive_optimization_tool_mcp", arguments=arguments)
start_time = time.time()
try:
from playbooks.cloudwatch.cloudwatch_optimization_tool import CloudWatchOptimizationTool
region = arguments.get("region")
timeout_seconds = arguments.get("timeout_seconds", 120)
# Initialize comprehensive optimization tool
tool = CloudWatchOptimizationTool(region=region)
# Execute comprehensive analysis with timeout
result = await asyncio.wait_for(
tool.execute_comprehensive_optimization_analysis(**arguments),
timeout=timeout_seconds
)
# Format response
formatted_result = ResponseFormatter.success_response(
data=result,
message="CloudWatch comprehensive optimization analysis completed successfully",
analysis_type="cloudwatch_comprehensive_optimization"
)
# Add documentation links
formatted_result = add_documentation_links(formatted_result, "cloudwatch")
execution_time = time.time() - start_time
log_function_exit(logger, "run_cloudwatch_comprehensive_optimization_tool_mcp", "success", execution_time)
return ResponseFormatter.to_text_content(formatted_result)
except asyncio.TimeoutError:
error_message = f"CloudWatch comprehensive optimization analysis timed out after {timeout_seconds} seconds"
logger.error(error_message)
return ResponseFormatter.to_text_content(ResponseFormatter.error_response(
error_message, "timeout_error", "cloudwatch_comprehensive_optimization"
))
except Exception as e:
logger.error(f"CloudWatch comprehensive optimization analysis failed: {str(e)}")
return ResponseFormatter.to_text_content(ResponseFormatter.error_response(
str(e), "analysis_error", "cloudwatch_comprehensive_optimization"
))
@handle_aws_error
async def query_cloudwatch_analysis_results_mcp(arguments: Dict[str, Any]) -> List[TextContent]:
"""Query stored CloudWatch analysis results using SQL queries."""
log_function_entry(logger, "query_cloudwatch_analysis_results_mcp", arguments=arguments)
start_time = time.time()
try:
# Import the sync function from runbook_functions and call it
from runbook_functions import query_cloudwatch_analysis_results
# Call the existing function and convert to MCP format
result = await query_cloudwatch_analysis_results(arguments)
execution_time = time.time() - start_time
log_function_exit(logger, "query_cloudwatch_analysis_results_mcp", "success", execution_time)
return result
except Exception as e:
logger.error(f"CloudWatch analysis results query failed: {str(e)}")
return ResponseFormatter.to_text_content(ResponseFormatter.error_response(
str(e), "query_error", "cloudwatch_analysis_results"
))
@handle_aws_error
async def validate_cloudwatch_cost_preferences_mcp(arguments: Dict[str, Any]) -> List[TextContent]:
"""Validate CloudWatch cost preferences and get functionality coverage estimates."""
log_function_entry(logger, "validate_cloudwatch_cost_preferences_mcp", arguments=arguments)
start_time = time.time()
try:
# Import the sync function from runbook_functions and call it
from runbook_functions import validate_cloudwatch_cost_preferences
# Call the existing function and convert to MCP format
result = await validate_cloudwatch_cost_preferences(arguments)
execution_time = time.time() - start_time
log_function_exit(logger, "validate_cloudwatch_cost_preferences_mcp", "success", execution_time)
return result
except Exception as e:
logger.error(f"CloudWatch cost preferences validation failed: {str(e)}")
return ResponseFormatter.to_text_content(ResponseFormatter.error_response(
str(e), "validation_error", "cloudwatch_cost_preferences"
))
@handle_aws_error
async def get_cloudwatch_cost_estimate_mcp(arguments: Dict[str, Any]) -> List[TextContent]:
"""Get detailed cost estimate for CloudWatch optimization analysis based on enabled features."""
log_function_entry(logger, "get_cloudwatch_cost_estimate_mcp", arguments=arguments)
start_time = time.time()
try:
# Import the sync function from runbook_functions and call it
from runbook_functions import get_cloudwatch_cost_estimate
# Call the existing function and convert to MCP format
result = await get_cloudwatch_cost_estimate(arguments)
execution_time = time.time() - start_time
log_function_exit(logger, "get_cloudwatch_cost_estimate_mcp", "success", execution_time)
return result
except Exception as e:
logger.error(f"CloudWatch cost estimate failed: {str(e)}")
return ResponseFormatter.to_text_content(ResponseFormatter.error_response(
str(e), "estimation_error", "cloudwatch_cost_estimate"
))
```
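For reference, each wrapper above takes a plain arguments dict and returns a list of MCP `TextContent` items. A minimal invocation sketch, assuming the project's dependencies (the `mcp` package and the `utils.*` modules) are importable, might look like the following; the argument keys mirror the `arguments.get(...)` calls in the wrapper.

```python
# Minimal, illustrative invocation of one of the MCP wrapper functions defined above.
# All cost-incurring flags are left at their safe defaults (False).
import asyncio

from playbooks.cloudwatch.cloudwatch_optimization import run_cloudwatch_general_spend_analysis_mcp


async def main():
    contents = await run_cloudwatch_general_spend_analysis_mcp({
        "region": "us-east-1",
        "page": 1,
        "timeout_seconds": 120,
        "allow_cost_explorer": False,  # keep the analysis on the zero-cost path
    })
    for item in contents:
        print(item.text)


if __name__ == "__main__":
    asyncio.run(main())
```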
--------------------------------------------------------------------------------
/tests/legacy/test_cloudwatch_pagination_integration.py:
--------------------------------------------------------------------------------
```python
"""
Comprehensive integration tests for CloudWatch pagination and sorting functionality.
This test suite validates the complete end-to-end functionality of:
- Cost-based sorting (highest cost first)
- 1-based pagination with fixed page size of 10
- Zero-cost guarantee (no additional AWS API calls)
- MCP function integration
- Backward compatibility
"""
import pytest
import asyncio
import unittest.mock as mock
from unittest.mock import MagicMock, patch, AsyncMock
from playbooks.cloudwatch.result_processor import CloudWatchResultProcessor
from playbooks.cloudwatch.optimization_orchestrator import CloudWatchOptimizationOrchestrator
from runbook_functions import cloudwatch_general_spend_analysis
class TestCloudWatchPaginationIntegration:
"""Comprehensive integration tests for CloudWatch pagination."""
def test_cost_based_sorting_validation(self):
"""Test that results are consistently sorted by cost in descending order."""
processor = CloudWatchResultProcessor()
# Test with log groups of different sizes
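        # Expected costs below assume CloudWatch Logs storage priced at $0.03 per GB-month,
        # the rate from which each estimated_monthly_cost value is derived.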
log_groups = [
{'logGroupName': 'tiny-group', 'storedBytes': 536870912}, # 0.5 GB = $0.015
{'logGroupName': 'small-group', 'storedBytes': 1073741824}, # 1 GB = $0.03
{'logGroupName': 'large-group', 'storedBytes': 10737418240}, # 10 GB = $0.30
{'logGroupName': 'medium-group', 'storedBytes': 5368709120}, # 5 GB = $0.15
{'logGroupName': 'huge-group', 'storedBytes': 21474836480}, # 20 GB = $0.60
]
result = processor.process_log_groups_results(log_groups, page=1)
items = result['items']
# Verify sorting by cost descending
assert len(items) == 5
assert items[0]['logGroupName'] == 'huge-group'
assert items[0]['estimated_monthly_cost'] == 0.60
assert items[1]['logGroupName'] == 'large-group'
assert items[1]['estimated_monthly_cost'] == 0.30
assert items[2]['logGroupName'] == 'medium-group'
assert items[2]['estimated_monthly_cost'] == 0.15
assert items[3]['logGroupName'] == 'small-group'
assert items[3]['estimated_monthly_cost'] == 0.03
assert items[4]['logGroupName'] == 'tiny-group'
assert items[4]['estimated_monthly_cost'] == 0.015
def test_pagination_accuracy_comprehensive(self):
"""Test pagination metadata accuracy across different scenarios."""
processor = CloudWatchResultProcessor()
# Test with exactly 30 items (3 full pages)
log_groups = [
{'logGroupName': f'group-{i:02d}', 'storedBytes': (31-i) * 1073741824} # Descending sizes
for i in range(1, 31) # 30 log groups
]
# Test page 1
result_p1 = processor.process_log_groups_results(log_groups, page=1)
assert result_p1['pagination']['current_page'] == 1
assert result_p1['pagination']['total_items'] == 30
assert result_p1['pagination']['total_pages'] == 3
assert result_p1['pagination']['has_next_page'] is True
assert result_p1['pagination']['has_previous_page'] is False
assert len(result_p1['items']) == 10
# Test page 2
result_p2 = processor.process_log_groups_results(log_groups, page=2)
assert result_p2['pagination']['current_page'] == 2
assert result_p2['pagination']['has_next_page'] is True
assert result_p2['pagination']['has_previous_page'] is True
assert len(result_p2['items']) == 10
# Test page 3 (last full page)
result_p3 = processor.process_log_groups_results(log_groups, page=3)
assert result_p3['pagination']['current_page'] == 3
assert result_p3['pagination']['has_next_page'] is False
assert result_p3['pagination']['has_previous_page'] is True
assert len(result_p3['items']) == 10
# Test out of range page
result_p4 = processor.process_log_groups_results(log_groups, page=4)
assert result_p4['pagination']['current_page'] == 4
assert result_p4['pagination']['has_next_page'] is False
assert result_p4['pagination']['has_previous_page'] is True
assert len(result_p4['items']) == 0
def test_1_based_pagination_edge_cases(self):
"""Test 1-based pagination handles edge cases correctly."""
processor = CloudWatchResultProcessor()
log_groups = [{'logGroupName': 'test', 'storedBytes': 1073741824}]
# Test page 0 (should default to page 1)
result_p0 = processor.process_log_groups_results(log_groups, page=0)
assert result_p0['pagination']['current_page'] == 1
# Test negative page (should default to page 1)
result_neg = processor.process_log_groups_results(log_groups, page=-10)
assert result_neg['pagination']['current_page'] == 1
# Test very large page number
result_large = processor.process_log_groups_results(log_groups, page=999)
assert result_large['pagination']['current_page'] == 999
assert result_large['pagination']['total_pages'] == 1
assert len(result_large['items']) == 0
def test_zero_cost_guarantee_comprehensive(self):
"""Test that no AWS API calls are made during any processing operations."""
processor = CloudWatchResultProcessor()
# Test data for all resource types
log_groups = [{'logGroupName': 'test-lg', 'storedBytes': 1073741824}]
metrics = [{'MetricName': 'CustomMetric', 'Namespace': 'MyApp'}]
alarms = [{'AlarmName': 'test-alarm', 'Period': 300}]
dashboards = [{'DashboardName': 'test-dashboard'}]
recommendations = [{'type': 'optimization', 'potential_monthly_savings': 10.0}]
with mock.patch('boto3.client') as mock_boto3, \
mock.patch('boto3.resource') as mock_resource, \
mock.patch('requests.get') as mock_get:
# Test all processing methods
processor.process_log_groups_results(log_groups, page=1)
processor.process_metrics_results(metrics, page=1)
processor.process_alarms_results(alarms, page=1)
processor.process_dashboards_results(dashboards, page=1)
processor.process_recommendations(recommendations, page=1)
# Verify no external calls were made
mock_boto3.assert_not_called()
mock_resource.assert_not_called()
mock_get.assert_not_called()
def test_all_resource_types_sorting(self):
"""Test that all CloudWatch resource types are sorted correctly."""
processor = CloudWatchResultProcessor()
# Test metrics (custom vs AWS)
metrics = [
{'MetricName': 'AWSMetric', 'Namespace': 'AWS/EC2'}, # Free = $0.00
{'MetricName': 'CustomMetric1', 'Namespace': 'MyApp/Perf'}, # Custom = $0.30
{'MetricName': 'CustomMetric2', 'Namespace': 'MyApp/Business'} # Custom = $0.30
]
result = processor.process_metrics_results(metrics, page=1)
items = result['items']
# Custom metrics should be first (higher cost)
assert items[0]['Namespace'] in ['MyApp/Perf', 'MyApp/Business']
assert items[0]['estimated_monthly_cost'] == 0.30
assert items[1]['Namespace'] in ['MyApp/Perf', 'MyApp/Business']
assert items[1]['estimated_monthly_cost'] == 0.30
assert items[2]['Namespace'] == 'AWS/EC2'
assert items[2]['estimated_monthly_cost'] == 0.0
# Test alarms (high-resolution vs standard)
alarms = [
{'AlarmName': 'standard-alarm', 'Period': 300}, # Standard = $0.10
{'AlarmName': 'high-res-alarm', 'Period': 60} # High-res = $0.50
]
result = processor.process_alarms_results(alarms, page=1)
items = result['items']
# High-resolution alarm should be first (higher cost)
assert items[0]['AlarmName'] == 'high-res-alarm'
assert items[0]['estimated_monthly_cost'] == 0.50
assert items[1]['AlarmName'] == 'standard-alarm'
assert items[1]['estimated_monthly_cost'] == 0.10
def test_dashboard_free_tier_handling(self):
"""Test that dashboard free tier is handled correctly."""
processor = CloudWatchResultProcessor()
# Test with 5 dashboards (2 beyond free tier)
dashboards = [
{'DashboardName': f'dashboard-{i}'} for i in range(1, 6)
]
result = processor.process_dashboards_results(dashboards, page=1)
items = result['items']
# First 2 should be paid dashboards (beyond free tier)
assert items[0]['estimated_monthly_cost'] == 3.00
assert items[1]['estimated_monthly_cost'] == 3.00
# Last 3 should be free tier dashboards
assert items[2]['estimated_monthly_cost'] == 0.0
assert items[3]['estimated_monthly_cost'] == 0.0
assert items[4]['estimated_monthly_cost'] == 0.0
def test_recommendations_sorting(self):
"""Test that recommendations are sorted by potential savings."""
processor = CloudWatchResultProcessor()
recommendations = [
{'type': 'low_impact', 'potential_monthly_savings': 5.0},
{'type': 'high_impact', 'potential_monthly_savings': 50.0},
{'type': 'medium_impact', 'potential_monthly_savings': 20.0},
{'type': 'minimal_impact', 'potential_monthly_savings': 1.0}
]
result = processor.process_recommendations(recommendations, page=1)
items = result['items']
# Should be sorted by potential savings descending
assert items[0]['potential_monthly_savings'] == 50.0
assert items[1]['potential_monthly_savings'] == 20.0
assert items[2]['potential_monthly_savings'] == 5.0
assert items[3]['potential_monthly_savings'] == 1.0
def test_empty_results_handling(self):
"""Test that empty results are handled correctly."""
processor = CloudWatchResultProcessor()
# Test with empty list
result = processor.process_log_groups_results([], page=1)
assert len(result['items']) == 0
assert result['pagination']['current_page'] == 1
assert result['pagination']['total_items'] == 0
assert result['pagination']['total_pages'] == 0
assert result['pagination']['has_next_page'] is False
assert result['pagination']['has_previous_page'] is False
def test_single_page_results(self):
"""Test handling of results that fit in a single page."""
processor = CloudWatchResultProcessor()
# Test with 5 items (less than page size of 10)
log_groups = [
{'logGroupName': f'group-{i}', 'storedBytes': i * 1073741824}
for i in range(1, 6)
]
result = processor.process_log_groups_results(log_groups, page=1)
assert len(result['items']) == 5
assert result['pagination']['current_page'] == 1
assert result['pagination']['total_items'] == 5
assert result['pagination']['total_pages'] == 1
assert result['pagination']['has_next_page'] is False
assert result['pagination']['has_previous_page'] is False
class TestMCPFunctionIntegration:
"""Test MCP function integration with pagination."""
def test_mcp_function_signatures(self):
"""Test that MCP functions have correct pagination signatures."""
import inspect
# Test cloudwatch_general_spend_analysis
sig = inspect.signature(cloudwatch_general_spend_analysis)
params = sig.parameters
assert 'region' in params
assert 'page' in params
assert params['page'].default == 1
# Verify page parameter type annotation if present
if params['page'].annotation != inspect.Parameter.empty:
assert params['page'].annotation == int
def test_mcp_function_parameter_handling(self):
"""Test that MCP functions handle pagination parameters correctly."""
# Mock the orchestrator to avoid actual AWS calls
with patch('playbooks.cloudwatch.optimization_orchestrator.CloudWatchOptimizationOrchestrator') as mock_orchestrator_class:
mock_orchestrator = AsyncMock()
mock_orchestrator.execute_analysis.return_value = {
'status': 'success',
'data': {'test': 'data'},
'pagination_applied': True,
'current_page': 1
}
mock_orchestrator_class.return_value = mock_orchestrator
# Test default page
result = cloudwatch_general_spend_analysis(region='us-east-1')
assert result['status'] == 'success'
# Verify orchestrator was called with page=1 (default)
mock_orchestrator.execute_analysis.assert_called()
call_args = mock_orchestrator.execute_analysis.call_args
assert call_args[1]['page'] == 1
# Test explicit page parameter
result = cloudwatch_general_spend_analysis(region='us-east-1', page=2)
assert result['status'] == 'success'
# Verify orchestrator was called with page=2
call_args = mock_orchestrator.execute_analysis.call_args
assert call_args[1]['page'] == 2
def test_mcp_function_error_handling(self):
"""Test that MCP functions handle errors gracefully."""
# Test with invalid region to trigger error
result = cloudwatch_general_spend_analysis(region='invalid-region-12345')
# Should return error status, not raise exception
assert 'status' in result
# The function should handle errors gracefully
assert result['status'] in ['error', 'success'] # May succeed with mock data
class TestBackwardCompatibility:
"""Test backward compatibility of pagination implementation."""
def test_existing_api_compatibility(self):
"""Test that existing API calls still work without pagination parameters."""
with patch('playbooks.cloudwatch.optimization_orchestrator.CloudWatchOptimizationOrchestrator') as mock_orchestrator_class:
mock_orchestrator = AsyncMock()
mock_orchestrator.execute_analysis.return_value = {
'status': 'success',
'data': {'test': 'data'}
}
mock_orchestrator_class.return_value = mock_orchestrator
# Test call without page parameter (should default to page 1)
result = cloudwatch_general_spend_analysis(region='us-east-1')
assert result['status'] == 'success'
# Verify default page was used
call_args = mock_orchestrator.execute_analysis.call_args
assert call_args[1]['page'] == 1
def test_result_structure_compatibility(self):
"""Test that result structures are backward compatible."""
processor = CloudWatchResultProcessor()
log_groups = [{'logGroupName': 'test', 'storedBytes': 1073741824}]
result = processor.process_log_groups_results(log_groups, page=1)
# Verify expected structure
assert 'items' in result
assert 'pagination' in result
assert isinstance(result['items'], list)
assert isinstance(result['pagination'], dict)
# Verify pagination metadata structure
pagination = result['pagination']
required_fields = ['current_page', 'page_size', 'total_items', 'total_pages', 'has_next_page', 'has_previous_page']
for field in required_fields:
assert field in pagination
class TestPerformanceAndMemory:
"""Test performance and memory efficiency of pagination."""
def test_large_dataset_handling(self):
"""Test that large datasets are handled efficiently."""
processor = CloudWatchResultProcessor()
# Create a large dataset (1000 items)
large_dataset = [
{'logGroupName': f'group-{i:04d}', 'storedBytes': i * 1073741824}
for i in range(1, 1001)
]
# Processing should complete quickly
import time
start_time = time.time()
result = processor.process_log_groups_results(large_dataset, page=1)
end_time = time.time()
processing_time = end_time - start_time
# Should complete within reasonable time (less than 1 second)
assert processing_time < 1.0
# Should return correct pagination
assert len(result['items']) == 10
assert result['pagination']['total_items'] == 1000
assert result['pagination']['total_pages'] == 100
def test_memory_efficiency(self):
"""Test that pagination doesn't cause memory issues."""
processor = CloudWatchResultProcessor()
# Create dataset and process multiple pages
dataset = [
{'logGroupName': f'group-{i}', 'storedBytes': i * 1073741824}
for i in range(1, 101) # 100 items
]
# Process multiple pages - should not accumulate memory
for page in range(1, 11): # Pages 1-10
result = processor.process_log_groups_results(dataset, page=page)
assert len(result['items']) == 10
assert result['pagination']['current_page'] == page
if __name__ == '__main__':
pytest.main([__file__, '-v'])
```
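The pagination metadata these tests assert reduces to fixed-size, 1-based slicing. The sketch below shows that arithmetic with illustrative names; it is not the project's CloudWatchResultProcessor implementation.

```python
# Illustrative 1-based pagination with a fixed page size of 10, matching the metadata
# shape asserted above (current_page, page_size, total_items, total_pages, has_* flags).
import math

PAGE_SIZE = 10


def paginate(items: list, page: int) -> dict:
    page = max(1, page)  # pages below 1 are clamped to 1, as the edge-case tests expect
    total_items = len(items)
    total_pages = math.ceil(total_items / PAGE_SIZE)
    start = (page - 1) * PAGE_SIZE
    return {
        "items": items[start:start + PAGE_SIZE],
        "pagination": {
            "current_page": page,
            "page_size": PAGE_SIZE,
            "total_items": total_items,
            "total_pages": total_pages,
            "has_next_page": page < total_pages,
            "has_previous_page": page > 1 and total_items > 0,
        },
    }


# Example: 30 items -> 3 pages; page=2 returns items[10:20] with both has_* flags True.
```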
--------------------------------------------------------------------------------
/runbook_functions.py:
--------------------------------------------------------------------------------
```python
"""
Runbook Functions for AWS Cost Optimization
This module contains all the runbook/playbook functions for cost optimization analysis.
"""
import json
import logging
from typing import Dict, List, Any
from mcp.types import TextContent
# Import playbook modules
from playbooks.ec2_optimization import (
get_underutilized_instances, get_right_sizing_recommendation, generate_right_sizing_report,
get_stopped_instances, get_unattached_elastic_ips, get_old_generation_instances,
get_instances_without_detailed_monitoring, get_graviton_compatible_instances,
get_burstable_instances_analysis, get_spot_instance_opportunities,
get_unused_capacity_reservations, get_scheduling_opportunities,
get_commitment_plan_recommendations, get_governance_violations,
generate_comprehensive_ec2_report
)
from playbooks.ebs_optimization import get_underutilized_volumes, identify_unused_volumes, generate_ebs_optimization_report
from playbooks.rds_optimization import get_underutilized_rds_instances, identify_idle_rds_instances
from playbooks.lambda_optimization import get_underutilized_lambda_functions, identify_unused_lambda_functions
logger = logging.getLogger(__name__)
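# Each runbook wrapper below reads its parameters from the MCP `arguments` dict, delegates
# to the corresponding synchronous playbook function, and returns the result as indented
# JSON inside a single TextContent item; failures are returned as plain-text error messages
# rather than raised.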
# EC2 Right Sizing Runbook Functions
async def run_ec2_right_sizing_analysis(arguments: Dict[str, Any]) -> List[TextContent]:
"""Run comprehensive EC2 right-sizing analysis."""
try:
result = get_underutilized_instances(
region=arguments.get("region"),
lookback_period_days=arguments.get("lookback_period_days", 14),
cpu_threshold=arguments.get("cpu_threshold", 40.0),
memory_threshold=arguments.get("memory_threshold"),
network_threshold=arguments.get("network_threshold")
)
return [TextContent(type="text", text=json.dumps(result, indent=2, default=str))]
except Exception as e:
return [TextContent(type="text", text=f"Error: {str(e)}")]
async def generate_ec2_right_sizing_report(arguments: Dict[str, Any]) -> List[TextContent]:
"""Generate detailed EC2 right-sizing report."""
try:
# Get underutilized instances
instances_result = get_underutilized_instances(
region=arguments.get("region"),
lookback_period_days=arguments.get("lookback_period_days", 14),
cpu_threshold=arguments.get("cpu_threshold", 40.0)
)
if instances_result["status"] != "success":
return [TextContent(type="text", text=json.dumps(instances_result, indent=2))]
# Generate report
report_result = generate_right_sizing_report(
instances_result["data"]["underutilized_instances"]
)
output_format = arguments.get("output_format", "json")
if output_format == "markdown":
# Convert to markdown format
data = report_result["data"]
report = f"""# EC2 Right Sizing Report
## Summary
- **Total Instances**: {data['total_instances']}
- **Monthly Savings**: ${data['total_monthly_savings']:.2f}
## Top Recommendations
"""
for instance in data.get('top_recommendations', []):
rec = instance.get('recommendation', {})
report += f"""### {instance['instance_id']}
- **Current**: {rec.get('current_instance_type', 'N/A')}
- **Recommended**: {rec.get('recommended_instance_type', 'N/A')}
- **Monthly Savings**: ${rec.get('estimated_monthly_savings', 0):.2f}
"""
return [TextContent(type="text", text=report)]
else:
return [TextContent(type="text", text=json.dumps(report_result, indent=2, default=str))]
except Exception as e:
return [TextContent(type="text", text=f"Error generating report: {str(e)}")]
# EBS Optimization Runbook Functions
async def run_ebs_optimization_analysis(arguments: Dict[str, Any]) -> List[TextContent]:
"""Run comprehensive EBS optimization analysis."""
try:
result = get_underutilized_volumes(
region=arguments.get("region"),
lookback_period_days=arguments.get("lookback_period_days", 30),
iops_threshold=arguments.get("iops_threshold", 100.0),
throughput_threshold=arguments.get("throughput_threshold", 1.0)
)
return [TextContent(type="text", text=json.dumps(result, indent=2, default=str))]
except Exception as e:
return [TextContent(type="text", text=f"Error: {str(e)}")]
async def identify_unused_ebs_volumes(arguments: Dict[str, Any]) -> List[TextContent]:
"""Identify unused EBS volumes."""
try:
result = identify_unused_volumes(
region=arguments.get("region"),
min_age_days=arguments.get("min_age_days", 30)
)
return [TextContent(type="text", text=json.dumps(result, indent=2, default=str))]
except Exception as e:
return [TextContent(type="text", text=f"Error: {str(e)}")]
async def generate_ebs_optimization_report(arguments: Dict[str, Any]) -> List[TextContent]:
"""Generate detailed EBS optimization report."""
try:
region = arguments.get("region")
# Get underutilized and unused volumes
underutilized_result = get_underutilized_volumes(region=region)
unused_result = identify_unused_volumes(region=region)
if underutilized_result["status"] != "success" or unused_result["status"] != "success":
return [TextContent(type="text", text="Error getting volume data")]
        # Generate comprehensive report using the playbook implementation.
        # Import it under a local alias: the module-level name generate_ebs_optimization_report
        # is shadowed by this async wrapper, so calling it directly would invoke the wrapper
        # itself instead of the playbook function.
        from playbooks.ebs_optimization import generate_ebs_optimization_report as build_ebs_report
        report_result = build_ebs_report(
            underutilized_result["data"]["underutilized_volumes"],
            unused_result["data"]["unused_volumes"]
        )
output_format = arguments.get("output_format", "json")
if output_format == "markdown":
data = report_result["data"]
report = f"""# EBS Optimization Report
## Summary
- **Total Volumes**: {data['total_volumes']}
- **Monthly Savings**: ${data['total_monthly_savings']:.2f}
- **Unused Savings**: ${data['unused_savings']:.2f}
## Top Unused Volumes
"""
for volume in data.get('top_unused', []):
report += f"""### {volume['volume_id']}
- **Size**: {volume.get('volume_size', 'N/A')} GB
- **Monthly Cost**: ${volume.get('monthly_cost', 0):.2f}
"""
return [TextContent(type="text", text=report)]
else:
return [TextContent(type="text", text=json.dumps(report_result, indent=2, default=str))]
except Exception as e:
return [TextContent(type="text", text=f"Error generating report: {str(e)}")]
# RDS Optimization Runbook Functions
async def run_rds_optimization_analysis(arguments: Dict[str, Any]) -> List[TextContent]:
"""Run comprehensive RDS optimization analysis."""
try:
result = get_underutilized_rds_instances(
region=arguments.get("region"),
lookback_period_days=arguments.get("lookback_period_days", 14),
cpu_threshold=arguments.get("cpu_threshold", 40.0),
connection_threshold=arguments.get("connection_threshold", 20.0)
)
return [TextContent(type="text", text=json.dumps(result, indent=2, default=str))]
except Exception as e:
return [TextContent(type="text", text=f"Error: {str(e)}")]
async def identify_idle_rds_instances(arguments: Dict[str, Any]) -> List[TextContent]:
"""Identify idle RDS instances."""
try:
from playbooks.rds_optimization import identify_idle_rds_instances as get_idle_rds
result = get_idle_rds(
region=arguments.get("region"),
lookback_period_days=arguments.get("lookback_period_days", 7),
connection_threshold=arguments.get("connection_threshold", 1.0)
)
return [TextContent(type="text", text=json.dumps(result, indent=2, default=str))]
except Exception as e:
return [TextContent(type="text", text=f"Error: {str(e)}")]
async def generate_rds_optimization_report(arguments: Dict[str, Any]) -> List[TextContent]:
"""Generate detailed RDS optimization report."""
try:
region = arguments.get("region")
# Get data from playbooks
from playbooks.rds_optimization import identify_idle_rds_instances as get_idle_rds
underutilized_result = get_underutilized_rds_instances(region=region)
idle_result = get_idle_rds(region=region)
combined_report = {
"status": "success",
"report_type": "RDS Comprehensive Optimization Report",
"region": region or "default",
"optimization_analysis": underutilized_result,
"idle_instances_analysis": idle_result,
"summary": {
"underutilized_instances": underutilized_result.get("data", {}).get("count", 0),
"idle_instances": idle_result.get("data", {}).get("count", 0)
}
}
return [TextContent(type="text", text=json.dumps(combined_report, indent=2, default=str))]
except Exception as e:
return [TextContent(type="text", text=f"Error generating report: {str(e)}")]
# Lambda Optimization Runbook Functions
async def run_lambda_optimization_analysis(arguments: Dict[str, Any]) -> List[TextContent]:
"""Run comprehensive Lambda optimization analysis."""
try:
result = get_underutilized_lambda_functions(
region=arguments.get("region"),
lookback_period_days=arguments.get("lookback_period_days", 14),
memory_utilization_threshold=arguments.get("memory_utilization_threshold", 50.0),
min_invocations=arguments.get("min_invocations", 100)
)
return [TextContent(type="text", text=json.dumps(result, indent=2, default=str))]
except Exception as e:
return [TextContent(type="text", text=f"Error: {str(e)}")]
async def identify_unused_lambda_functions(arguments: Dict[str, Any]) -> List[TextContent]:
"""Identify unused Lambda functions."""
try:
from playbooks.lambda_optimization import identify_unused_lambda_functions as get_unused_lambda
result = get_unused_lambda(
region=arguments.get("region"),
lookback_period_days=arguments.get("lookback_period_days", 30),
max_invocations=arguments.get("max_invocations", 5)
)
return [TextContent(type="text", text=json.dumps(result, indent=2, default=str))]
except Exception as e:
return [TextContent(type="text", text=f"Error: {str(e)}")]
async def generate_lambda_optimization_report(arguments: Dict[str, Any]) -> List[TextContent]:
"""Generate detailed Lambda optimization report."""
try:
region = arguments.get("region")
# Get data from playbooks
from playbooks.lambda_optimization import identify_unused_lambda_functions as get_unused_lambda
optimization_result = get_underutilized_lambda_functions(region=region)
unused_result = get_unused_lambda(region=region)
combined_report = {
"status": "success",
"report_type": "Lambda Comprehensive Optimization Report",
"region": region or "default",
"optimization_analysis": optimization_result,
"unused_functions_analysis": unused_result,
"summary": {
"functions_with_usage": optimization_result.get("data", {}).get("count", 0),
"unused_functions": unused_result.get("data", {}).get("count", 0)
}
}
return [TextContent(type="text", text=json.dumps(combined_report, indent=2, default=str))]
except Exception as e:
return [TextContent(type="text", text=f"Error generating report: {str(e)}")]
async def run_comprehensive_cost_analysis(arguments: Dict[str, Any]) -> List[TextContent]:
"""Run comprehensive cost analysis across all services."""
try:
region = arguments.get("region")
services = arguments.get("services", ["ec2", "ebs", "rds", "lambda", "cloudtrail"])
lookback_period_days = arguments.get("lookback_period_days", 14)
comprehensive_report = {
"status": "success",
"report_type": "Comprehensive Cost Analysis Report",
"region": region or "default",
"services_analyzed": services,
"analyses": {}
}
# Run analyses using playbook functions
if "ec2" in services:
try:
comprehensive_report["analyses"]["ec2"] = get_underutilized_instances(
region=region, lookback_period_days=lookback_period_days
)
except Exception as e:
comprehensive_report["analyses"]["ec2"] = {"error": str(e)}
if "ebs" in services:
try:
comprehensive_report["analyses"]["ebs"] = {
"optimization": get_underutilized_volumes(region=region),
"unused_volumes": identify_unused_volumes(region=region)
}
except Exception as e:
comprehensive_report["analyses"]["ebs"] = {"error": str(e)}
if "rds" in services:
try:
from playbooks.rds_optimization import identify_idle_rds_instances as get_idle_rds
comprehensive_report["analyses"]["rds"] = {
"optimization": get_underutilized_rds_instances(region=region),
"idle_instances": get_idle_rds(region=region)
}
except Exception as e:
comprehensive_report["analyses"]["rds"] = {"error": str(e)}
if "lambda" in services:
try:
from playbooks.lambda_optimization import identify_unused_lambda_functions as get_unused_lambda
comprehensive_report["analyses"]["lambda"] = {
"optimization": get_underutilized_lambda_functions(region=region),
"unused_functions": get_unused_lambda(region=region)
}
except Exception as e:
comprehensive_report["analyses"]["lambda"] = {"error": str(e)}
if "cloudtrail" in services:
try:
from playbooks.cloudtrail_optimization import run_cloudtrail_optimization
comprehensive_report["analyses"]["cloudtrail"] = run_cloudtrail_optimization(region=region)
except Exception as e:
comprehensive_report["analyses"]["cloudtrail"] = {"error": str(e)}
return [TextContent(type="text", text=json.dumps(comprehensive_report, indent=2, default=str))]
except Exception as e:
return [TextContent(type="text", text=f"Error running comprehensive analysis: {str(e)}")]
# Additional EC2 runbook functions
async def identify_stopped_ec2_instances(arguments: Dict[str, Any]) -> List[TextContent]:
"""Identify stopped EC2 instances."""
try:
result = get_stopped_instances(
region=arguments.get("region"),
min_stopped_days=arguments.get("min_stopped_days", 7)
)
return [TextContent(type="text", text=json.dumps(result, indent=2, default=str))]
except Exception as e:
return [TextContent(type="text", text=f"Error: {str(e)}")]
async def identify_unattached_elastic_ips(arguments: Dict[str, Any]) -> List[TextContent]:
"""Identify unattached Elastic IPs."""
try:
result = get_unattached_elastic_ips(
region=arguments.get("region")
)
return [TextContent(type="text", text=json.dumps(result, indent=2, default=str))]
except Exception as e:
return [TextContent(type="text", text=f"Error: {str(e)}")]
async def identify_old_generation_instances(arguments: Dict[str, Any]) -> List[TextContent]:
"""Identify old generation instances."""
try:
result = get_old_generation_instances(
region=arguments.get("region")
)
return [TextContent(type="text", text=json.dumps(result, indent=2, default=str))]
except Exception as e:
return [TextContent(type="text", text=f"Error: {str(e)}")]
async def identify_instances_without_monitoring(arguments: Dict[str, Any]) -> List[TextContent]:
"""Identify instances without detailed monitoring."""
try:
result = get_instances_without_detailed_monitoring(
region=arguments.get("region")
)
return [TextContent(type="text", text=json.dumps(result, indent=2, default=str))]
except Exception as e:
return [TextContent(type="text", text=f"Error: {str(e)}")]
# CloudTrail optimization functions
async def get_management_trails(arguments: Dict[str, Any]) -> List[TextContent]:
"""Get CloudTrail management trails."""
try:
from playbooks.cloudtrail_optimization import get_management_trails as get_trails
result = get_trails(region=arguments.get("region", "us-east-1"))
return [TextContent(type="text", text=json.dumps(result, indent=2, default=str))]
except Exception as e:
return [TextContent(type="text", text=f"Error: {str(e)}")]
async def run_cloudtrail_trails_analysis(arguments: Dict[str, Any]) -> List[TextContent]:
"""Run CloudTrail trails analysis."""
try:
from playbooks.cloudtrail_optimization import run_cloudtrail_optimization as analyze_trails
result = analyze_trails(region=arguments.get("region", "us-east-1"))
return [TextContent(type="text", text=json.dumps(result, indent=2, default=str))]
except Exception as e:
return [TextContent(type="text", text=f"Error: {str(e)}")]
async def generate_cloudtrail_report(arguments: Dict[str, Any]) -> List[TextContent]:
"""Generate CloudTrail optimization report."""
try:
from playbooks.cloudtrail_optimization import generate_cloudtrail_report as gen_report
result = gen_report(
region=arguments.get("region", "us-east-1"),
output_format=arguments.get("output_format", "json")
)
return [TextContent(type="text", text=json.dumps(result, indent=2, default=str) if isinstance(result, dict) else result)]
except Exception as e:
return [TextContent(type="text", text=f"Error: {str(e)}")]
```
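
These handlers share one shape: read optional parameters from the `arguments` dict, call the matching synchronous playbook function, and return the JSON-serialized result wrapped in a single `TextContent`. The sketch below shows how one handler could be driven directly for local testing, outside the MCP server; the import path is illustrative rather than confirmed, and it assumes the handler module is importable as shown.

```python
# Minimal driver for one of the async runbook handlers (sketch only).
# The module path below is a placeholder for wherever these handlers live.
import asyncio

from runbook_functions_extended import identify_stopped_ec2_instances  # hypothetical path


async def main() -> None:
    # Handlers take a plain dict of arguments and return a list of TextContent
    # objects; on success .text holds the JSON report, on failure an "Error: ..." string.
    contents = await identify_stopped_ec2_instances(
        {"region": "us-east-1", "min_stopped_days": 14}
    )
    print(contents[0].text)


if __name__ == "__main__":
    asyncio.run(main())
```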
--------------------------------------------------------------------------------
/tests/unit/analyzers/test_base_analyzer.py:
--------------------------------------------------------------------------------
```python
"""
Unit tests for BaseAnalyzer class.
Tests the abstract base class functionality including parameter validation,
error handling, recommendation creation, and performance monitoring integration.
"""
import pytest
import asyncio
from unittest.mock import Mock, AsyncMock, patch
from datetime import datetime
from typing import Dict, List, Any
from playbooks.s3.base_analyzer import BaseAnalyzer, AnalyzerRegistry, get_analyzer_registry
class MockAnalyzer(BaseAnalyzer):
"""Concrete implementation of BaseAnalyzer for testing."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.analysis_type = "test_analyzer"
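        # analysis_type doubles as the key under which AnalyzerRegistry stores
        # this analyzer (exercised in TestAnalyzerRegistry below).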
async def analyze(self, **kwargs) -> Dict[str, Any]:
"""Test implementation of analyze method."""
return {
"status": "success",
"data": {"test_metric": 100},
"execution_time": 1.0
}
def get_recommendations(self, analysis_results: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Test implementation of get_recommendations method."""
return [
{
"type": "test_recommendation",
"priority": "high",
"title": "Test Recommendation",
"description": "This is a test recommendation"
}
]
class MockFailingAnalyzer(BaseAnalyzer):
"""Analyzer that fails for testing error handling."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.analysis_type = "failing_analyzer"
async def analyze(self, **kwargs) -> Dict[str, Any]:
"""Failing implementation for testing."""
raise ValueError("Test error for error handling")
def get_recommendations(self, analysis_results: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Test implementation."""
return []
@pytest.mark.unit
class TestBaseAnalyzer:
"""Test cases for BaseAnalyzer class."""
def test_analyzer_initialization(self, mock_s3_service, mock_pricing_service,
mock_storage_lens_service, mock_performance_monitor,
mock_memory_manager):
"""Test analyzer initialization with all services."""
analyzer = MockAnalyzer(
s3_service=mock_s3_service,
pricing_service=mock_pricing_service,
storage_lens_service=mock_storage_lens_service,
performance_monitor=mock_performance_monitor,
memory_manager=mock_memory_manager
)
assert analyzer.s3_service == mock_s3_service
assert analyzer.pricing_service == mock_pricing_service
assert analyzer.storage_lens_service == mock_storage_lens_service
assert analyzer.performance_monitor == mock_performance_monitor
assert analyzer.memory_manager == mock_memory_manager
assert analyzer.analysis_type == "test_analyzer"
assert analyzer.version == "1.0.0"
assert analyzer.execution_count == 0
assert analyzer.last_execution is None
def test_analyzer_initialization_minimal(self):
"""Test analyzer initialization with minimal parameters."""
analyzer = MockAnalyzer()
assert analyzer.s3_service is None
assert analyzer.pricing_service is None
assert analyzer.storage_lens_service is None
assert analyzer.analysis_type == "test_analyzer"
def test_validate_parameters_valid(self):
"""Test parameter validation with valid parameters."""
analyzer = MockAnalyzer()
validation = analyzer.validate_parameters(
region="us-east-1",
lookback_days=30,
bucket_names=["bucket1", "bucket2"],
timeout_seconds=60
)
assert validation["valid"] is True
assert len(validation["errors"]) == 0
assert len(validation["warnings"]) == 0
def test_validate_parameters_invalid_region(self):
"""Test parameter validation with invalid region."""
analyzer = MockAnalyzer()
validation = analyzer.validate_parameters(region=123)
assert validation["valid"] is False
assert "Region must be a string" in validation["errors"]
def test_validate_parameters_invalid_lookback_days(self):
"""Test parameter validation with invalid lookback_days."""
analyzer = MockAnalyzer()
validation = analyzer.validate_parameters(lookback_days=-5)
assert validation["valid"] is False
assert "lookback_days must be a positive integer" in validation["errors"]
def test_validate_parameters_large_lookback_days(self):
"""Test parameter validation with large lookback_days."""
analyzer = MockAnalyzer()
validation = analyzer.validate_parameters(lookback_days=400)
assert validation["valid"] is True
assert "lookback_days > 365 may result in large datasets" in validation["warnings"]
def test_validate_parameters_invalid_bucket_names(self):
"""Test parameter validation with invalid bucket_names."""
analyzer = MockAnalyzer()
validation = analyzer.validate_parameters(bucket_names="not_a_list")
assert validation["valid"] is False
assert "bucket_names must be a list" in validation["errors"]
def test_validate_parameters_invalid_bucket_name_types(self):
"""Test parameter validation with invalid bucket name types."""
analyzer = MockAnalyzer()
validation = analyzer.validate_parameters(bucket_names=["bucket1", 123, "bucket2"])
assert validation["valid"] is False
assert "All bucket names must be strings" in validation["errors"]
def test_validate_parameters_invalid_timeout(self):
"""Test parameter validation with invalid timeout."""
analyzer = MockAnalyzer()
validation = analyzer.validate_parameters(timeout_seconds=-10)
assert validation["valid"] is False
assert "timeout_seconds must be a positive number" in validation["errors"]
def test_prepare_analysis_context(self):
"""Test analysis context preparation."""
analyzer = MockAnalyzer()
context = analyzer.prepare_analysis_context(
region="us-west-2",
session_id="test_session",
lookback_days=14,
bucket_names=["test-bucket"]
)
assert context["analysis_type"] == "test_analyzer"
assert context["analyzer_version"] == "1.0.0"
assert context["region"] == "us-west-2"
assert context["session_id"] == "test_session"
assert context["lookback_days"] == 14
assert context["bucket_names"] == ["test-bucket"]
assert context["timeout_seconds"] == 60 # default
assert "started_at" in context
assert "execution_id" in context
def test_handle_analysis_error(self):
"""Test error handling functionality."""
analyzer = MockAnalyzer()
context = {"analysis_type": "test_analyzer", "session_id": "test"}
error = ValueError("Test error message")
error_result = analyzer.handle_analysis_error(error, context)
assert error_result["status"] == "error"
assert error_result["analysis_type"] == "test_analyzer"
assert error_result["error_message"] == "Test error message"
assert error_result["error_type"] == "ValueError"
assert error_result["context"] == context
assert "timestamp" in error_result
assert len(error_result["recommendations"]) == 1
assert error_result["recommendations"][0]["type"] == "error_resolution"
def test_create_recommendation_minimal(self):
"""Test recommendation creation with minimal parameters."""
analyzer = MockAnalyzer()
recommendation = analyzer.create_recommendation(
rec_type="cost_optimization",
priority="high",
title="Test Recommendation",
description="Test description"
)
assert recommendation["type"] == "cost_optimization"
assert recommendation["priority"] == "high"
assert recommendation["title"] == "Test Recommendation"
assert recommendation["description"] == "Test description"
assert recommendation["implementation_effort"] == "medium" # default
assert recommendation["analyzer"] == "test_analyzer"
assert "created_at" in recommendation
def test_create_recommendation_full(self):
"""Test recommendation creation with all parameters."""
analyzer = MockAnalyzer()
recommendation = analyzer.create_recommendation(
rec_type="governance",
priority="medium",
title="Full Recommendation",
description="Full description",
potential_savings=100.50,
implementation_effort="low",
affected_resources=["bucket1", "bucket2"],
action_items=["Action 1", "Action 2"]
)
assert recommendation["type"] == "governance"
assert recommendation["priority"] == "medium"
assert recommendation["potential_savings"] == 100.50
assert recommendation["potential_savings_formatted"] == "$100.50"
assert recommendation["implementation_effort"] == "low"
assert recommendation["affected_resources"] == ["bucket1", "bucket2"]
assert recommendation["resource_count"] == 2
assert recommendation["action_items"] == ["Action 1", "Action 2"]
def test_log_analysis_start(self, mock_performance_monitor):
"""Test analysis start logging."""
analyzer = MockAnalyzer(performance_monitor=mock_performance_monitor)
context = {"analysis_type": "test_analyzer"}
analyzer.log_analysis_start(context)
assert analyzer.execution_count == 1
assert analyzer.last_execution is not None
mock_performance_monitor.record_metric.assert_called_once()
def test_log_analysis_complete(self, mock_performance_monitor):
"""Test analysis completion logging."""
analyzer = MockAnalyzer(performance_monitor=mock_performance_monitor)
context = {"analysis_type": "test_analyzer"}
result = {
"status": "success",
"execution_time": 5.0,
"recommendations": [{"type": "test"}]
}
analyzer.log_analysis_complete(context, result)
# Should record 3 metrics: completed, execution_time, recommendations
assert mock_performance_monitor.record_metric.call_count == 3
def test_get_analyzer_info(self, mock_s3_service, mock_pricing_service):
"""Test analyzer info retrieval."""
analyzer = MockAnalyzer(
s3_service=mock_s3_service,
pricing_service=mock_pricing_service
)
analyzer.execution_count = 5
analyzer.last_execution = datetime.now()
info = analyzer.get_analyzer_info()
assert info["analysis_type"] == "test_analyzer"
assert info["class_name"] == "MockAnalyzer"
assert info["version"] == "1.0.0"
assert info["execution_count"] == 5
assert info["last_execution"] is not None
assert info["services"]["s3_service"] is True
assert info["services"]["pricing_service"] is True
assert info["services"]["storage_lens_service"] is False
@pytest.mark.asyncio
async def test_execute_with_error_handling_success(self, mock_memory_manager):
"""Test successful execution with error handling."""
analyzer = MockAnalyzer(memory_manager=mock_memory_manager)
result = await analyzer.execute_with_error_handling(
region="us-east-1",
lookback_days=30
)
assert result["status"] == "success"
assert result["analysis_type"] == "test_analyzer"
assert "timestamp" in result
assert "recommendations" in result
assert len(result["recommendations"]) == 1
# Verify memory tracking was called
mock_memory_manager.start_memory_tracking.assert_called_once()
mock_memory_manager.stop_memory_tracking.assert_called_once()
@pytest.mark.asyncio
async def test_execute_with_error_handling_validation_failure(self):
"""Test execution with parameter validation failure."""
analyzer = MockAnalyzer()
result = await analyzer.execute_with_error_handling(
region=123, # Invalid region
lookback_days=-5 # Invalid lookback_days
)
assert result["status"] == "error"
assert result["error_message"] == "Parameter validation failed"
assert "validation_errors" in result
assert len(result["validation_errors"]) == 2
@pytest.mark.asyncio
async def test_execute_with_error_handling_analysis_failure(self):
"""Test execution with analysis failure."""
analyzer = MockFailingAnalyzer()
result = await analyzer.execute_with_error_handling(
region="us-east-1",
lookback_days=30
)
assert result["status"] == "error"
assert result["error_message"] == "Test error for error handling"
assert result["error_type"] == "ValueError"
assert "recommendations" in result
assert len(result["recommendations"]) == 1
assert result["recommendations"][0]["type"] == "error_resolution"
@pytest.mark.unit
class TestAnalyzerRegistry:
"""Test cases for AnalyzerRegistry class."""
def test_registry_initialization(self):
"""Test registry initialization."""
registry = AnalyzerRegistry()
assert len(registry._analyzers) == 0
assert registry.list_analyzers() == []
def test_register_analyzer(self):
"""Test analyzer registration."""
registry = AnalyzerRegistry()
analyzer = MockAnalyzer()
registry.register(analyzer)
assert len(registry._analyzers) == 1
assert "test_analyzer" in registry._analyzers
assert registry.get("test_analyzer") == analyzer
def test_register_multiple_analyzers(self):
"""Test registering multiple analyzers."""
registry = AnalyzerRegistry()
analyzer1 = MockAnalyzer()
analyzer2 = MockFailingAnalyzer()
registry.register(analyzer1)
registry.register(analyzer2)
assert len(registry._analyzers) == 2
assert set(registry.list_analyzers()) == {"test_analyzer", "failing_analyzer"}
def test_get_nonexistent_analyzer(self):
"""Test getting non-existent analyzer."""
registry = AnalyzerRegistry()
result = registry.get("nonexistent")
assert result is None
def test_get_analyzer_info(self):
"""Test getting analyzer info from registry."""
registry = AnalyzerRegistry()
analyzer = MockAnalyzer()
registry.register(analyzer)
info = registry.get_analyzer_info()
assert "test_analyzer" in info
assert info["test_analyzer"]["class_name"] == "MockAnalyzer"
assert info["test_analyzer"]["analysis_type"] == "test_analyzer"
def test_global_registry(self):
"""Test global registry access."""
global_registry = get_analyzer_registry()
assert isinstance(global_registry, AnalyzerRegistry)
# Test that multiple calls return the same instance
registry2 = get_analyzer_registry()
assert global_registry is registry2
@pytest.mark.unit
class TestAnalyzerPerformanceIntegration:
"""Test performance monitoring integration in analyzers."""
@pytest.mark.asyncio
async def test_performance_monitoring_integration(self, mock_performance_monitor,
mock_memory_manager):
"""Test that analyzers properly integrate with performance monitoring."""
analyzer = MockAnalyzer(
performance_monitor=mock_performance_monitor,
memory_manager=mock_memory_manager
)
result = await analyzer.execute_with_error_handling(
region="us-east-1",
lookback_days=30
)
assert result["status"] == "success"
# Verify performance monitoring calls
mock_performance_monitor.record_metric.assert_called()
# Verify memory management calls
mock_memory_manager.start_memory_tracking.assert_called_once()
mock_memory_manager.stop_memory_tracking.assert_called_once()
mock_memory_manager.register_large_object.assert_called_once()
@pytest.mark.asyncio
async def test_memory_stats_in_result(self, mock_memory_manager):
"""Test that memory statistics are included in results."""
mock_memory_manager.stop_memory_tracking.return_value = {
"peak_memory_mb": 75.5,
"avg_memory_mb": 45.2
}
analyzer = MockAnalyzer(memory_manager=mock_memory_manager)
result = await analyzer.execute_with_error_handling(
region="us-east-1",
lookback_days=30
)
assert result["status"] == "success"
assert "memory_usage" in result
assert result["memory_usage"]["peak_memory_mb"] == 75.5
assert result["memory_usage"]["avg_memory_mb"] == 45.2
def test_analyzer_without_performance_components(self):
"""Test analyzer behavior without performance monitoring components."""
analyzer = MockAnalyzer()
# Should not raise errors
context = {"analysis_type": "test"}
analyzer.log_analysis_start(context)
analyzer.log_analysis_complete(context, {"status": "success", "execution_time": 1.0, "recommendations": []})
info = analyzer.get_analyzer_info()
assert info["services"]["s3_service"] is False
assert info["services"]["pricing_service"] is False
assert info["services"]["storage_lens_service"] is False
```
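
The fixtures referenced throughout (`mock_s3_service`, `mock_pricing_service`, `mock_storage_lens_service`, `mock_performance_monitor`, `mock_memory_manager`) are expected to come from the shared `tests/conftest.py`; the repository's conftest is the source of truth. The sketch below captures only the minimal shape these tests assume: plain mocks, with `stop_memory_tracking` returning a dict of memory statistics.

```python
# Illustrative conftest.py fixtures (a minimal sketch of what the tests above
# assume; the real fixtures in tests/conftest.py may configure more behavior).
import pytest
from unittest.mock import Mock


@pytest.fixture
def mock_s3_service():
    return Mock()


@pytest.fixture
def mock_pricing_service():
    return Mock()


@pytest.fixture
def mock_storage_lens_service():
    return Mock()


@pytest.fixture
def mock_performance_monitor():
    # record_metric(...) is asserted on directly, so a bare Mock suffices.
    return Mock()


@pytest.fixture
def mock_memory_manager():
    manager = Mock()
    # Individual tests override this return value when they need specific stats.
    manager.stop_memory_tracking.return_value = {"peak_memory_mb": 0.0, "avg_memory_mb": 0.0}
    return manager
```

Assuming pytest-asyncio is installed and the `unit` marker is registered in the project's pytest configuration, this file can be run in isolation with `pytest tests/unit/analyzers/test_base_analyzer.py -m unit -v`.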