This is page 2 of 19. Use http://codebase.md/aws-samples/sample-cfm-tips-mcp?lines=true&page={x} to view the full context.
# Directory Structure
```
├── .gitignore
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── diagnose_cost_optimization_hub_v2.py
├── LICENSE
├── logging_config.py
├── mcp_runbooks.json
├── mcp_server_with_runbooks.py
├── playbooks
│ ├── __init__.py
│ ├── aws_lambda
│ │ ├── __init__.py
│ │ └── lambda_optimization.py
│ ├── cloudtrail
│ │ ├── __init__.py
│ │ └── cloudtrail_optimization.py
│ ├── cloudtrail_optimization.py
│ ├── cloudwatch
│ │ ├── __init__.py
│ │ ├── aggregation_queries.py
│ │ ├── alarms_and_dashboards_analyzer.py
│ │ ├── analysis_engine.py
│ │ ├── base_analyzer.py
│ │ ├── cloudwatch_optimization_analyzer.py
│ │ ├── cloudwatch_optimization_tool.py
│ │ ├── cloudwatch_optimization.py
│ │ ├── cost_controller.py
│ │ ├── general_spend_analyzer.py
│ │ ├── logs_optimization_analyzer.py
│ │ ├── metrics_optimization_analyzer.py
│ │ ├── optimization_orchestrator.py
│ │ └── result_processor.py
│ ├── comprehensive_optimization.py
│ ├── ebs
│ │ ├── __init__.py
│ │ └── ebs_optimization.py
│ ├── ebs_optimization.py
│ ├── ec2
│ │ ├── __init__.py
│ │ └── ec2_optimization.py
│ ├── ec2_optimization.py
│ ├── lambda_optimization.py
│ ├── rds
│ │ ├── __init__.py
│ │ └── rds_optimization.py
│ ├── rds_optimization.py
│ └── s3
│ ├── __init__.py
│ ├── analyzers
│ │ ├── __init__.py
│ │ ├── api_cost_analyzer.py
│ │ ├── archive_optimization_analyzer.py
│ │ ├── general_spend_analyzer.py
│ │ ├── governance_analyzer.py
│ │ ├── multipart_cleanup_analyzer.py
│ │ └── storage_class_analyzer.py
│ ├── base_analyzer.py
│ ├── s3_aggregation_queries.py
│ ├── s3_analysis_engine.py
│ ├── s3_comprehensive_optimization_tool.py
│ ├── s3_optimization_orchestrator.py
│ └── s3_optimization.py
├── README.md
├── requirements.txt
├── runbook_functions_extended.py
├── runbook_functions.py
├── RUNBOOKS_GUIDE.md
├── services
│ ├── __init__.py
│ ├── cloudwatch_pricing.py
│ ├── cloudwatch_service_vended_log.py
│ ├── cloudwatch_service.py
│ ├── compute_optimizer.py
│ ├── cost_explorer.py
│ ├── optimization_hub.py
│ ├── performance_insights.py
│ ├── pricing.py
│ ├── s3_pricing.py
│ ├── s3_service.py
│ ├── storage_lens_service.py
│ └── trusted_advisor.py
├── setup.py
├── test_runbooks.py
├── tests
│ ├── __init__.py
│ ├── conftest.py
│ ├── integration
│ │ ├── __init__.py
│ │ ├── cloudwatch
│ │ │ └── test_cloudwatch_integration.py
│ │ ├── test_cloudwatch_comprehensive_tool_integration.py
│ │ ├── test_cloudwatch_orchestrator_integration.py
│ │ ├── test_integration_suite.py
│ │ └── test_orchestrator_integration.py
│ ├── legacy
│ │ ├── example_output_with_docs.py
│ │ ├── example_wellarchitected_output.py
│ │ ├── test_aws_session_management.py
│ │ ├── test_cloudwatch_orchestrator_pagination.py
│ │ ├── test_cloudwatch_pagination_integration.py
│ │ ├── test_cloudwatch_performance_optimizations.py
│ │ ├── test_cloudwatch_result_processor.py
│ │ ├── test_cloudwatch_timeout_issue.py
│ │ ├── test_documentation_links.py
│ │ ├── test_metrics_pagination_count.py
│ │ ├── test_orchestrator_integration.py
│ │ ├── test_pricing_cache_fix_moved.py
│ │ ├── test_pricing_cache_fix.py
│ │ ├── test_runbook_integration.py
│ │ ├── test_runbooks.py
│ │ ├── test_setup_verification.py
│ │ └── test_stack_trace_fix.py
│ ├── performance
│ │ ├── __init__.py
│ │ ├── cloudwatch
│ │ │ └── test_cloudwatch_performance.py
│ │ ├── test_cloudwatch_parallel_execution.py
│ │ ├── test_parallel_execution.py
│ │ └── test_performance_suite.py
│ ├── pytest-cloudwatch.ini
│ ├── pytest.ini
│ ├── README.md
│ ├── requirements-test.txt
│ ├── run_cloudwatch_tests.py
│ ├── run_tests.py
│ ├── test_setup_verification.py
│ ├── test_suite_main.py
│ └── unit
│ ├── __init__.py
│ ├── analyzers
│ │ ├── __init__.py
│ │ ├── conftest_cloudwatch.py
│ │ ├── test_alarms_and_dashboards_analyzer.py
│ │ ├── test_base_analyzer.py
│ │ ├── test_cloudwatch_base_analyzer.py
│ │ ├── test_cloudwatch_cost_constraints.py
│ │ ├── test_cloudwatch_general_spend_analyzer.py
│ │ ├── test_general_spend_analyzer.py
│ │ ├── test_logs_optimization_analyzer.py
│ │ └── test_metrics_optimization_analyzer.py
│ ├── cloudwatch
│ │ ├── test_cache_control.py
│ │ ├── test_cloudwatch_api_mocking.py
│ │ ├── test_cloudwatch_metrics_pagination.py
│ │ ├── test_cloudwatch_pagination_architecture.py
│ │ ├── test_cloudwatch_pagination_comprehensive_fixed.py
│ │ ├── test_cloudwatch_pagination_comprehensive.py
│ │ ├── test_cloudwatch_pagination_fixed.py
│ │ ├── test_cloudwatch_pagination_real_format.py
│ │ ├── test_cloudwatch_pagination_simple.py
│ │ ├── test_cloudwatch_query_pagination.py
│ │ ├── test_cloudwatch_unit_suite.py
│ │ ├── test_general_spend_tips_refactor.py
│ │ ├── test_import_error.py
│ │ ├── test_mcp_pagination_bug.py
│ │ └── test_mcp_surface_pagination.py
│ ├── s3
│ │ └── live
│ │ ├── test_bucket_listing.py
│ │ ├── test_s3_governance_bucket_discovery.py
│ │ └── test_top_buckets.py
│ ├── services
│ │ ├── __init__.py
│ │ ├── test_cloudwatch_cost_controller.py
│ │ ├── test_cloudwatch_query_service.py
│ │ ├── test_cloudwatch_service.py
│ │ ├── test_cost_control_routing.py
│ │ └── test_s3_service.py
│ └── test_unit_suite.py
└── utils
├── __init__.py
├── aws_client_factory.py
├── cache_decorator.py
├── cleanup_manager.py
├── cloudwatch_cache.py
├── documentation_links.py
├── error_handler.py
├── intelligent_cache.py
├── logging_config.py
├── memory_manager.py
├── parallel_executor.py
├── performance_monitor.py
├── progressive_timeout.py
├── service_orchestrator.py
└── session_manager.py
```
# Files
--------------------------------------------------------------------------------
/tests/unit/cloudwatch/test_cloudwatch_metrics_pagination.py:
--------------------------------------------------------------------------------
```python
1 | #!/usr/bin/env python3
2 | """
3 | Unit tests for CloudWatch metrics pagination functionality.
4 | """
5 |
6 | import pytest
7 | import sys
8 | import os
9 | from unittest.mock import patch, MagicMock
10 |
11 | # Add the project root to the path
12 | sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../../..'))
13 |
14 | from playbooks.cloudwatch.result_processor import CloudWatchResultProcessor
15 |
16 |
17 | class TestCloudWatchMetricsPagination:
18 | """Unit tests for CloudWatch metrics pagination functionality."""
19 |
20 | def test_metrics_pagination_with_large_dataset(self):
21 | """Test that metrics pagination correctly limits results to 10 items per page."""
22 | processor = CloudWatchResultProcessor()
23 |
24 | # Create 25 metrics to test pagination (should result in 3 pages)
25 | metrics = []
26 | for i in range(25):
27 | metric = {
28 | 'MetricName': f'CustomMetric{i:02d}',
29 | 'Namespace': f'MyApp/Service{i % 3}', # Mix of namespaces
30 | 'Dimensions': [
31 | {'Name': 'InstanceId', 'Value': f'i-{i:010d}'},
32 | {'Name': 'Environment', 'Value': 'production' if i % 2 == 0 else 'staging'}
33 | ]
34 | }
35 | metrics.append(metric)
36 |
37 | # Test page 1 - should have exactly 10 items
38 | result_p1 = processor.process_metrics_results(metrics, page=1)
39 |
40 | assert len(result_p1['items']) == 10, f"Page 1 should have exactly 10 items, got {len(result_p1['items'])}"
41 | assert result_p1['pagination']['current_page'] == 1
42 | assert result_p1['pagination']['total_items'] == 25
43 | assert result_p1['pagination']['total_pages'] == 3
44 | assert result_p1['pagination']['has_next_page'] is True
45 | assert result_p1['pagination']['has_previous_page'] is False
46 |
47 | # Test page 2 - should have exactly 10 items
48 | result_p2 = processor.process_metrics_results(metrics, page=2)
49 |
50 | assert len(result_p2['items']) == 10, f"Page 2 should have exactly 10 items, got {len(result_p2['items'])}"
51 | assert result_p2['pagination']['current_page'] == 2
52 | assert result_p2['pagination']['has_next_page'] is True
53 | assert result_p2['pagination']['has_previous_page'] is True
54 |
55 | # Test page 3 - should have exactly 5 items (remainder)
56 | result_p3 = processor.process_metrics_results(metrics, page=3)
57 |
58 | assert len(result_p3['items']) == 5, f"Page 3 should have exactly 5 items, got {len(result_p3['items'])}"
59 | assert result_p3['pagination']['current_page'] == 3
60 | assert result_p3['pagination']['has_next_page'] is False
61 | assert result_p3['pagination']['has_previous_page'] is True
62 |
63 | # Verify dimensions are preserved (not truncated)
64 | for item in result_p1['items']:
65 | assert 'Dimensions' in item, "Dimensions should be preserved"
66 | assert len(item['Dimensions']) == 2, "All dimensions should be preserved"
67 |
68 | def test_metrics_cost_sorting(self):
69 | """Test that metrics are sorted by cost (custom metrics first)."""
70 | processor = CloudWatchResultProcessor()
71 |
72 | # Create mix of AWS and custom metrics
73 | metrics = [
74 | {'MetricName': 'CPUUtilization', 'Namespace': 'AWS/EC2'}, # Free
75 | {'MetricName': 'CustomMetric1', 'Namespace': 'MyApp/Performance'}, # $0.30
76 | {'MetricName': 'NetworkIn', 'Namespace': 'AWS/EC2'}, # Free
77 | {'MetricName': 'CustomMetric2', 'Namespace': 'MyApp/Business'}, # $0.30
78 | ]
79 |
80 | result = processor.process_metrics_results(metrics, page=1)
81 | items = result['items']
82 |
83 | # Custom metrics should be first (higher cost)
84 | assert items[0]['Namespace'] in ['MyApp/Performance', 'MyApp/Business']
85 | assert items[0]['estimated_monthly_cost'] == 0.30
86 | assert items[1]['Namespace'] in ['MyApp/Performance', 'MyApp/Business']
87 | assert items[1]['estimated_monthly_cost'] == 0.30
88 |
89 | # AWS metrics should be last (free)
90 | assert items[2]['Namespace'] == 'AWS/EC2'
91 | assert items[2]['estimated_monthly_cost'] == 0.0
92 | assert items[3]['Namespace'] == 'AWS/EC2'
93 | assert items[3]['estimated_monthly_cost'] == 0.0
94 |
95 | def test_metrics_dimensions_preservation(self):
96 | """Test that metric dimensions are fully preserved, not truncated."""
97 | processor = CloudWatchResultProcessor()
98 |
99 | # Create metric with many dimensions
100 | metrics = [{
101 | 'MetricName': 'ComplexMetric',
102 | 'Namespace': 'MyApp/Complex',
103 | 'Dimensions': [
104 | {'Name': 'InstanceId', 'Value': 'i-1234567890abcdef0'},
105 | {'Name': 'Environment', 'Value': 'production'},
106 | {'Name': 'Service', 'Value': 'web-server'},
107 | {'Name': 'Region', 'Value': 'us-east-1'},
108 | {'Name': 'AZ', 'Value': 'us-east-1a'},
109 | {'Name': 'Version', 'Value': 'v2.1.3'},
110 | {'Name': 'Team', 'Value': 'platform-engineering'},
111 | ]
112 | }]
113 |
114 | result = processor.process_metrics_results(metrics, page=1)
115 | item = result['items'][0]
116 |
117 | # All dimensions should be preserved
118 | assert len(item['Dimensions']) == 7, f"Expected 7 dimensions, got {len(item['Dimensions'])}"
119 |
120 | # Verify specific dimensions are present
121 | dimension_names = [d['Name'] for d in item['Dimensions']]
122 | expected_names = ['InstanceId', 'Environment', 'Service', 'Region', 'AZ', 'Version', 'Team']
123 |
124 | for expected_name in expected_names:
125 | assert expected_name in dimension_names, f"Dimension {expected_name} should be preserved"
126 |
127 | def test_empty_metrics_pagination(self):
128 | """Test pagination with empty metrics list."""
129 | processor = CloudWatchResultProcessor()
130 |
131 | result = processor.process_metrics_results([], page=1)
132 |
133 | assert len(result['items']) == 0
134 | assert result['pagination']['current_page'] == 1
135 | assert result['pagination']['total_items'] == 0
136 | assert result['pagination']['total_pages'] == 0
137 | assert result['pagination']['has_next_page'] is False
138 | assert result['pagination']['has_previous_page'] is False
139 |
140 | def test_single_page_metrics(self):
141 | """Test pagination with metrics that fit in a single page."""
142 | processor = CloudWatchResultProcessor()
143 |
144 | # Create 5 metrics (less than page size of 10)
145 | metrics = [
146 | {'MetricName': f'Metric{i}', 'Namespace': 'MyApp/Test'}
147 | for i in range(5)
148 | ]
149 |
150 | result = processor.process_metrics_results(metrics, page=1)
151 |
152 | assert len(result['items']) == 5
153 | assert result['pagination']['current_page'] == 1
154 | assert result['pagination']['total_items'] == 5
155 | assert result['pagination']['total_pages'] == 1
156 | assert result['pagination']['has_next_page'] is False
157 | assert result['pagination']['has_previous_page'] is False
158 |
159 |
160 | if __name__ == '__main__':
161 | pytest.main([__file__, '-v'])
```
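For orientation, the sketch below shows pagination and cost-sorting behavior consistent with what these tests assert: a fixed page size of 10, a flat $0.30/month estimate for custom (non-`AWS/*`) metrics, and cost-descending ordering. It is a hypothetical simplification; the real `CloudWatchResultProcessor.process_metrics_results` in `playbooks/cloudwatch/result_processor.py` is not shown on this page and may differ.

```python
# Hypothetical, simplified sketch of the behavior the tests above pin down; the real
# CloudWatchResultProcessor.process_metrics_results may differ in naming and detail.
import math
from typing import Any, Dict, List

PAGE_SIZE = 10                      # assumed fixed page size
CUSTOM_METRIC_MONTHLY_COST = 0.30   # assumed flat estimate per custom metric


def process_metrics_results(metrics: List[Dict[str, Any]], page: int = 1) -> Dict[str, Any]:
    # Non-AWS namespaces are treated as paid custom metrics; AWS/* metrics are free.
    enriched = [
        {**m, "estimated_monthly_cost": 0.0 if m.get("Namespace", "").startswith("AWS/")
         else CUSTOM_METRIC_MONTHLY_COST}
        for m in metrics
    ]
    # Most expensive first, so costly custom metrics land on the earliest pages.
    enriched.sort(key=lambda m: m["estimated_monthly_cost"], reverse=True)

    total_items = len(enriched)
    total_pages = math.ceil(total_items / PAGE_SIZE) if total_items else 0
    start = (page - 1) * PAGE_SIZE
    return {
        "items": enriched[start:start + PAGE_SIZE],   # dimensions preserved via {**m}
        "pagination": {
            "current_page": page,
            "total_items": total_items,
            "total_pages": total_pages,
            "has_next_page": page < total_pages,
            "has_previous_page": page > 1,
        },
    }
```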
--------------------------------------------------------------------------------
/tests/legacy/test_runbooks.py:
--------------------------------------------------------------------------------
```python
1 | #!/usr/bin/env python3
2 | """
3 | Test script for CFM Tips AWS Cost Optimization MCP Server
4 | """
5 |
6 | import sys
7 | import os
8 |
9 | # Add project root to path (this file lives in tests/legacy/)
10 | sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..'))
11 |
12 | def test_imports():
13 | """Test that all imports work correctly."""
14 | print("Testing imports...")
15 |
16 | try:
17 | # Test MCP server imports
18 | from mcp.server import Server
19 | from mcp.server.stdio import stdio_server
20 | from mcp.types import Tool, TextContent
21 | print("✅ MCP imports successful")
22 |
23 | # Test AWS imports
24 | import boto3
25 | from botocore.exceptions import ClientError, NoCredentialsError
26 | print("✅ AWS imports successful")
27 |
28 | # Test runbook functions import
29 | # Import functions directly from playbooks
30 | from playbooks.ec2.ec2_optimization import (
31 | run_ec2_right_sizing_analysis,
32 | generate_ec2_right_sizing_report
33 | )
34 | from playbooks.ebs.ebs_optimization import (
35 | run_ebs_optimization_analysis,
36 | identify_unused_ebs_volumes,
37 | generate_ebs_optimization_report_mcp as generate_ebs_optimization_report
38 | )
39 | from playbooks.rds.rds_optimization import (
40 | run_rds_optimization_analysis,
41 | identify_idle_rds_instances_wrapper as identify_idle_rds_instances,
42 | generate_rds_optimization_report
43 | )
44 | from playbooks.aws_lambda.lambda_optimization import (
45 | run_lambda_optimization_analysis,
46 | identify_unused_lambda_functions_mcp as identify_unused_lambda_functions,
47 | generate_lambda_optimization_report
48 | )
49 | from playbooks.comprehensive_optimization import run_comprehensive_cost_analysis
50 | from playbooks.cloudtrail.cloudtrail_optimization import (
51 | get_management_trails_mcp as get_management_trails,
52 | run_cloudtrail_trails_analysis_mcp as run_cloudtrail_trails_analysis,
53 | generate_cloudtrail_report_mcp as generate_cloudtrail_report
54 | )
55 | print("✅ Runbook functions import successful")
56 |
57 | return True
58 |
59 | except ImportError as e:
60 | print(f"❌ Import error: {e}")
61 | return False
62 | except Exception as e:
63 | print(f"❌ Unexpected error: {e}")
64 | return False
65 |
66 | def test_server_creation():
67 | """Test that the MCP server can be created."""
68 | print("\nTesting server creation...")
69 |
70 | try:
71 | # Import the server module
72 | import mcp_server_with_runbooks
73 | print("✅ Server module imported successfully")
74 |
75 | # Check if server is created
76 | if hasattr(mcp_server_with_runbooks, 'server'):
77 | print("✅ Server object created successfully")
78 |
79 | # Check server name
80 | if mcp_server_with_runbooks.server.name == "cfm_tips":
81 | print("✅ Server name is correct: cfm_tips")
82 | else:
83 | print(f"⚠️ Server name: {mcp_server_with_runbooks.server.name}")
84 |
85 | return True
86 | else:
87 | print("❌ Server object not found")
88 | return False
89 |
90 | except Exception as e:
91 | print(f"❌ Server creation error: {str(e)}")
92 | return False
93 |
94 | def test_cloudtrail_functions():
95 | """Test CloudTrail optimization functions."""
96 | print("\nTesting CloudTrail functions...")
97 |
98 | try:
99 | from playbooks.cloudtrail.cloudtrail_optimization import (
100 | get_management_trails_mcp as get_management_trails,
101 | run_cloudtrail_trails_analysis_mcp as run_cloudtrail_trails_analysis,
102 | generate_cloudtrail_report_mcp as generate_cloudtrail_report
103 | )
104 | print("✅ CloudTrail functions imported successfully")
105 |
106 | # Test function signatures
107 | import inspect
108 |
109 | # Check get_management_trails
110 | sig = inspect.signature(get_management_trails)
111 | if 'arguments' in sig.parameters:
112 | print("✅ get_management_trails has correct signature")
113 | else:
114 | print("❌ get_management_trails signature incorrect")
115 | return False
116 |
117 | # Check run_cloudtrail_trails_analysis
118 | sig = inspect.signature(run_cloudtrail_trails_analysis)
119 | if 'arguments' in sig.parameters:
120 | print("✅ run_cloudtrail_trails_analysis has correct signature")
121 | else:
122 | print("❌ run_cloudtrail_trails_analysis signature incorrect")
123 | return False
124 |
125 | # Check generate_cloudtrail_report
126 | sig = inspect.signature(generate_cloudtrail_report)
127 | if 'arguments' in sig.parameters:
128 | print("✅ generate_cloudtrail_report has correct signature")
129 | else:
130 | print("❌ generate_cloudtrail_report signature incorrect")
131 | return False
132 |
133 | return True
134 |
135 | except ImportError as e:
136 | print(f"❌ CloudTrail import error: {e}")
137 | return False
138 | except Exception as e:
139 | print(f"❌ CloudTrail test error: {e}")
140 | return False
141 |
142 | def test_tool_names():
143 | """Test that tool names are within MCP limits."""
144 | print("\nTesting tool name lengths...")
145 |
146 | server_name = "cfm_tips"
147 | sample_tools = [
148 | "ec2_rightsizing",
149 | "ebs_optimization",
150 | "rds_idle",
151 | "lambda_unused",
152 | "comprehensive_analysis",
153 | "get_coh_recommendations",
154 | "cloudtrail_optimization"
155 | ]
156 |
157 | max_length = 0
158 | for tool in sample_tools:
159 | combined = f"{server_name}___{tool}"
160 | length = len(combined)
161 | max_length = max(max_length, length)
162 |
163 | if length > 64:
164 | print(f"❌ Tool name too long: {combined} ({length} chars)")
165 | return False
166 |
167 | print(f"✅ All tool names within limit (max: {max_length} chars)")
168 | return True
169 |
170 | def main():
171 | """Run all tests."""
172 | print("CFM Tips AWS Cost Optimization MCP Server - Integration Test")
173 | print("=" * 65)
174 |
175 | tests_passed = 0
176 | total_tests = 4
177 |
178 | # Test imports
179 | if test_imports():
180 | tests_passed += 1
181 |
182 | # Test server creation
183 | if test_server_creation():
184 | tests_passed += 1
185 |
186 | # Test CloudTrail functions
187 | if test_cloudtrail_functions():
188 | tests_passed += 1
189 |
190 | # Test tool names
191 | if test_tool_names():
192 | tests_passed += 1
193 |
194 | print(f"\n" + "=" * 65)
195 | print(f"Tests passed: {tests_passed}/{total_tests}")
196 |
197 | if tests_passed == total_tests:
198 | print("✅ All integration tests passed!")
199 | print("\nNext steps:")
200 | print("1. Configure AWS credentials: aws configure")
201 | print("2. Apply the correct IAM permissions (see CORRECTED_PERMISSIONS.md)")
202 | print("3. Start the server: q chat --mcp-config \"$(pwd)/mcp_runbooks.json\"")
203 | print("4. Test with: \"Run comprehensive cost analysis for us-east-1\"")
204 | print("\n🎉 CFM Tips is ready to help optimize your AWS costs!")
205 | return True
206 | else:
207 | print("❌ Some tests failed. Check the errors above.")
208 | return False
209 |
210 | if __name__ == "__main__":
211 | success = main()
212 | sys.exit(0 if success else 1)
```
--------------------------------------------------------------------------------
/playbooks/comprehensive_optimization.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Comprehensive Cost Optimization Playbook
3 |
4 | This module provides multi-service cost optimization analysis functions.
5 | Includes both core optimization functions and MCP runbook functions.
6 | """
7 |
8 | import asyncio
9 | import json
10 | import logging
11 | import time
12 | from datetime import datetime
13 | from typing import Dict, List, Any
14 | from mcp.types import TextContent
15 |
16 | from utils.error_handler import ResponseFormatter, handle_aws_error
17 | from utils.service_orchestrator import ServiceOrchestrator
18 | from utils.parallel_executor import create_task
19 | from utils.documentation_links import add_documentation_links
20 |
21 | # Import playbook modules
22 | from playbooks.ec2.ec2_optimization import get_underutilized_instances
23 | from playbooks.ebs.ebs_optimization import get_underutilized_volumes
24 | from playbooks.rds.rds_optimization import get_underutilized_rds_instances, identify_idle_rds_instances
25 | from playbooks.aws_lambda.lambda_optimization import get_underutilized_lambda_functions, identify_unused_lambda_functions
26 | from playbooks.cloudtrail.cloudtrail_optimization import run_cloudtrail_optimization
27 | from playbooks.cloudwatch.cloudwatch_optimization import run_cloudwatch_comprehensive_optimization_tool_mcp
28 |
29 | logger = logging.getLogger(__name__)
30 |
31 |
32 | @handle_aws_error
33 | async def run_comprehensive_cost_analysis(arguments: Dict[str, Any]) -> List[TextContent]:
34 | """Run comprehensive cost analysis across multiple AWS services."""
35 | start_time = time.time()
36 |
37 | try:
38 | region = arguments.get("region")
39 | services = arguments.get("services", ["ec2", "ebs", "rds", "lambda", "cloudtrail", "s3", "cloudwatch"])
40 | lookback_period_days = arguments.get("lookback_period_days", 14)
41 | output_format = arguments.get("output_format", "json")
42 |
43 | # Initialize service orchestrator for parallel execution and session management
44 | orchestrator = ServiceOrchestrator()
45 |
46 | # Define parallel service calls based on requested services
47 | service_calls = []
48 |
49 | if "ec2" in services:
50 | service_calls.extend([
51 | {
52 | 'service': 'ec2',
53 | 'operation': 'underutilized_instances',
54 | 'function': get_underutilized_instances,
55 | 'args': {
56 | 'region': region,
57 | 'lookback_period_days': lookback_period_days
58 | }
59 | },
60 | {
61 | 'service': 'ec2',
62 | 'operation': 'stopped_instances',
63 | 'function': lambda **kwargs: {"stopped_instances": []}, # Placeholder
64 | 'args': {'region': region}
65 | }
66 | ])
67 |
68 | if "ebs" in services:
69 | service_calls.extend([
70 | {
71 | 'service': 'ebs',
72 | 'operation': 'underutilized_volumes',
73 | 'function': get_underutilized_volumes,
74 | 'args': {
75 | 'region': region,
76 | 'lookback_period_days': 30
77 | }
78 | },
79 | {
80 | 'service': 'ebs',
81 | 'operation': 'unused_volumes',
82 | 'function': lambda **kwargs: {"unused_volumes": []}, # Placeholder
83 | 'args': {'region': region}
84 | }
85 | ])
86 |
87 | if "rds" in services:
88 | service_calls.extend([
89 | {
90 | 'service': 'rds',
91 | 'operation': 'underutilized_instances',
92 | 'function': get_underutilized_rds_instances,
93 | 'args': {
94 | 'region': region,
95 | 'lookback_period_days': lookback_period_days
96 | }
97 | },
98 | {
99 | 'service': 'rds',
100 | 'operation': 'idle_instances',
101 | 'function': identify_idle_rds_instances,
102 | 'args': {
103 | 'region': region,
104 | 'lookback_period_days': 7
105 | }
106 | }
107 | ])
108 |
109 | if "lambda" in services:
110 | service_calls.extend([
111 | {
112 | 'service': 'lambda',
113 | 'operation': 'underutilized_functions',
114 | 'function': get_underutilized_lambda_functions,
115 | 'args': {
116 | 'region': region,
117 | 'lookback_period_days': lookback_period_days
118 | }
119 | },
120 | {
121 | 'service': 'lambda',
122 | 'operation': 'unused_functions',
123 | 'function': identify_unused_lambda_functions,
124 | 'args': {
125 | 'region': region,
126 | 'lookback_period_days': 30
127 | }
128 | }
129 | ])
130 |
131 | if "cloudtrail" in services:
132 | service_calls.append({
133 | 'service': 'cloudtrail',
134 | 'operation': 'optimization',
135 | 'function': run_cloudtrail_optimization,
136 | 'args': {'region': region}
137 | })
138 |
139 | if "cloudwatch" in services:
140 | # CloudWatch uses its own comprehensive optimization tool
141 | def cloudwatch_wrapper(region=None, lookback_days=30, **kwargs):
142 | return {
143 | 'status': 'success',
144 | 'service': 'cloudwatch',
145 | 'message': 'CloudWatch analysis requires separate execution via cloudwatch_comprehensive_optimization_tool',
146 | 'recommendation': 'Use the dedicated CloudWatch comprehensive optimization tool for detailed analysis',
147 | 'region': region,
148 | 'lookback_days': lookback_days,
149 | 'note': 'CloudWatch has its own advanced parallel execution and memory management system'
150 | }
151 |
152 | service_calls.append({
153 | 'service': 'cloudwatch',
154 | 'operation': 'comprehensive_optimization',
155 | 'function': cloudwatch_wrapper,
156 | 'args': {
157 | 'region': region,
158 | 'lookback_days': lookback_period_days
159 | }
160 | })
161 |
162 | # Execute parallel analysis
163 | results = orchestrator.execute_parallel_analysis(
164 | service_calls=service_calls,
165 | store_results=True,
166 | timeout=120.0
167 | )
168 |
169 | # Add documentation links
170 | results = add_documentation_links(results)
171 |
172 | execution_time = time.time() - start_time
173 |
174 | # Format response with metadata
175 | results["comprehensive_analysis"] = {
176 | "analysis_type": "multi_service_comprehensive",
177 | "services_analyzed": services,
178 | "region": region,
179 | "lookback_period_days": lookback_period_days,
180 | "session_id": results.get("report_metadata", {}).get("session_id"),
181 | "parallel_execution": True,
182 | "sql_storage": True
183 | }
184 |
185 | return ResponseFormatter.to_text_content(
186 | ResponseFormatter.success_response(
187 | data=results,
188 | message=f"Comprehensive analysis completed for {len(services)} services",
189 | analysis_type="comprehensive_analysis",
190 | execution_time=execution_time
191 | )
192 | )
193 |
194 | except Exception as e:
195 | logger.error(f"Error in comprehensive cost analysis: {str(e)}")
196 | raise
```
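The playbook above is normally invoked by the MCP server, but it can also be driven directly. The snippet below is an illustrative, hypothetical driver assuming configured AWS credentials and the project root on `sys.path`; it exercises only the documented `arguments` keys (`region`, `services`, `lookback_period_days`).

```python
# Hypothetical standalone driver for the playbook above; in normal operation
# mcp_server_with_runbooks.py dispatches this call. Assumes configured AWS credentials
# and the project root on sys.path; the call will hit real AWS read-only APIs.
import asyncio
import json

from playbooks.comprehensive_optimization import run_comprehensive_cost_analysis


async def main() -> None:
    contents = await run_comprehensive_cost_analysis({
        "region": "us-east-1",
        "services": ["ec2", "ebs", "rds"],   # any subset of the supported services
        "lookback_period_days": 14,
    })
    # Each TextContent holds a JSON document built by ResponseFormatter.
    for content in contents:
        print(json.dumps(json.loads(content.text), indent=2)[:1000])


if __name__ == "__main__":
    asyncio.run(main())
```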
--------------------------------------------------------------------------------
/utils/error_handler.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Centralized Error Handler for AWS Cost Optimization MCP Server
3 |
4 | Provides consistent error handling and formatting across all modules.
5 | """
6 |
7 | import logging
8 | from typing import Dict, Any, List, Optional
9 | from botocore.exceptions import ClientError, NoCredentialsError
10 | from mcp.types import TextContent
11 | import json
12 |
13 | logger = logging.getLogger(__name__)
14 |
15 |
16 | class AWSErrorHandler:
17 | """Centralized AWS error handling and formatting."""
18 |
19 | # Common AWS error codes and their required permissions
20 | PERMISSION_MAP = {
21 | 'AccessDenied': 'Check IAM permissions for the requested service',
22 | 'UnauthorizedOperation': 'Verify IAM policy allows the requested operation',
23 | 'InvalidUserID.NotFound': 'Check AWS credentials configuration',
24 | 'TokenRefreshRequired': 'AWS credentials may have expired',
25 | 'OptInRequired': 'Service may need to be enabled in AWS Console',
26 | 'ServiceUnavailable': 'AWS service temporarily unavailable',
27 | 'ThrottlingException': 'Request rate exceeded, implement retry logic',
28 | 'ValidationException': 'Check request parameters and format'
29 | }
30 |
31 | @staticmethod
32 | def format_client_error(e: ClientError, context: str,
33 | required_permissions: Optional[List[str]] = None) -> Dict[str, Any]:
34 | """
35 | Format AWS ClientError into standardized error response.
36 |
37 | Args:
38 | e: The ClientError exception
39 | context: Context where the error occurred
40 | required_permissions: List of required IAM permissions
41 |
42 | Returns:
43 | Standardized error response dictionary
44 | """
45 | error_code = e.response.get('Error', {}).get('Code', 'Unknown')
46 | error_message = e.response.get('Error', {}).get('Message', str(e))
47 |
48 | logger.error(f"AWS API Error in {context}: {error_code} - {error_message}")
49 |
50 | response = {
51 | "status": "error",
52 | "error_code": error_code,
53 | "message": f"AWS API Error: {error_code} - {error_message}",
54 | "context": context,
55 | "timestamp": logger.handlers[0].formatter.formatTime(logger.makeRecord(
56 | logger.name, logging.ERROR, __file__, 0, "", (), None
57 | )) if logger.handlers else None
58 | }
59 |
60 | # Add permission guidance
61 | if required_permissions:
62 | response["required_permissions"] = required_permissions
63 | elif error_code in AWSErrorHandler.PERMISSION_MAP:
64 | response["permission_guidance"] = AWSErrorHandler.PERMISSION_MAP[error_code]
65 |
66 | # Add retry guidance for throttling
67 | if error_code in ['ThrottlingException', 'RequestLimitExceeded']:
68 | response["retry_guidance"] = {
69 | "retryable": True,
70 | "suggested_delay": "exponential backoff starting at 1 second"
71 | }
72 |
73 | return response
74 |
75 | @staticmethod
76 | def format_no_credentials_error(context: str) -> Dict[str, Any]:
77 | """Format NoCredentialsError into standardized response."""
78 | logger.error(f"AWS credentials not found in {context}")
79 |
80 | return {
81 | "status": "error",
82 | "error_code": "NoCredentialsError",
83 | "message": "AWS credentials not configured",
84 | "context": context,
85 | "setup_guidance": {
86 | "aws_cli": "Run 'aws configure' to set up credentials",
87 | "environment": "Set AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY",
88 | "iam_role": "Ensure EC2 instance has appropriate IAM role attached"
89 | }
90 | }
91 |
92 | @staticmethod
93 | def format_general_error(e: Exception, context: str) -> Dict[str, Any]:
94 | """Format general exceptions into standardized response."""
95 | logger.error(f"General error in {context}: {str(e)}")
96 |
97 | return {
98 | "status": "error",
99 | "error_code": type(e).__name__,
100 | "message": str(e),
101 | "context": context
102 | }
103 |
104 | @staticmethod
105 | def to_text_content(error_dict: Dict[str, Any]) -> List[TextContent]:
106 | """Convert error dictionary to MCP TextContent format."""
107 | return [TextContent(type="text", text=json.dumps(error_dict, indent=2, default=str))]
108 |
109 |
110 | class ResponseFormatter:
111 | """Standardized response formatting for MCP tools."""
112 |
113 | @staticmethod
114 | def success_response(data: Any, message: str, analysis_type: str = None,
115 | execution_time: float = None, metadata: Dict = None) -> Dict[str, Any]:
116 | """
117 | Format successful response with consistent structure.
118 |
119 | Args:
120 | data: The response data
121 | message: Success message
122 | analysis_type: Type of analysis performed
123 | execution_time: Execution time in seconds
124 | metadata: Additional metadata
125 |
126 | Returns:
127 | Standardized success response
128 | """
129 | response = {
130 | "status": "success",
131 | "data": data,
132 | "message": message
133 | }
134 |
135 | if analysis_type:
136 | response["analysis_type"] = analysis_type
137 |
138 | if execution_time is not None:
139 | response["execution_time"] = execution_time
140 |
141 | if metadata:
142 | response["metadata"] = metadata
143 |
144 | # Add Well-Architected Framework hint for LLMs
145 | response["wellarchitected_hint"] = "Analyze these findings to provide AWS Well-Architected Framework Cost Optimization pillar recommendations focusing on right-sizing, eliminating waste, leveraging pricing models, and optimizing over time."
146 |
147 | return response
148 |
149 | @staticmethod
150 | def error_response(error: Exception, context: str,
151 | required_permissions: Optional[List[str]] = None) -> Dict[str, Any]:
152 | """
153 | Format error response based on exception type.
154 |
155 | Args:
156 | error: The exception that occurred
157 | context: Context where error occurred
158 | required_permissions: Required IAM permissions
159 |
160 | Returns:
161 | Standardized error response
162 | """
163 | if isinstance(error, ClientError):
164 | return AWSErrorHandler.format_client_error(error, context, required_permissions)
165 | elif isinstance(error, NoCredentialsError):
166 | return AWSErrorHandler.format_no_credentials_error(context)
167 | else:
168 | return AWSErrorHandler.format_general_error(error, context)
169 |
170 | @staticmethod
171 | def to_text_content(response_dict: Dict[str, Any]) -> List[TextContent]:
172 | """Convert response dictionary to MCP TextContent format."""
173 | return [TextContent(type="text", text=json.dumps(response_dict, indent=2, default=str))]
174 |
175 |
176 | # Convenience functions for common use cases
177 | def handle_aws_error(func):
178 |     """Decorator for consistent AWS error handling in MCP tools (sync or async)."""
179 |     import asyncio
180 |     import functools
181 | 
182 |     def _format_error(e: Exception, context: str) -> Dict[str, Any]:
183 |         if isinstance(e, ClientError):
184 |             return AWSErrorHandler.format_client_error(e, context)
185 |         if isinstance(e, NoCredentialsError):
186 |             return AWSErrorHandler.format_no_credentials_error(context)
187 |         return AWSErrorHandler.format_general_error(e, context)
188 | 
189 |     if asyncio.iscoroutinefunction(func):
190 |         # Async tools (e.g. run_comprehensive_cost_analysis) need an async wrapper;
191 |         # a plain sync wrapper would return the coroutine unawaited and never catch
192 |         # exceptions raised inside it.
193 |         @functools.wraps(func)
194 |         async def async_wrapper(*args, **kwargs):
195 |             try:
196 |                 return await func(*args, **kwargs)
197 |             except Exception as e:
198 |                 return AWSErrorHandler.to_text_content(_format_error(e, func.__name__))
199 |         return async_wrapper
200 | 
201 |     @functools.wraps(func)
202 |     def wrapper(*args, **kwargs):
203 |         try:
204 |             return func(*args, **kwargs)
205 |         except Exception as e:
206 |             return AWSErrorHandler.to_text_content(_format_error(e, func.__name__))
207 | 
208 |     return wrapper
```
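A typical consumer of this module decorates an MCP tool function and formats its result, as in the hypothetical example below. `describe_example` and its payload are made up; only `handle_aws_error`, `ResponseFormatter`, and `TextContent` come from the code above.

```python
# Illustrative only: how a playbook function typically combines the pieces above.
from typing import Any, Dict, List

from mcp.types import TextContent

from utils.error_handler import ResponseFormatter, handle_aws_error


@handle_aws_error
async def describe_example(arguments: Dict[str, Any]) -> List[TextContent]:
    # Any ClientError / NoCredentialsError raised in here is converted by the
    # decorator into the standardized JSON error payload instead of propagating.
    findings = {"region": arguments.get("region"), "items": []}
    return ResponseFormatter.to_text_content(
        ResponseFormatter.success_response(
            data=findings,
            message="Example analysis completed",
            analysis_type="example_analysis",
        )
    )
```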
--------------------------------------------------------------------------------
/utils/aws_client_factory.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | AWS Client Factory for Centralized Client Management
3 |
4 | Provides consistent AWS client creation with proper error handling and configuration.
5 | """
6 |
7 | import boto3
8 | import logging
9 | from typing import Dict, Any, Optional
10 | from botocore.exceptions import ClientError, NoCredentialsError
11 | from botocore.config import Config
12 |
13 | logger = logging.getLogger(__name__)
14 |
15 |
16 | class AWSClientFactory:
17 | """Centralized AWS client creation and management."""
18 |
19 | # Default configurations for different services
20 | DEFAULT_CONFIGS = {
21 | 'cost-optimization-hub': {
22 | 'region_name': 'us-east-1', # COH is only available in us-east-1
23 | 'config': Config(
24 | retries={'max_attempts': 3, 'mode': 'adaptive'},
25 | read_timeout=60
26 | )
27 | },
28 | 'ce': { # Cost Explorer
29 | 'region_name': 'us-east-1', # CE is only available in us-east-1
30 | 'config': Config(
31 | retries={'max_attempts': 3, 'mode': 'adaptive'},
32 | read_timeout=60
33 | )
34 | },
35 | 'support': { # Trusted Advisor
36 | 'region_name': 'us-east-1', # Support API only in us-east-1
37 | 'config': Config(
38 | retries={'max_attempts': 3, 'mode': 'adaptive'},
39 | read_timeout=60
40 | )
41 | },
42 | 'compute-optimizer': {
43 | 'config': Config(
44 | retries={'max_attempts': 3, 'mode': 'adaptive'},
45 | read_timeout=60
46 | )
47 | },
48 | 'pi': { # Performance Insights
49 | 'config': Config(
50 | retries={'max_attempts': 3, 'mode': 'adaptive'},
51 | read_timeout=30
52 | )
53 | },
54 | 'default': {
55 | 'config': Config(
56 | retries={'max_attempts': 3, 'mode': 'adaptive'},
57 | read_timeout=30
58 | )
59 | }
60 | }
61 |
62 | _clients: Dict[str, Any] = {} # Client cache
63 |
64 | @classmethod
65 | def get_client(cls, service_name: str, region: Optional[str] = None,
66 | force_new: bool = False) -> boto3.client:
67 | """
68 | Get AWS client with proper configuration and caching.
69 |
70 | Args:
71 | service_name: AWS service name (e.g., 'ec2', 's3', 'cost-optimization-hub')
72 | region: AWS region (optional, uses service defaults or session default)
73 | force_new: Force creation of new client instead of using cache
74 |
75 | Returns:
76 | Configured boto3 client
77 |
78 | Raises:
79 | NoCredentialsError: If AWS credentials are not configured
80 | ClientError: If client creation fails
81 | """
82 | # Create cache key
83 | cache_key = f"{service_name}:{region or 'default'}"
84 |
85 | # Return cached client if available and not forcing new
86 | if not force_new and cache_key in cls._clients:
87 | logger.debug(f"Using cached client for {service_name}")
88 | return cls._clients[cache_key]
89 |
90 | try:
91 | # Get service configuration
92 | service_config = cls.DEFAULT_CONFIGS.get(service_name, cls.DEFAULT_CONFIGS['default'])
93 |
94 | # Prepare client arguments
95 | client_args = {
96 | 'service_name': service_name,
97 | 'config': service_config.get('config')
98 | }
99 |
100 | # Set region - priority: parameter > service default > session default
101 | if region:
102 | client_args['region_name'] = region
103 | elif 'region_name' in service_config:
104 | client_args['region_name'] = service_config['region_name']
105 |
106 | # Create client
107 | client = boto3.client(**client_args)
108 |
109 | # Cache the client
110 | cls._clients[cache_key] = client
111 |
112 | logger.debug(f"Created new {service_name} client for region {client_args.get('region_name', 'default')}")
113 | return client
114 |
115 | except NoCredentialsError:
116 | logger.error(f"AWS credentials not configured for {service_name} client")
117 | raise
118 | except Exception as e:
119 | logger.error(f"Failed to create {service_name} client: {str(e)}")
120 | raise
121 |
122 | @classmethod
123 | def get_session(cls, region: Optional[str] = None) -> boto3.Session:
124 | """
125 | Get AWS session with proper configuration.
126 |
127 | Args:
128 | region: AWS region (optional)
129 |
130 | Returns:
131 | Configured boto3 session
132 | """
133 | try:
134 | session_args = {}
135 | if region:
136 | session_args['region_name'] = region
137 |
138 | session = boto3.Session(**session_args)
139 |
140 | # Verify credentials by making a simple call
141 | sts_client = session.client('sts')
142 | sts_client.get_caller_identity()
143 |
144 | logger.debug(f"Created AWS session for region {region or 'default'}")
145 | return session
146 |
147 | except NoCredentialsError:
148 | logger.error("AWS credentials not configured")
149 | raise
150 | except Exception as e:
151 | logger.error(f"Failed to create AWS session: {str(e)}")
152 | raise
153 |
154 | @classmethod
155 | def clear_cache(cls):
156 | """Clear the client cache."""
157 | cls._clients.clear()
158 | logger.debug("Cleared AWS client cache")
159 |
160 | @classmethod
161 | def get_available_regions(cls, service_name: str) -> list:
162 | """
163 | Get available regions for a service.
164 |
165 | Args:
166 | service_name: AWS service name
167 |
168 | Returns:
169 | List of available regions
170 | """
171 | try:
172 | session = boto3.Session()
173 | return session.get_available_regions(service_name)
174 | except Exception as e:
175 | logger.warning(f"Could not get regions for {service_name}: {str(e)}")
176 | return []
177 |
178 | @classmethod
179 | def validate_region(cls, service_name: str, region: str) -> bool:
180 | """
181 | Validate if a region is available for a service.
182 |
183 | Args:
184 | service_name: AWS service name
185 | region: AWS region to validate
186 |
187 | Returns:
188 | True if region is valid for service
189 | """
190 | available_regions = cls.get_available_regions(service_name)
191 | return region in available_regions if available_regions else True
192 |
193 | @classmethod
194 | def get_caller_identity(cls) -> Dict[str, Any]:
195 | """
196 | Get AWS caller identity information.
197 |
198 | Returns:
199 | Dictionary with account ID, user ARN, etc.
200 | """
201 | try:
202 | sts_client = cls.get_client('sts')
203 | return sts_client.get_caller_identity()
204 | except Exception as e:
205 | logger.error(f"Failed to get caller identity: {str(e)}")
206 | raise
207 |
208 |
209 | # Convenience functions
210 | def get_cost_explorer_client() -> boto3.client:
211 | """Get Cost Explorer client (always us-east-1)."""
212 | return AWSClientFactory.get_client('ce')
213 |
214 |
215 | def get_cost_optimization_hub_client() -> boto3.client:
216 | """Get Cost Optimization Hub client (always us-east-1)."""
217 | return AWSClientFactory.get_client('cost-optimization-hub')
218 |
219 |
220 | def get_compute_optimizer_client(region: Optional[str] = None) -> boto3.client:
221 | """Get Compute Optimizer client."""
222 | return AWSClientFactory.get_client('compute-optimizer', region)
223 |
224 |
225 | def get_trusted_advisor_client() -> boto3.client:
226 | """Get Trusted Advisor client (always us-east-1)."""
227 | return AWSClientFactory.get_client('support')
228 |
229 |
230 | def get_performance_insights_client(region: Optional[str] = None) -> boto3.client:
231 | """Get Performance Insights client."""
232 | return AWSClientFactory.get_client('pi', region)
233 |
234 |
235 | def get_regional_client(service_name: str, region: str) -> boto3.client:
236 | """Get regional client for EC2, EBS, RDS, Lambda, S3, etc."""
237 | return AWSClientFactory.get_client(service_name, region)
```
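Typical usage of the factory is sketched below, assuming AWS credentials are configured. Cost Explorer, Cost Optimization Hub, and the Support API are pinned to `us-east-1` by `DEFAULT_CONFIGS`; other services use the region passed in (or the session default).

```python
# Sketch of typical usage of AWSClientFactory, assuming configured AWS credentials.
from utils.aws_client_factory import (
    AWSClientFactory,
    get_cost_explorer_client,
    get_regional_client,
)

# Cached us-east-1 Cost Explorer client (repeated calls return the same instance).
ce = get_cost_explorer_client()

# Regional client for a service without a pinned region.
ec2 = get_regional_client("ec2", "eu-west-1")

# Verify credentials; raises NoCredentialsError if none are configured.
identity = AWSClientFactory.get_caller_identity()
print(identity.get("Account"))

# Drop cached clients, e.g. after switching profiles or assuming another role.
AWSClientFactory.clear_cache()
```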
--------------------------------------------------------------------------------
/tests/run_tests.py:
--------------------------------------------------------------------------------
```python
1 | #!/usr/bin/env python3
2 | """
3 | Test runner for S3 optimization system.
4 |
5 | This script provides a convenient way to run different test suites
6 | with appropriate configurations and reporting.
7 | """
8 |
9 | import sys
10 | import os
11 | import subprocess
12 | import argparse
13 | from pathlib import Path
14 | from typing import List, Optional
15 |
16 |
17 | def run_command(cmd: List[str], description: str) -> int:
18 | """Run a command and return the exit code."""
19 | print(f"\n{'='*60}")
20 | print(f"Running: {description}")
21 | print(f"Command: {' '.join(cmd)}")
22 | print(f"{'='*60}")
23 |
24 | result = subprocess.run(cmd, cwd=Path(__file__).parent.parent)
25 | return result.returncode
26 |
27 |
28 | def run_unit_tests(verbose: bool = False, coverage: bool = True) -> int:
29 | """Run unit tests."""
30 | cmd = ["python", "-m", "pytest", "tests/unit/"]
31 |
32 | if verbose:
33 | cmd.append("-v")
34 |
35 | if coverage:
36 | cmd.extend([
37 |             "--cov=playbooks",
38 | "--cov=services",
39 | "--cov-report=term-missing",
40 | "--cov-report=html:htmlcov/unit"
41 | ])
42 |
43 | cmd.extend([
44 | "-m", "unit",
45 | "--tb=short"
46 | ])
47 |
48 | return run_command(cmd, "Unit Tests")
49 |
50 |
51 | def run_integration_tests(verbose: bool = False) -> int:
52 | """Run integration tests."""
53 | cmd = ["python", "-m", "pytest", "tests/integration/"]
54 |
55 | if verbose:
56 | cmd.append("-v")
57 |
58 | cmd.extend([
59 | "-m", "integration",
60 | "--tb=short"
61 | ])
62 |
63 | return run_command(cmd, "Integration Tests")
64 |
65 |
66 | def run_performance_tests(verbose: bool = False) -> int:
67 | """Run performance tests."""
68 | cmd = ["python", "-m", "pytest", "tests/performance/"]
69 |
70 | if verbose:
71 | cmd.append("-v")
72 |
73 | cmd.extend([
74 | "-m", "performance",
75 | "--tb=short",
76 | "--benchmark-only",
77 | "--benchmark-sort=mean"
78 | ])
79 |
80 | return run_command(cmd, "Performance Tests")
81 |
82 |
83 | def run_cost_validation_tests(verbose: bool = False) -> int:
84 | """Run critical no-cost constraint validation tests."""
85 | cmd = ["python", "-m", "pytest", "tests/no_cost_validation/"]
86 |
87 | if verbose:
88 | cmd.append("-v")
89 |
90 | cmd.extend([
91 | "-m", "no_cost_validation",
92 | "--tb=long", # More detailed output for critical tests
93 | "--strict-markers"
94 | ])
95 |
96 | return run_command(cmd, "No-Cost Constraint Validation Tests (CRITICAL)")
97 |
98 |
99 | def run_all_tests(verbose: bool = False, coverage: bool = True) -> int:
100 | """Run all test suites."""
101 | cmd = ["python", "-m", "pytest", "tests/"]
102 |
103 | if verbose:
104 | cmd.append("-v")
105 |
106 | if coverage:
107 | cmd.extend([
108 |             "--cov=playbooks",
109 | "--cov=services",
110 | "--cov-report=term-missing",
111 | "--cov-report=html:htmlcov/all",
112 | "--cov-fail-under=80"
113 | ])
114 |
115 | cmd.extend([
116 | "--tb=short",
117 | "--durations=10"
118 | ])
119 |
120 | return run_command(cmd, "All Tests")
121 |
122 |
123 | def run_specific_test(test_path: str, verbose: bool = False) -> int:
124 | """Run a specific test file or directory."""
125 | cmd = ["python", "-m", "pytest", test_path]
126 |
127 | if verbose:
128 | cmd.append("-v")
129 |
130 | cmd.extend(["--tb=short"])
131 |
132 | return run_command(cmd, f"Specific Test: {test_path}")
133 |
134 |
135 | def check_test_environment() -> bool:
136 | """Check if the test environment is properly set up."""
137 | print("Checking test environment...")
138 |
139 | # Check if pytest is available
140 | try:
141 | import pytest
142 | print(f"✓ pytest {pytest.__version__} is available")
143 | except ImportError:
144 | print("✗ pytest is not installed")
145 | return False
146 |
147 | # Check if moto is available for AWS mocking
148 | try:
149 | import moto
150 | print(f"✓ moto {moto.__version__} is available")
151 | except ImportError:
152 | print("✗ moto is not installed")
153 | return False
154 |
155 | # Check if core modules can be imported
156 | try:
157 | sys.path.insert(0, str(Path(__file__).parent.parent))
158 | from playbooks.s3.base_analyzer import BaseAnalyzer
159 | from services.s3_service import S3Service
160 | print("✓ Core modules can be imported")
161 | except ImportError as e:
162 | print(f"✗ Cannot import core modules: {e}")
163 | return False
164 |
165 | print("✓ Test environment is ready")
166 | return True
167 |
168 |
169 | def generate_test_report() -> int:
170 | """Generate comprehensive test report."""
171 | cmd = [
172 | "python", "-m", "pytest", "tests/",
173 | "--html=test_report.html",
174 | "--self-contained-html",
175 | "--json-report",
176 | "--json-report-file=test_report.json",
177 |         "--cov=playbooks",
178 | "--cov=services",
179 | "--cov-report=html:htmlcov/report",
180 | "--tb=short"
181 | ]
182 |
183 | return run_command(cmd, "Comprehensive Test Report Generation")
184 |
185 |
186 | def main():
187 | """Main test runner function."""
188 | parser = argparse.ArgumentParser(
189 | description="Test runner for S3 optimization system",
190 | formatter_class=argparse.RawDescriptionHelpFormatter,
191 | epilog="""
192 | Examples:
193 | python run_tests.py --unit # Run unit tests only
194 | python run_tests.py --integration # Run integration tests only
195 | python run_tests.py --performance # Run performance tests only
196 | python run_tests.py --cost-validation # Run cost validation tests only
197 | python run_tests.py --all # Run all tests
198 | python run_tests.py --specific tests/unit/ # Run specific test directory
199 | python run_tests.py --report # Generate comprehensive report
200 | python run_tests.py --check # Check test environment
201 | """
202 | )
203 |
204 | # Test suite selection
205 | parser.add_argument("--unit", action="store_true", help="Run unit tests")
206 | parser.add_argument("--integration", action="store_true", help="Run integration tests")
207 | parser.add_argument("--performance", action="store_true", help="Run performance tests")
208 | parser.add_argument("--cost-validation", action="store_true", help="Run cost validation tests")
209 | parser.add_argument("--all", action="store_true", help="Run all tests")
210 | parser.add_argument("--specific", type=str, help="Run specific test file or directory")
211 |
212 | # Utility options
213 | parser.add_argument("--report", action="store_true", help="Generate comprehensive test report")
214 | parser.add_argument("--check", action="store_true", help="Check test environment")
215 |
216 | # Test options
217 | parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
218 | parser.add_argument("--no-coverage", action="store_true", help="Disable coverage reporting")
219 |
220 | args = parser.parse_args()
221 |
222 | # Check environment if requested
223 | if args.check:
224 | if check_test_environment():
225 | return 0
226 | else:
227 | return 1
228 |
229 | # Generate report if requested
230 | if args.report:
231 | return generate_test_report()
232 |
233 | # Determine which tests to run
234 | exit_code = 0
235 | coverage = not args.no_coverage
236 |
237 | if args.unit:
238 | exit_code = run_unit_tests(args.verbose, coverage)
239 | elif args.integration:
240 | exit_code = run_integration_tests(args.verbose)
241 | elif args.performance:
242 | exit_code = run_performance_tests(args.verbose)
243 | elif args.cost_validation:
244 | exit_code = run_cost_validation_tests(args.verbose)
245 | elif args.all:
246 | exit_code = run_all_tests(args.verbose, coverage)
247 | elif args.specific:
248 | exit_code = run_specific_test(args.specific, args.verbose)
249 | else:
250 | # Default: run unit and integration tests
251 | print("No specific test suite selected. Running unit and integration tests...")
252 | exit_code = run_unit_tests(args.verbose, coverage)
253 | if exit_code == 0:
254 | exit_code = run_integration_tests(args.verbose)
255 |
256 | # Summary
257 | if exit_code == 0:
258 | print(f"\n{'='*60}")
259 | print("✓ All tests passed successfully!")
260 | print(f"{'='*60}")
261 | else:
262 | print(f"\n{'='*60}")
263 | print("✗ Some tests failed. Check the output above for details.")
264 | print(f"{'='*60}")
265 |
266 | return exit_code
267 |
268 |
269 | if __name__ == "__main__":
270 | sys.exit(main())
```
--------------------------------------------------------------------------------
/tests/legacy/test_setup_verification.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Setup verification test to ensure the testing framework is working correctly.
3 |
4 | This test validates that the testing infrastructure is properly configured
5 | and can run basic tests with mocked AWS services.
6 | """
7 |
8 | import pytest
9 | import asyncio
10 | from unittest.mock import Mock, AsyncMock, patch
11 | from datetime import datetime
12 |
13 |
14 | @pytest.mark.unit
15 | class TestSetupVerification:
16 | """Verify that the testing setup is working correctly."""
17 |
18 | def test_pytest_is_working(self):
19 | """Test that pytest is working correctly."""
20 | assert True
21 |
22 | def test_fixtures_are_available(self, mock_s3_service, mock_storage_lens_service,
23 | mock_pricing_service):
24 | """Test that common fixtures are available."""
25 | assert mock_s3_service is not None
26 | assert mock_storage_lens_service is not None
27 | assert mock_pricing_service is not None
28 |
29 | @pytest.mark.asyncio
30 | async def test_async_testing_works(self):
31 | """Test that async testing is working."""
32 | async def async_function():
33 | await asyncio.sleep(0.001)
34 | return "async_result"
35 |
36 | result = await async_function()
37 | assert result == "async_result"
38 |
39 | def test_mocking_works(self):
40 | """Test that mocking is working correctly."""
41 | mock_service = Mock()
42 | mock_service.test_method.return_value = "mocked_result"
43 |
44 | result = mock_service.test_method()
45 | assert result == "mocked_result"
46 | mock_service.test_method.assert_called_once()
47 |
48 | def test_aws_mocking_works(self, mock_aws_credentials):
49 | """Test that AWS service mocking is working."""
50 | with patch('boto3.client') as mock_boto_client:
51 | mock_client = Mock()
52 | mock_client.list_buckets.return_value = {"Buckets": []}
53 | mock_boto_client.return_value = mock_client
54 |
55 | import boto3
56 | s3_client = boto3.client('s3')
57 | result = s3_client.list_buckets()
58 |
59 | assert result == {"Buckets": []}
60 |
61 | def test_cost_constraint_validator_works(self, cost_constraint_validator):
62 | """Test that cost constraint validator is working."""
63 | # Should allow valid operations
64 | assert cost_constraint_validator.validate_operation('list_buckets') is True
65 |
66 | # Should reject forbidden operations
67 | with pytest.raises(ValueError):
68 | cost_constraint_validator.validate_operation('list_objects_v2')
69 |
70 | summary = cost_constraint_validator.get_operation_summary()
71 | assert summary["total_operations"] == 2
72 | assert "list_buckets" in summary["allowed_called"]
73 | assert "list_objects_v2" in summary["forbidden_called"]
74 |
75 | def test_performance_tracker_works(self, performance_tracker):
76 | """Test that performance tracker is working."""
77 | performance_tracker.start_timer("test_operation")
78 | # Simulate some work
79 | import time
80 | time.sleep(0.01)
81 | duration = performance_tracker.end_timer("test_operation")
82 |
83 | assert duration > 0
84 | assert duration < 1.0 # Should be very quick
85 |
86 | metrics = performance_tracker.get_metrics()
87 | assert "test_operation" in metrics
88 | assert metrics["test_operation"] > 0
89 |
90 | def test_test_data_factory_works(self, test_data_factory):
91 | """Test that test data factory is working."""
92 | buckets = test_data_factory.create_bucket_data(count=3)
93 | assert len(buckets) == 3
94 | assert all("Name" in bucket for bucket in buckets)
95 | assert all("CreationDate" in bucket for bucket in buckets)
96 |
97 | cost_data = test_data_factory.create_cost_data(days=5)
98 | assert "ResultsByTime" in cost_data
99 | assert len(cost_data["ResultsByTime"]) == 5
100 |
101 | analysis_result = test_data_factory.create_analysis_result("test_analysis")
102 | assert analysis_result["status"] == "success"
103 | assert analysis_result["analysis_type"] == "test_analysis"
104 |
105 |
106 | @pytest.mark.integration
107 | class TestIntegrationSetupVerification:
108 | """Verify that integration testing setup is working."""
109 |
110 | @pytest.mark.asyncio
111 | async def test_orchestrator_can_be_mocked(self, mock_service_orchestrator):
112 | """Test that orchestrator can be properly mocked for integration tests."""
113 | # This would normally import the real orchestrator, but we'll mock it
114 |         with patch('playbooks.s3.s3_optimization_orchestrator.ServiceOrchestrator', return_value=mock_service_orchestrator), \
115 |              patch('playbooks.s3.s3_optimization_orchestrator.get_performance_monitor'), \
116 |              patch('playbooks.s3.s3_optimization_orchestrator.get_memory_manager'), \
117 |              patch('playbooks.s3.s3_optimization_orchestrator.get_timeout_handler'), \
118 |              patch('playbooks.s3.s3_optimization_orchestrator.get_pricing_cache'), \
119 |              patch('playbooks.s3.s3_optimization_orchestrator.get_bucket_metadata_cache'), \
120 |              patch('playbooks.s3.s3_optimization_orchestrator.get_analysis_results_cache'):
121 |
122 | from playbooks.s3.s3_optimization_orchestrator import S3OptimizationOrchestrator
123 |
124 | orchestrator = S3OptimizationOrchestrator(region="us-east-1")
125 | assert orchestrator.region == "us-east-1"
126 | assert orchestrator.service_orchestrator == mock_service_orchestrator
127 |
128 |
129 | @pytest.mark.performance
130 | class TestPerformanceSetupVerification:
131 | """Verify that performance testing setup is working."""
132 |
133 | @pytest.mark.asyncio
134 | async def test_performance_measurement_works(self, performance_tracker):
135 | """Test that performance measurement is working."""
136 | performance_tracker.start_timer("performance_test")
137 |
138 | # Simulate some async work
139 | await asyncio.sleep(0.01)
140 |
141 | duration = performance_tracker.end_timer("performance_test")
142 |
143 | assert duration > 0.005 # Should be at least 5ms
144 | assert duration < 0.1 # Should be less than 100ms
145 |
146 | # Test performance assertion
147 | performance_tracker.assert_performance("performance_test", 0.1)
148 |
149 |
150 | @pytest.mark.no_cost_validation
151 | class TestCostValidationSetupVerification:
152 | """Verify that cost validation testing setup is working."""
153 |
154 | def test_cost_constraint_system_is_active(self):
155 | """Test that cost constraint validation system is active."""
156 | from services.s3_service import ALLOWED_S3_OPERATIONS, FORBIDDEN_S3_OPERATIONS
157 |
158 | # Verify that the constraint lists are populated
159 | assert len(ALLOWED_S3_OPERATIONS) > 0
160 | assert len(FORBIDDEN_S3_OPERATIONS) > 0
161 |
162 | # Verify critical operations are in the right lists
163 | assert 'list_buckets' in ALLOWED_S3_OPERATIONS
164 | assert 'list_objects_v2' in FORBIDDEN_S3_OPERATIONS
165 |
166 | def test_cost_constraint_violation_error_works(self, mock_aws_credentials):
167 | """Test that cost constraint violation errors work correctly."""
168 | from services.s3_service import S3Service, S3CostConstraintViolationError
169 |
170 | service = S3Service(region="us-east-1")
171 |
172 | with pytest.raises(S3CostConstraintViolationError):
173 | service._validate_s3_operation('list_objects_v2')
174 |
175 | def test_cost_validator_fixture_works(self, cost_constraint_validator):
176 | """Test that cost validator fixture is working correctly."""
177 | # Should track operations
178 | cost_constraint_validator.validate_operation('list_buckets')
179 |
180 | # Should reject forbidden operations
181 | with pytest.raises(ValueError):
182 | cost_constraint_validator.validate_operation('get_object')
183 |
184 | summary = cost_constraint_validator.get_operation_summary()
185 | assert summary["total_operations"] == 2
186 | assert len(summary["forbidden_called"]) == 1
187 | assert len(summary["allowed_called"]) == 1
188 |
189 |
190 | class TestMarkerSystem:
191 | """Test that the pytest marker system is working."""
192 |
193 | def test_markers_are_configured(self):
194 | """Test that pytest markers are properly configured."""
195 | # This test itself uses markers, so if it runs, markers are working
196 | assert True
197 |
198 | def test_can_run_specific_marker_tests(self):
199 | """Test that we can run tests with specific markers."""
200 | # This would be tested by running: pytest -m unit
201 | # If this test runs when using -m unit, then markers work
202 | assert True
```
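The fixtures exercised by the setup-verification tests above (`cost_constraint_validator`, `performance_tracker`, `test_data_factory`, the mocked services) are, per pytest convention, provided by a `conftest.py` that is not reproduced on this page. As orientation only, here is a minimal sketch of a cost-constraint validator fixture that is consistent with the assertions above; the allowed/forbidden operation sets and class name are assumptions, not the project's actual implementation.

```python
# Hypothetical sketch inferred from the test assertions above; the real fixture
# in the project's conftest.py may differ.
import pytest

ALLOWED = {"list_buckets", "get_bucket_location"}      # illustrative
FORBIDDEN = {"list_objects_v2", "get_object"}           # illustrative


class CostConstraintValidator:
    def __init__(self):
        self.allowed_called = []
        self.forbidden_called = []

    def validate_operation(self, operation: str) -> bool:
        if operation in FORBIDDEN:
            # Forbidden operations are recorded, then rejected, so the summary
            # still counts them (matching the tests above).
            self.forbidden_called.append(operation)
            raise ValueError(f"Forbidden S3 operation: {operation}")
        self.allowed_called.append(operation)
        return True

    def get_operation_summary(self) -> dict:
        return {
            "total_operations": len(self.allowed_called) + len(self.forbidden_called),
            "allowed_called": self.allowed_called,
            "forbidden_called": self.forbidden_called,
        }


@pytest.fixture
def cost_constraint_validator():
    return CostConstraintValidator()
```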
--------------------------------------------------------------------------------
/tests/test_setup_verification.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Setup verification test to ensure the testing framework is working correctly.
3 |
4 | This test validates that the testing infrastructure is properly configured
5 | and can run basic tests with mocked AWS services.
6 | """
7 |
8 | import pytest
9 | import asyncio
10 | from unittest.mock import Mock, AsyncMock, patch
11 | from datetime import datetime
12 |
13 |
14 | @pytest.mark.unit
15 | class TestSetupVerification:
16 | """Verify that the testing setup is working correctly."""
17 |
18 | def test_pytest_is_working(self):
19 | """Test that pytest is working correctly."""
20 | assert True
21 |
22 | def test_fixtures_are_available(self, mock_s3_service, mock_storage_lens_service,
23 | mock_pricing_service):
24 | """Test that common fixtures are available."""
25 | assert mock_s3_service is not None
26 | assert mock_storage_lens_service is not None
27 | assert mock_pricing_service is not None
28 |
29 | @pytest.mark.asyncio
30 | async def test_async_testing_works(self):
31 | """Test that async testing is working."""
32 | async def async_function():
33 | await asyncio.sleep(0.001)
34 | return "async_result"
35 |
36 | result = await async_function()
37 | assert result == "async_result"
38 |
39 | def test_mocking_works(self):
40 | """Test that mocking is working correctly."""
41 | mock_service = Mock()
42 | mock_service.test_method.return_value = "mocked_result"
43 |
44 | result = mock_service.test_method()
45 | assert result == "mocked_result"
46 | mock_service.test_method.assert_called_once()
47 |
48 | def test_aws_mocking_works(self, mock_aws_credentials):
49 | """Test that AWS service mocking is working."""
50 | with patch('boto3.client') as mock_boto_client:
51 | mock_client = Mock()
52 | mock_client.list_buckets.return_value = {"Buckets": []}
53 | mock_boto_client.return_value = mock_client
54 |
55 | import boto3
56 | s3_client = boto3.client('s3')
57 | result = s3_client.list_buckets()
58 |
59 | assert result == {"Buckets": []}
60 |
61 | def test_cost_constraint_validator_works(self, cost_constraint_validator):
62 | """Test that cost constraint validator is working."""
63 | # Should allow valid operations
64 | assert cost_constraint_validator.validate_operation('list_buckets') is True
65 |
66 | # Should reject forbidden operations
67 | with pytest.raises(ValueError):
68 | cost_constraint_validator.validate_operation('list_objects_v2')
69 |
70 | summary = cost_constraint_validator.get_operation_summary()
71 | assert summary["total_operations"] == 2
72 | assert "list_buckets" in summary["allowed_called"]
73 | assert "list_objects_v2" in summary["forbidden_called"]
74 |
75 | def test_performance_tracker_works(self, performance_tracker):
76 | """Test that performance tracker is working."""
77 | performance_tracker.start_timer("test_operation")
78 | # Simulate some work
79 | import time
80 | time.sleep(0.01)
81 | duration = performance_tracker.end_timer("test_operation")
82 |
83 | assert duration > 0
84 | assert duration < 1.0 # Should be very quick
85 |
86 | metrics = performance_tracker.get_metrics()
87 | assert "test_operation" in metrics
88 | assert metrics["test_operation"] > 0
89 |
90 | def test_test_data_factory_works(self, test_data_factory):
91 | """Test that test data factory is working."""
92 | buckets = test_data_factory.create_bucket_data(count=3)
93 | assert len(buckets) == 3
94 | assert all("Name" in bucket for bucket in buckets)
95 | assert all("CreationDate" in bucket for bucket in buckets)
96 |
97 | cost_data = test_data_factory.create_cost_data(days=5)
98 | assert "ResultsByTime" in cost_data
99 | assert len(cost_data["ResultsByTime"]) == 5
100 |
101 | analysis_result = test_data_factory.create_analysis_result("test_analysis")
102 | assert analysis_result["status"] == "success"
103 | assert analysis_result["analysis_type"] == "test_analysis"
104 |
105 |
106 | @pytest.mark.integration
107 | class TestIntegrationSetupVerification:
108 | """Verify that integration testing setup is working."""
109 |
110 | @pytest.mark.asyncio
111 | async def test_orchestrator_can_be_mocked(self, mock_service_orchestrator):
112 | """Test that orchestrator can be properly mocked for integration tests."""
113 | # This would normally import the real orchestrator, but we'll mock it
114 | with patch('core.s3_optimization_orchestrator.ServiceOrchestrator', return_value=mock_service_orchestrator), \
115 | patch('core.s3_optimization_orchestrator.get_performance_monitor'), \
116 | patch('core.s3_optimization_orchestrator.get_memory_manager'), \
117 | patch('core.s3_optimization_orchestrator.get_timeout_handler'), \
118 | patch('core.s3_optimization_orchestrator.get_pricing_cache'), \
119 | patch('core.s3_optimization_orchestrator.get_bucket_metadata_cache'), \
120 | patch('core.s3_optimization_orchestrator.get_analysis_results_cache'):
121 |
122 | from playbooks.s3.s3_optimization_orchestrator import S3OptimizationOrchestrator
123 |
124 | orchestrator = S3OptimizationOrchestrator(region="us-east-1")
125 | assert orchestrator.region == "us-east-1"
126 | assert orchestrator.service_orchestrator == mock_service_orchestrator
127 |
128 |
129 | @pytest.mark.performance
130 | class TestPerformanceSetupVerification:
131 | """Verify that performance testing setup is working."""
132 |
133 | @pytest.mark.asyncio
134 | async def test_performance_measurement_works(self, performance_tracker):
135 | """Test that performance measurement is working."""
136 | performance_tracker.start_timer("performance_test")
137 |
138 | # Simulate some async work
139 | await asyncio.sleep(0.01)
140 |
141 | duration = performance_tracker.end_timer("performance_test")
142 |
143 | assert duration > 0.005 # Should be at least 5ms
144 | assert duration < 0.1 # Should be less than 100ms
145 |
146 | # Test performance assertion
147 | performance_tracker.assert_performance("performance_test", 0.1)
148 |
149 |
150 | @pytest.mark.no_cost_validation
151 | class TestCostValidationSetupVerification:
152 | """Verify that cost validation testing setup is working."""
153 |
154 | def test_cost_constraint_system_is_active(self):
155 | """Test that cost constraint validation system is active."""
156 | from services.s3_service import ALLOWED_S3_OPERATIONS, FORBIDDEN_S3_OPERATIONS
157 |
158 | # Verify that the constraint lists are populated
159 | assert len(ALLOWED_S3_OPERATIONS) > 0
160 | assert len(FORBIDDEN_S3_OPERATIONS) > 0
161 |
162 | # Verify critical operations are in the right lists
163 | assert 'list_buckets' in ALLOWED_S3_OPERATIONS
164 | assert 'list_objects_v2' in FORBIDDEN_S3_OPERATIONS
165 |
166 | def test_cost_constraint_violation_error_works(self, mock_aws_credentials):
167 | """Test that cost constraint violation errors work correctly."""
168 | from services.s3_service import S3Service, S3CostConstraintViolationError
169 |
170 | service = S3Service(region="us-east-1")
171 |
172 | with pytest.raises(S3CostConstraintViolationError):
173 | service._validate_s3_operation('list_objects_v2')
174 |
175 | def test_cost_validator_fixture_works(self, cost_constraint_validator):
176 | """Test that cost validator fixture is working correctly."""
177 | # Should track operations
178 | cost_constraint_validator.validate_operation('list_buckets')
179 |
180 | # Should reject forbidden operations
181 | with pytest.raises(ValueError):
182 | cost_constraint_validator.validate_operation('get_object')
183 |
184 | summary = cost_constraint_validator.get_operation_summary()
185 | assert summary["total_operations"] == 2
186 | assert len(summary["forbidden_called"]) == 1
187 | assert len(summary["allowed_called"]) == 1
188 |
189 |
190 | class TestMarkerSystem:
191 | """Test that the pytest marker system is working."""
192 |
193 | def test_markers_are_configured(self):
194 | """Test that pytest markers are properly configured."""
195 | # This test itself uses markers, so if it runs, markers are working
196 | assert True
197 |
198 | def test_can_run_specific_marker_tests(self):
199 | """Test that we can run tests with specific markers."""
200 | # This would be tested by running: pytest -m unit
201 | # If this test runs when using -m unit, then markers work
202 | assert True
```
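Because these setup-verification tests are organized around pytest markers (`unit`, `integration`, `performance`, `no_cost_validation`), each group can be selected independently. A small sketch of driving that selection programmatically, assuming the file path shown above:

```python
# Sketch: run only the unit-marked setup verification tests from Python.
# Equivalent to `pytest -m unit tests/test_setup_verification.py -v` on the CLI.
import sys
import pytest

if __name__ == "__main__":
    exit_code = pytest.main(["-m", "unit", "tests/test_setup_verification.py", "-v"])
    sys.exit(int(exit_code))
```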
--------------------------------------------------------------------------------
/tests/legacy/example_wellarchitected_output.py:
--------------------------------------------------------------------------------
```python
1 | #!/usr/bin/env python3
2 | """
3 | Example showing enhanced tool outputs with Well-Architected Framework recommendations
4 | """
5 |
6 | import json
7 | from utils.documentation_links import add_documentation_links
8 |
9 | def show_enhanced_examples():
10 | """Show examples of enhanced tool outputs with Well-Architected recommendations"""
11 |
12 | print("CFM Tips - Enhanced Output with Well-Architected Framework")
13 | print("=" * 70)
14 | print()
15 |
16 | # Example 1: EC2 Right-sizing with Well-Architected guidance
17 | print("1. EC2 Right-sizing Analysis - Enhanced Output:")
18 | print("-" * 50)
19 | ec2_result = {
20 | "status": "success",
21 | "data": {
22 | "underutilized_instances": [
23 | {
24 | "instance_id": "i-1234567890abcdef0",
25 | "instance_type": "m5.2xlarge",
26 | "finding": "Overprovisioned",
27 | "avg_cpu_utilization": 8.5,
28 | "avg_memory_utilization": 12.3,
29 | "recommendation": {
30 | "recommended_instance_type": "m5.large",
31 | "estimated_monthly_savings": 180.50,
32 | "confidence": "High"
33 | }
34 | },
35 | {
36 | "instance_id": "i-0987654321fedcba0",
37 | "instance_type": "c5.xlarge",
38 | "finding": "Underprovisioned",
39 | "avg_cpu_utilization": 85.2,
40 | "recommendation": {
41 | "recommended_instance_type": "c5.2xlarge",
42 | "estimated_monthly_cost_increase": 120.00,
43 | "performance_improvement": "40%"
44 | }
45 | }
46 | ],
47 | "count": 2,
48 | "total_monthly_savings": 180.50,
49 | "analysis_period": "14 days",
50 | "data_source": "AWS Compute Optimizer"
51 | },
52 | "message": "Found 2 EC2 instances with optimization opportunities"
53 | }
54 |
55 | enhanced_ec2 = add_documentation_links(ec2_result, "ec2", "underutilized")
56 |
57 | # Show key sections
58 | print("Key Findings:")
59 | for instance in enhanced_ec2["data"]["underutilized_instances"]:
60 | print(f" • {instance['instance_id']}: {instance['finding']} - Save ${instance['recommendation'].get('estimated_monthly_savings', 0)}/month")
61 |
62 | print(f"\nTotal Monthly Savings: ${enhanced_ec2['data']['total_monthly_savings']}")
63 |
64 | print("\nWell-Architected Framework Guidance:")
65 | wa_framework = enhanced_ec2["wellarchitected_framework"]
66 | print(f" Cost Optimization Pillar: {wa_framework['cost_optimization_pillar']}")
67 |
68 | print("\n Applicable Principles:")
69 | for principle in wa_framework["applicable_principles"]:
70 | print(f" • {principle['title']}: {principle['description']}")
71 |
72 | print("\n High Priority Recommendations:")
73 | for rec in wa_framework["implementation_priority"]["high"]:
74 | print(f" • {rec}")
75 |
76 | print("\n Service-Specific Best Practices:")
77 | for rec in wa_framework["service_specific_recommendations"][:2]: # Show first 2
78 | print(f" • {rec['practice']} ({rec['impact']})")
79 | print(f" Implementation: {rec['implementation']}")
80 |
81 | print("\n" + "=" * 70)
82 |
83 | # Example 2: S3 Storage Optimization
84 | print("\n2. S3 Storage Optimization - Enhanced Output:")
85 | print("-" * 50)
86 | s3_result = {
87 | "status": "success",
88 | "comprehensive_s3_optimization": {
89 | "overview": {
90 | "total_potential_savings": "$2,450.75",
91 | "analyses_completed": "6/6",
92 | "buckets_analyzed": 25,
93 | "execution_time": "42.3s"
94 | },
95 | "key_findings": [
96 | "15 buckets using suboptimal storage classes",
97 | "Found 45 incomplete multipart uploads",
98 | "Identified $1,200 in lifecycle policy savings",
99 | "3 buckets with high request costs suitable for CloudFront"
100 | ],
101 | "top_recommendations": [
102 | {
103 | "type": "storage_class_optimization",
104 | "bucket": "analytics-data-lake",
105 | "finding": "Standard storage for infrequently accessed data",
106 | "recommendation": "Transition to Standard-IA after 30 days",
107 | "potential_savings": "$850.25/month",
108 | "priority": "High"
109 | },
110 | {
111 | "type": "lifecycle_policy",
112 | "bucket": "backup-archives",
113 | "finding": "No lifecycle policy for old backups",
114 | "recommendation": "Archive to Glacier Deep Archive after 90 days",
115 | "potential_savings": "$650.50/month",
116 | "priority": "High"
117 | }
118 | ]
119 | }
120 | }
121 |
122 | enhanced_s3 = add_documentation_links(s3_result, "s3", "storage_optimization")
123 |
124 | print("Key Findings:")
125 | for finding in enhanced_s3["comprehensive_s3_optimization"]["key_findings"]:
126 | print(f" • {finding}")
127 |
128 | print(f"\nTotal Potential Savings: {enhanced_s3['comprehensive_s3_optimization']['overview']['total_potential_savings']}")
129 |
130 | print("\nTop Recommendations:")
131 | for rec in enhanced_s3["comprehensive_s3_optimization"]["top_recommendations"]:
132 | print(f" • {rec['bucket']}: {rec['recommendation']} - {rec['potential_savings']}")
133 |
134 | print("\nWell-Architected Framework Guidance:")
135 | wa_s3 = enhanced_s3["wellarchitected_framework"]
136 |
137 | print(" High Priority Actions:")
138 | for action in wa_s3["implementation_priority"]["high"]:
139 | print(f" • {action}")
140 |
141 | print(" Medium Priority Actions:")
142 | for action in wa_s3["implementation_priority"]["medium"][:2]: # Show first 2
143 | print(f" • {action}")
144 |
145 | print("\n" + "=" * 70)
146 |
147 | # Example 3: Multi-Service Comprehensive Analysis
148 | print("\n3. Multi-Service Comprehensive Analysis - Enhanced Output:")
149 | print("-" * 50)
150 | comprehensive_result = {
151 | "status": "success",
152 | "comprehensive_analysis": {
153 | "overview": {
154 | "total_monthly_cost": "$8,450.25",
155 | "total_potential_savings": "$2,180.75",
156 | "savings_percentage": "25.8%",
157 | "services_analyzed": ["EC2", "EBS", "RDS", "Lambda", "S3"]
158 | },
159 | "service_breakdown": {
160 | "ec2": {"current_cost": 3200, "potential_savings": 640, "optimization_opportunities": 12},
161 | "ebs": {"current_cost": 850, "potential_savings": 180, "optimization_opportunities": 8},
162 | "rds": {"current_cost": 2100, "potential_savings": 420, "optimization_opportunities": 3},
163 | "lambda": {"current_cost": 150, "potential_savings": 45, "optimization_opportunities": 15},
164 | "s3": {"current_cost": 2150, "potential_savings": 895, "optimization_opportunities": 22}
165 | },
166 | "top_opportunities": [
167 | {"service": "S3", "type": "Storage Class Optimization", "savings": 895, "effort": "Low"},
168 | {"service": "EC2", "type": "Right-sizing", "savings": 640, "effort": "Medium"},
169 | {"service": "RDS", "type": "Reserved Instances", "savings": 420, "effort": "Low"}
170 | ]
171 | }
172 | }
173 |
174 | enhanced_comprehensive = add_documentation_links(comprehensive_result, None, "comprehensive")
175 |
176 | print("Cost Overview:")
177 | overview = enhanced_comprehensive["comprehensive_analysis"]["overview"]
178 |     print(f"  • Current Monthly Cost: {overview['total_monthly_cost']}")
179 |     print(f"  • Potential Savings: {overview['total_potential_savings']} ({overview['savings_percentage']})")
180 |
181 | print("\nTop Optimization Opportunities:")
182 | for opp in enhanced_comprehensive["comprehensive_analysis"]["top_opportunities"]:
183 | print(f" • {opp['service']} - {opp['type']}: ${opp['savings']}/month ({opp['effort']} effort)")
184 |
185 | print("\nWell-Architected Framework Principles:")
186 | wa_comp = enhanced_comprehensive["wellarchitected_framework"]
187 | for principle in wa_comp["principles"][:3]: # Show first 3
188 | print(f" • {principle['title']}")
189 | print(f" {principle['description']}")
190 | print(f" Key practices: {', '.join(principle['best_practices'][:2])}")
191 | print()
192 |
193 | print("=" * 70)
194 | print("\nEnhanced Features Summary:")
195 | print("✓ Documentation links to AWS best practices")
196 | print("✓ Well-Architected Framework Cost Optimization pillar mapping")
197 | print("✓ Service-specific implementation guidance")
198 | print("✓ Impact assessment and priority ranking")
199 | print("✓ Principle-based recommendations")
200 | print("✓ Actionable next steps with implementation details")
201 |
202 | if __name__ == "__main__":
203 | show_enhanced_examples()
```
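The script above imports `add_documentation_links` from `utils/documentation_links.py`, which is not shown on this page. For orientation, a heavily simplified stub of the output shape the script relies on (a `wellarchitected_framework` section added to the result); the keys mirror what the examples read, but the real module's structure and content may differ.

```python
# Illustrative stub only: the real implementation lives in utils/documentation_links.py.
# It shows the "wellarchitected_framework" shape the example script above reads.
from typing import Any, Dict, Optional


def add_documentation_links(result: Dict[str, Any],
                            service: Optional[str],
                            analysis_type: str) -> Dict[str, Any]:
    enriched = dict(result)
    enriched["wellarchitected_framework"] = {
        "cost_optimization_pillar": "https://docs.aws.amazon.com/wellarchitected/latest/cost-optimization-pillar/welcome.html",
        "applicable_principles": [],              # e.g. {"title": ..., "description": ...}
        "implementation_priority": {"high": [], "medium": [], "low": []},
        "service_specific_recommendations": [],   # e.g. {"practice": ..., "impact": ..., "implementation": ...}
        "principles": [],                         # read by the comprehensive example
    }
    return enriched
```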
--------------------------------------------------------------------------------
/RUNBOOKS_GUIDE.md:
--------------------------------------------------------------------------------
```markdown
1 | # AWS Cost Optimization Runbooks with MCP v3
2 |
3 | This guide shows how to use the AWS Cost Optimization Runbooks with the MCP server that includes proper Cost Optimization Hub permissions.
4 |
5 | ## What's Included
6 |
7 | ### Core AWS Services
8 | - ✅ **Cost Explorer** - Retrieve cost data and usage metrics
9 | - ✅ **Cost Optimization Hub** - With correct permissions and API calls
10 | - ✅ **Compute Optimizer** - Get right-sizing recommendations
11 | - ✅ **Trusted Advisor** - Cost optimization checks
12 | - ✅ **Performance Insights** - RDS performance metrics
13 |
14 | ### Cost Optimization Runbooks
15 | - 🔧 **EC2 Right Sizing** - Identify underutilized EC2 instances
16 | - 💾 **EBS Optimization** - Find unused and underutilized volumes
17 | - 🗄️ **RDS Optimization** - Identify idle and underutilized databases
18 | - ⚡ **Lambda Optimization** - Find overprovisioned and unused functions
19 | - 📋 **CloudTrail Optimization** - Identify duplicate management event trails
20 | - 📊 **Comprehensive Analysis** - Multi-service cost analysis
21 |
22 | ## Quick Start
23 |
24 | ### 1. Setup
25 | ```bash
26 | cd <replace-with-project-folder>/
27 |
28 | # Make sure all files are executable
29 | chmod +x mcp_server_with_runbooks.py
30 |
31 | # Test the server
32 | python3 -m py_compile mcp_server_with_runbooks.py
33 | python3 -c "from playbooks.ec2.ec2_optimization import run_ec2_right_sizing_analysis; print('Playbooks OK')"
34 | ```
35 |
36 | ### 2. Configure AWS Permissions
37 |
38 | Apply the correct IAM policy for Cost Optimization Hub:
39 |
40 | ```json
41 | {
42 | "Version": "2012-10-17",
43 | "Statement": [
44 | {
45 | "Effect": "Allow",
46 | "Action": [
47 | "cost-optimization-hub:ListEnrollmentStatuses",
48 | "cost-optimization-hub:ListRecommendations",
49 | "cost-optimization-hub:GetRecommendation",
50 | "cost-optimization-hub:ListRecommendationSummaries",
51 | "ce:GetCostAndUsage",
52 | "ce:GetCostForecast",
53 | "compute-optimizer:GetEC2InstanceRecommendations",
54 | "compute-optimizer:GetEBSVolumeRecommendations",
55 | "compute-optimizer:GetLambdaFunctionRecommendations",
56 | "ec2:DescribeInstances",
57 | "ec2:DescribeVolumes",
58 | "rds:DescribeDBInstances",
59 | "lambda:ListFunctions",
60 | "cloudwatch:GetMetricStatistics",
61 | "s3:ListBucket",
62 |         "s3:ListAllMyBuckets",
63 | "support:DescribeTrustedAdvisorChecks",
64 | "support:DescribeTrustedAdvisorCheckResult",
65 | "pi:GetResourceMetrics",
66 | "cloudtrail:DescribeTrails",
67 | "cloudtrail:GetTrailStatus",
68 | "cloudtrail:GetEventSelectors"
69 | ],
70 | "Resource": "*"
71 | }
72 | ]
73 | }
74 | ```
75 |
76 | ### 3. Install dependencies
77 | pip install -r requirements.txt
78 |
79 | ### 4. Configure AWS credentials
80 | aws configure
81 |
82 | ### 5. Add the MCP server config to Amazon Q using the mcp_runbooks.json as a template
83 | vi ~/.aws/amazonq/mcp.json
84 |
85 | ## Available Runbook Tools
86 |
87 | ### EC2 Right Sizing Runbooks
88 |
89 | #### 1. `ec2_rightsizing`
90 | Analyze EC2 instances for right-sizing opportunities.
91 |
92 | **Example Usage:**
93 | ```
94 | "Run EC2 right-sizing analysis for us-east-1 region with 14-day lookback period"
95 | ```
96 |
97 | **Parameters:**
98 | - `region`: AWS region to analyze
99 | - `lookback_period_days`: Days to analyze (default: 14)
100 | - `cpu_threshold`: CPU utilization threshold % (default: 40.0)
101 |
102 | #### 2. `ec2_report`
103 | Generate comprehensive EC2 right-sizing report.
104 |
105 | **Example Usage:**
106 | ```
107 | "Generate an EC2 right-sizing report for us-east-1 in markdown format"
108 | ```
109 |
110 | ### EBS Optimization Runbooks
111 |
112 | #### 1. `ebs_optimization`
113 | Analyze EBS volumes for optimization opportunities.
114 |
115 | **Example Usage:**
116 | ```
117 | "Analyze EBS volumes in us-east-1 for optimization opportunities"
118 | ```
119 |
120 | #### 2. `ebs_unused`
121 | Find unused EBS volumes that can be deleted.
122 |
123 | **Example Usage:**
124 | ```
125 | "Find unused EBS volumes older than 30 days in us-east-1"
126 | ```
127 |
128 | #### 3. `ebs_report`
129 | Generate comprehensive EBS optimization report.
130 |
131 | **Example Usage:**
132 | ```
133 | "Generate a comprehensive EBS optimization report for us-east-1"
134 | ```
135 |
136 | ### RDS Optimization Runbooks
137 |
138 | #### 1. `rds_optimization`
139 | Analyze RDS instances for optimization opportunities.
140 |
141 | **Example Usage:**
142 | ```
143 | "Analyze RDS instances in us-east-1 for underutilization"
144 | ```
145 |
146 | #### 2. `rds_idle`
147 | Find idle RDS instances with minimal activity.
148 |
149 | **Example Usage:**
150 | ```
151 | "Find idle RDS instances with less than 1 connection in the last 7 days"
152 | ```
153 |
154 | #### 3. `rds_report`
155 | Generate comprehensive RDS optimization report.
156 |
157 | **Example Usage:**
158 | ```
159 | "Generate an RDS optimization report for us-east-1"
160 | ```
161 |
162 | ### Lambda Optimization Runbooks
163 |
164 | #### 1. `lambda_optimization`
165 | Analyze Lambda functions for optimization opportunities.
166 |
167 | **Example Usage:**
168 | ```
169 | "Analyze Lambda functions in us-east-1 for memory optimization"
170 | ```
171 |
172 | #### 2. `lambda_unused`
173 | Find unused Lambda functions.
174 |
175 | **Example Usage:**
176 | ```
177 | "Find Lambda functions with less than 5 invocations in the last 30 days"
178 | ```
179 |
180 | #### 3. `lambda_report`
181 | Generate comprehensive Lambda optimization report.
182 |
183 | **Example Usage:**
184 | ```
185 | "Generate a Lambda optimization report for us-east-1"
186 | ```
187 |
188 | ### CloudTrail Optimization Runbooks
189 |
190 | #### 1. `get_management_trails`
191 | Get CloudTrail trails that have management events enabled.
192 |
193 | **Example Usage:**
194 | ```
195 | "Show me all CloudTrail trails with management events enabled in us-east-1"
196 | ```
197 |
198 | #### 2. `run_cloudtrail_trails_analysis`
199 | Analyze CloudTrail trails to identify duplicate management event trails.
200 |
201 | **Example Usage:**
202 | ```
203 | "Analyze CloudTrail trails in us-east-1 for cost optimization opportunities"
204 | ```
205 |
206 | **Parameters:**
207 | - `region`: AWS region to analyze
208 |
209 | #### 3. `generate_cloudtrail_report`
210 | Generate comprehensive CloudTrail optimization report.
211 |
212 | **Example Usage:**
213 | ```
214 | "Generate a CloudTrail optimization report for us-east-1 in markdown format"
215 | ```
216 |
217 | **Parameters:**
218 | - `region`: AWS region to analyze
219 | - `format`: "json" or "markdown" (default: "json")
220 |
221 | ### Comprehensive Analysis
222 |
223 | #### `comprehensive_analysis`
224 | Run analysis across all services (EC2, EBS, RDS, Lambda).
225 |
226 | **Example Usage:**
227 | ```
228 | "Run comprehensive cost analysis for us-east-1 covering all services"
229 | ```
230 |
231 | **Parameters:**
232 | - `region`: AWS region to analyze
233 | - `services`: Array of services ["ec2", "ebs", "rds", "lambda"]
234 | - `lookback_period_days`: Days to analyze (default: 14)
235 | - `output_format`: "json" or "markdown"
236 |
237 | ### Cost Optimization Hub Tools (Shortened)
238 |
239 | #### 1. `list_coh_enrollment`
240 | Check Cost Optimization Hub enrollment status.
241 |
242 | #### 2. `get_coh_recommendations`
243 | Get cost optimization recommendations.
244 |
245 | #### 3. `get_coh_summaries`
246 | Get recommendation summaries.
247 |
248 | #### 4. `get_coh_recommendation`
249 | Get specific recommendation by ID.
250 |
251 | ## Sample Conversation Flow
252 | **Configure AWS credentials**
253 | ```aws configure```
254 |
255 | **Add the MCP server config to Amazon Q using the mcp_runbooks.json as a template**
256 | ```vi ~/.aws/amazonq/mcp.json```
257 |
258 | ```bash
259 | # Start Q with runbooks
260 | q chat
261 | ```
262 |
263 | **User:** "What cost optimization tools are available?"
264 |
265 | **Q:** "I can see several AWS cost optimization tools including Cost Optimization Hub, runbooks for EC2, EBS, RDS, and Lambda optimization..."
266 |
267 | **User:** "Run a comprehensive cost analysis for us-east-1"
268 |
269 | **Q:** "I'll run a comprehensive cost analysis across all services for the us-east-1 region..."
270 | *[Uses comprehensive_analysis tool]*
271 |
272 | **User:** "Show me unused EBS volumes that are costing money"
273 |
274 | **Q:** "Let me identify unused EBS volumes in your account..."
275 | *[Uses ebs_unused tool]*
276 |
277 | **User:** "Generate an EC2 right-sizing report in markdown format"
278 |
279 | **Q:** "I'll generate a detailed EC2 right-sizing report in markdown format..."
280 | *[Uses ec2_report tool]*
281 |
282 | ## Tool Names (For Reference)
283 |
284 | The tool names have been shortened to fit MCP's 64-character limit:
285 |
286 | | Purpose | Tool Name |
287 | |----------|----------|
288 | | `Run EC2 right sizing analysis` | `ec2_rightsizing` |
289 | | `Generate EC2 right sizing report` | `ec2_report` |
290 | | `Run EBS optimization analysis` | `ebs_optimization` |
291 | | `Identify unused EBS volumes` | `ebs_unused` |
292 | | `Generate EBS optimization report` | `ebs_report` |
293 | | `Run RDS optimization analysis` | `rds_optimization` |
294 | | `Identify idle RDS instances` | `rds_idle` |
295 | | `Generate RDS optimization report` | `rds_report` |
296 | | `Run Lambda optimization analysis` | `lambda_optimization` |
297 | | `Identify unused Lambda functions` | `lambda_unused` |
298 | | `Generate Lambda optimization report` | `lambda_report` |
299 | | `Run comprehensive cost analysis` | `comprehensive_analysis` |
300 | | `Get CloudTrail management trails` | `get_management_trails` |
301 | | `Run CloudTrail trails analysis` | `run_cloudtrail_trails_analysis` |
302 | | `Generate CloudTrail optimization report` | `generate_cloudtrail_report` |
303 | | `List Cost Optimization Hub enrollment statuses` | `list_coh_enrollment` |
304 | | `Get Cost Optimization Hub recommendations` | `get_coh_recommendations` |
305 | | `Get Cost Optimization Hub recommendation summaries` | `get_coh_summaries` |
306 | | `Get a particular Cost Optimization Hub recommendation` | `get_coh_recommendation` |
307 |
308 | ## Troubleshooting
309 |
310 | ### Common Issues
311 |
312 | 1. **Import Error for playbook functions**
313 | ```bash
314 | # Make sure PYTHONPATH is set in mcp_runbooks.json
315 | export PYTHONPATH="<replace-with-project-folder>"
316 | ```
317 |
318 | 2. **Cost Optimization Hub Errors**
319 | ```bash
320 | # Run the diagnostic first
321 | python3 diagnose_cost_optimization_hub_v2.py
322 | ```
```
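Beyond the Q chat phrasings above, a runbook can also be exercised directly from Python, which is handy when debugging a single playbook outside of MCP. The sketch below uses the same import the setup step verifies; the keyword arguments mirror the `ec2_rightsizing` tool parameters documented above, but the actual function signature may differ.

```python
# Sketch: call the EC2 right-sizing playbook directly, outside Q/MCP.
# Parameter names follow the ec2_rightsizing tool documentation above; the real
# signature may differ, and if the playbook is a coroutine, wrap the call in
# asyncio.run(...).
import json
from playbooks.ec2.ec2_optimization import run_ec2_right_sizing_analysis

result = run_ec2_right_sizing_analysis(
    region="us-east-1",
    lookback_period_days=14,   # days of CloudWatch history to analyze
    cpu_threshold=40.0,        # flag instances averaging below this CPU %
)
print(json.dumps(result, indent=2, default=str))
```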
--------------------------------------------------------------------------------
/utils/cache_decorator.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Caching Decorator with TTL Support
3 |
4 | Provides DAO-level caching for CloudWatch service methods with:
5 | - Time-to-live (TTL) support
6 | - Page-aware cache keys
7 | - Thread-safe implementation
8 | - Memory-efficient eviction of the oldest entries when the cache is full
9 | - Optional caching (can be disabled via environment variable or parameter)
10 | """
11 |
12 | import functools
13 | import hashlib
14 | import json
15 | import logging
16 | import os
17 | import time
18 | from typing import Any, Callable, Dict, Optional
19 | from threading import Lock
20 |
21 | logger = logging.getLogger(__name__)
22 |
23 | # Global flag to enable/disable caching
24 | # Can be controlled via environment variable: CFM_ENABLE_CACHE=false
25 | _CACHE_ENABLED = os.getenv('CFM_ENABLE_CACHE', 'true').lower() in ('true', '1', 'yes')
26 |
27 |
28 | class TTLCache:
29 | """Thread-safe cache with TTL support."""
30 |
31 | def __init__(self, ttl_seconds: int = 600, max_size: int = 1000):
32 | """
33 | Initialize TTL cache.
34 |
35 | Args:
36 | ttl_seconds: Time-to-live in seconds (default: 600 = 10 minutes)
37 | max_size: Maximum cache entries (default: 1000)
38 | """
39 | self.ttl_seconds = ttl_seconds
40 | self.max_size = max_size
41 | self.cache: Dict[str, tuple[Any, float]] = {}
42 | self.lock = Lock()
43 | self.hits = 0
44 | self.misses = 0
45 |
46 | def _is_expired(self, timestamp: float) -> bool:
47 | """Check if cache entry is expired."""
48 | return (time.time() - timestamp) > self.ttl_seconds
49 |
50 | def _evict_expired(self):
51 | """Remove expired entries."""
52 | current_time = time.time()
53 | expired_keys = [
54 | key for key, (_, timestamp) in self.cache.items()
55 | if (current_time - timestamp) > self.ttl_seconds
56 | ]
57 | for key in expired_keys:
58 | del self.cache[key]
59 |
60 | def _evict_lru(self):
61 | """Evict least recently used entries if cache is full."""
62 | if len(self.cache) >= self.max_size:
63 | # Remove oldest 10% of entries
64 | sorted_items = sorted(self.cache.items(), key=lambda x: x[1][1])
65 | num_to_remove = max(1, len(sorted_items) // 10)
66 | for key, _ in sorted_items[:num_to_remove]:
67 | del self.cache[key]
68 |
69 | def get(self, key: str) -> Optional[Any]:
70 | """Get value from cache if not expired."""
71 | with self.lock:
72 | if key in self.cache:
73 | value, timestamp = self.cache[key]
74 | if not self._is_expired(timestamp):
75 | self.hits += 1
76 | logger.debug(f"Cache HIT for key: {key[:50]}...")
77 | return value
78 | else:
79 | # Remove expired entry
80 | del self.cache[key]
81 | logger.debug(f"Cache EXPIRED for key: {key[:50]}...")
82 |
83 | self.misses += 1
84 | logger.debug(f"Cache MISS for key: {key[:50]}...")
85 | return None
86 |
87 | def set(self, key: str, value: Any):
88 | """Set value in cache with current timestamp."""
89 | with self.lock:
90 | # Evict expired entries periodically
91 | if len(self.cache) % 100 == 0:
92 | self._evict_expired()
93 |
94 | # Evict LRU if cache is full
95 | self._evict_lru()
96 |
97 | self.cache[key] = (value, time.time())
98 | logger.debug(f"Cache SET for key: {key[:50]}...")
99 |
100 | def clear(self):
101 | """Clear all cache entries."""
102 | with self.lock:
103 | self.cache.clear()
104 | self.hits = 0
105 | self.misses = 0
106 | logger.info("Cache cleared")
107 |
108 | def get_stats(self) -> Dict[str, Any]:
109 | """Get cache statistics."""
110 | with self.lock:
111 | total_requests = self.hits + self.misses
112 | hit_rate = (self.hits / total_requests * 100) if total_requests > 0 else 0
113 | return {
114 | 'size': len(self.cache),
115 | 'max_size': self.max_size,
116 | 'hits': self.hits,
117 | 'misses': self.misses,
118 | 'hit_rate': round(hit_rate, 2),
119 | 'ttl_seconds': self.ttl_seconds
120 | }
121 |
122 |
123 | # Global cache instance for CloudWatch DAO methods
124 | _cloudwatch_cache = TTLCache(ttl_seconds=600, max_size=1000)
125 |
126 |
127 | def _generate_cache_key(func_name: str, args: tuple, kwargs: dict) -> str:
128 | """
129 | Generate cache key from function name and arguments.
130 |
131 | Includes page parameter to ensure pagination is cached correctly.
132 | """
133 | # Extract key parameters
134 | key_parts = [func_name]
135 |
136 | # Add positional args (excluding self)
137 | if args and len(args) > 1:
138 | key_parts.extend(str(arg) for arg in args[1:])
139 |
140 | # Add important kwargs (including page)
141 | important_params = [
142 | 'region', 'page', 'lookback_days', 'namespace_filter',
143 | 'log_group_name_prefix', 'alarm_name_prefix', 'dashboard_name_prefix',
144 | 'can_spend_for_estimate', 'can_spend_for_exact_usage_estimate'
145 | ]
146 |
147 | for param in important_params:
148 | if param in kwargs:
149 | key_parts.append(f"{param}={kwargs[param]}")
150 |
151 | # Create hash of key parts
152 | key_string = "|".join(str(part) for part in key_parts)
153 | key_hash = hashlib.md5(key_string.encode()).hexdigest()
154 |
155 | return f"{func_name}:{key_hash}"
156 |
157 |
158 | def dao_cache(ttl_seconds: int = 600, enabled: Optional[bool] = None):
159 | """
160 | Decorator for caching DAO method results with TTL.
161 |
162 | Caching can be disabled globally via CFM_ENABLE_CACHE environment variable
163 | or per-decorator via the enabled parameter.
164 |
165 | Args:
166 | ttl_seconds: Time-to-live in seconds (default: 600 = 10 minutes)
167 | enabled: Override global cache setting (None = use global, True/False = force)
168 |
169 | Usage:
170 | # Use global cache setting
171 | @dao_cache(ttl_seconds=600)
172 | async def get_log_groups(self, page: int = 1, **kwargs):
173 | pass
174 |
175 | # Force caching disabled for this method
176 | @dao_cache(ttl_seconds=600, enabled=False)
177 | async def get_real_time_data(self, **kwargs):
178 | pass
179 |
180 | # Force caching enabled for this method
181 | @dao_cache(ttl_seconds=600, enabled=True)
182 | async def get_expensive_data(self, **kwargs):
183 | pass
184 |
185 | Environment Variables:
186 | CFM_ENABLE_CACHE: Set to 'false', '0', or 'no' to disable caching globally
187 | """
188 | def decorator(func: Callable) -> Callable:
189 | @functools.wraps(func)
190 | async def async_wrapper(*args, **kwargs):
191 | # Check if caching is enabled
192 | cache_enabled = enabled if enabled is not None else _CACHE_ENABLED
193 |
194 | if not cache_enabled:
195 | logger.debug(f"Cache disabled for {func.__name__}, calling function directly")
196 | return await func(*args, **kwargs)
197 |
198 | # Generate cache key
199 | cache_key = _generate_cache_key(func.__name__, args, kwargs)
200 |
201 | # Try to get from cache
202 | cached_value = _cloudwatch_cache.get(cache_key)
203 | if cached_value is not None:
204 | logger.debug(f"Returning cached result for {func.__name__}")
205 | return cached_value
206 |
207 | # Call original function
208 | result = await func(*args, **kwargs)
209 |
210 | # Cache the result
211 | _cloudwatch_cache.set(cache_key, result)
212 |
213 | return result
214 |
215 | @functools.wraps(func)
216 | def sync_wrapper(*args, **kwargs):
217 | # Check if caching is enabled
218 | cache_enabled = enabled if enabled is not None else _CACHE_ENABLED
219 |
220 | if not cache_enabled:
221 | logger.debug(f"Cache disabled for {func.__name__}, calling function directly")
222 | return func(*args, **kwargs)
223 |
224 | # Generate cache key
225 | cache_key = _generate_cache_key(func.__name__, args, kwargs)
226 |
227 | # Try to get from cache
228 | cached_value = _cloudwatch_cache.get(cache_key)
229 | if cached_value is not None:
230 | logger.debug(f"Returning cached result for {func.__name__}")
231 | return cached_value
232 |
233 | # Call original function
234 | result = func(*args, **kwargs)
235 |
236 | # Cache the result
237 | _cloudwatch_cache.set(cache_key, result)
238 |
239 | return result
240 |
241 | # Return appropriate wrapper based on function type
242 | if asyncio.iscoroutinefunction(func):
243 | return async_wrapper
244 | else:
245 | return sync_wrapper
246 |
247 | return decorator
248 |
249 |
250 | def get_cache_stats() -> Dict[str, Any]:
251 | """Get cache statistics."""
252 | stats = _cloudwatch_cache.get_stats()
253 | stats['enabled'] = _CACHE_ENABLED
254 | return stats
255 |
256 |
257 | def clear_cache():
258 | """Clear all cache entries."""
259 | _cloudwatch_cache.clear()
260 |
261 |
262 | def is_cache_enabled() -> bool:
263 | """Check if caching is currently enabled."""
264 | return _CACHE_ENABLED
265 |
266 |
267 | def enable_cache():
268 | """Enable caching globally (runtime override)."""
269 | global _CACHE_ENABLED
270 | _CACHE_ENABLED = True
271 | logger.info("Cache enabled globally")
272 |
273 |
274 | def disable_cache():
275 | """Disable caching globally (runtime override)."""
276 | global _CACHE_ENABLED
277 | _CACHE_ENABLED = False
278 | logger.info("Cache disabled globally")
279 |
280 |
281 | def set_cache_enabled(enabled: bool):
282 | """
283 | Set cache enabled state.
284 |
285 | Args:
286 | enabled: True to enable caching, False to disable
287 | """
288 | global _CACHE_ENABLED
289 | _CACHE_ENABLED = enabled
290 | logger.info(f"Cache {'enabled' if enabled else 'disabled'} globally")
291 |
292 |
293 | # asyncio is used only by the iscoroutinefunction check above; this module-level import runs before any decorator is applied
294 | import asyncio
295 |
```
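A short usage sketch for the decorator above: the DAO class and its method are hypothetical, but `dao_cache`, `get_cache_stats`, and `set_cache_enabled` are the utilities defined in this module. Because `region` and `page` are part of the cache key, repeating the same page is a hit while requesting a different page is a fresh miss.

```python
# Usage sketch for utils/cache_decorator.py; LogGroupsDAO is illustrative.
import asyncio
from utils.cache_decorator import dao_cache, get_cache_stats, set_cache_enabled


class LogGroupsDAO:
    @dao_cache(ttl_seconds=600)
    async def get_log_groups(self, region: str, page: int = 1):
        # The real service would call CloudWatch Logs here; a placeholder is
        # enough to demonstrate cache hits on repeated calls.
        return {"region": region, "page": page, "log_groups": []}


async def main():
    dao = LogGroupsDAO()
    await dao.get_log_groups(region="us-east-1", page=1)   # cache miss
    await dao.get_log_groups(region="us-east-1", page=1)   # cache hit
    await dao.get_log_groups(region="us-east-1", page=2)   # different page -> miss
    print(get_cache_stats())   # e.g. size=2, hits=1, misses=2, hit_rate=33.33


if __name__ == "__main__":
    set_cache_enabled(True)    # override CFM_ENABLE_CACHE for the demo
    asyncio.run(main())
```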
--------------------------------------------------------------------------------
/tests/unit/cloudwatch/test_mcp_surface_pagination.py:
--------------------------------------------------------------------------------
```python
1 | #!/usr/bin/env python3
2 | """
3 | Test MCP surface pagination with real parameters to analyze response structure.
4 | """
5 |
6 | import pytest
7 | import sys
8 | import os
9 | import json
10 | from datetime import datetime
11 |
12 | # Add the project root to the path
13 | sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../../..'))
14 |
15 |
16 | @pytest.mark.skip(reason="Tests need refactoring to match actual API structure")
17 | class TestMCPSurfacePagination:
18 | """Test MCP surface pagination with real parameters."""
19 |
20 | @pytest.mark.asyncio
21 | async def test_mcp_cloudwatch_general_spend_analysis_surface(self):
22 | """Test MCP surface call with real parameters to analyze response structure."""
23 |
24 | # Import the MCP function directly
25 | from runbook_functions import run_cloudwatch_general_spend_analysis
26 |
27 | # Test parameters from user request
28 | test_params = {
29 | "region": "us-east-1",
30 | "lookback_days": 30,
31 | "allow_minimal_cost_metrics": True,
32 | "page": 1
33 | }
34 |
35 | print(f"\n=== Testing MCP Surface Call ===")
36 | print(f"Parameters: {json.dumps(test_params, indent=2)}")
37 |
38 | # Call the actual MCP function
39 | result = await run_cloudwatch_general_spend_analysis(test_params)
40 |
41 | # Parse the response
42 | response_text = result[0].text
43 | response_data = json.loads(response_text)
44 |
45 | print(f"\n=== Response Structure Analysis ===")
46 | print(f"Response keys: {list(response_data.keys())}")
47 |
48 | # Check if pagination exists in the response
49 | if 'data' in response_data:
50 | print(f"Data keys: {list(response_data['data'].keys())}")
51 |
52 | # Look for pagination in different places
53 | pagination_locations = []
54 |
55 | # Check top-level data pagination
56 | if 'pagination' in response_data['data']:
57 | pagination_locations.append('data.pagination')
58 | print(f"Found pagination at data.pagination: {response_data['data']['pagination']}")
59 |
60 | # Check configuration_analysis sections
61 | if 'configuration_analysis' in response_data['data']:
62 | config_analysis = response_data['data']['configuration_analysis']
63 | print(f"Configuration analysis keys: {list(config_analysis.keys())}")
64 |
65 | for section_name, section_data in config_analysis.items():
66 | if isinstance(section_data, dict) and 'pagination' in section_data:
67 | pagination_locations.append(f'data.configuration_analysis.{section_name}.pagination')
68 | print(f"Found pagination at data.configuration_analysis.{section_name}.pagination: {section_data['pagination']}")
69 |
70 | print(f"\nPagination found at locations: {pagination_locations}")
71 |
72 | # Check for items/data arrays
73 | items_locations = []
74 | if 'configuration_analysis' in response_data['data']:
75 | config_analysis = response_data['data']['configuration_analysis']
76 | for section_name, section_data in config_analysis.items():
77 | if isinstance(section_data, dict):
78 | if 'items' in section_data:
79 | items_count = len(section_data['items']) if isinstance(section_data['items'], list) else 'not_list'
80 | items_locations.append(f'data.configuration_analysis.{section_name}.items ({items_count} items)')
81 |
82 | # Check for specific data arrays
83 | for data_key in ['log_groups', 'metrics', 'alarms', 'dashboards']:
84 | if data_key in section_data and isinstance(section_data[data_key], list):
85 | items_count = len(section_data[data_key])
86 | items_locations.append(f'data.configuration_analysis.{section_name}.{data_key} ({items_count} items)')
87 |
88 | print(f"Items/data arrays found at: {items_locations}")
89 |
90 | # Check response metadata
91 | if 'runbook_metadata' in response_data:
92 | print(f"Runbook metadata keys: {list(response_data['runbook_metadata'].keys())}")
93 |
94 | if 'orchestrator_metadata' in response_data:
95 | print(f"Orchestrator metadata keys: {list(response_data['orchestrator_metadata'].keys())}")
96 |
97 | # Check for page-related fields at top level
98 | page_fields = []
99 | for key in response_data.keys():
100 | if 'page' in key.lower() or 'pagination' in key.lower():
101 | page_fields.append(f"{key}: {response_data[key]}")
102 |
103 | if page_fields:
104 | print(f"Top-level page-related fields: {page_fields}")
105 |
106 | # Print full response structure (truncated for readability)
107 | print(f"\n=== Full Response Structure (first 2000 chars) ===")
108 | response_str = json.dumps(response_data, indent=2, default=str)
109 | print(response_str[:2000] + "..." if len(response_str) > 2000 else response_str)
110 |
111 | # Assertions to verify the response structure
112 | assert isinstance(response_data, dict), "Response should be a dictionary"
113 | assert 'status' in response_data, "Response should have status field"
114 | assert 'data' in response_data, "Response should have data field"
115 |
116 | # Test passes if we get a valid response structure
117 | print(f"\n=== Test Result ===")
118 | print(f"✅ MCP surface call successful")
119 | print(f"✅ Response structure analyzed")
120 | print(f"✅ Pagination locations identified: {len(pagination_locations) if 'pagination_locations' in locals() else 0}")
121 |
122 | @pytest.mark.asyncio
123 | async def test_mcp_cloudwatch_metrics_optimization_surface(self):
124 | """Test MCP metrics optimization surface call."""
125 |
126 | from runbook_functions import run_cloudwatch_metrics_optimization
127 |
128 | test_params = {
129 | "region": "us-east-1",
130 | "lookback_days": 30,
131 | "allow_minimal_cost_metrics": True,
132 | "page": 1
133 | }
134 |
135 | print(f"\n=== Testing Metrics Optimization MCP Surface Call ===")
136 |
137 | result = await run_cloudwatch_metrics_optimization(test_params)
138 | response_data = json.loads(result[0].text)
139 |
140 | # Check for pagination in metrics response
141 | pagination_found = False
142 | if 'data' in response_data and 'configuration_analysis' in response_data['data']:
143 | config_analysis = response_data['data']['configuration_analysis']
144 | if 'metrics' in config_analysis and 'pagination' in config_analysis['metrics']:
145 | pagination_found = True
146 | pagination_info = config_analysis['metrics']['pagination']
147 | print(f"Metrics pagination: {pagination_info}")
148 |
149 | print(f"Metrics optimization pagination found: {pagination_found}")
150 | assert isinstance(response_data, dict), "Metrics response should be a dictionary"
151 |
152 | @pytest.mark.asyncio
153 | async def test_pagination_consistency_across_apis(self):
154 | """Test pagination consistency across different CloudWatch APIs."""
155 |
156 | from runbook_functions import (
157 | run_cloudwatch_general_spend_analysis,
158 | run_cloudwatch_metrics_optimization,
159 | run_cloudwatch_logs_optimization,
160 | run_cloudwatch_alarms_and_dashboards_optimization
161 | )
162 |
163 | test_params = {
164 | "region": "us-east-1",
165 | "lookback_days": 30,
166 | "allow_minimal_cost_metrics": True,
167 | "page": 1
168 | }
169 |
170 | apis_to_test = [
171 | ("general_spend", run_cloudwatch_general_spend_analysis),
172 | ("metrics", run_cloudwatch_metrics_optimization),
173 | ("logs", run_cloudwatch_logs_optimization),
174 | ("alarms", run_cloudwatch_alarms_and_dashboards_optimization),
175 | ]
176 |
177 | pagination_structures = {}
178 |
179 | for api_name, api_func in apis_to_test:
180 | print(f"\n=== Testing {api_name} API ===")
181 |
182 | try:
183 | result = await api_func(test_params)
184 | response_data = json.loads(result[0].text)
185 |
186 | # Find pagination structures
187 | pagination_paths = []
188 | if 'data' in response_data and 'configuration_analysis' in response_data['data']:
189 | config_analysis = response_data['data']['configuration_analysis']
190 | for section_name, section_data in config_analysis.items():
191 | if isinstance(section_data, dict) and 'pagination' in section_data:
192 | pagination_paths.append(f"data.configuration_analysis.{section_name}.pagination")
193 |
194 | pagination_structures[api_name] = pagination_paths
195 | print(f"{api_name} pagination paths: {pagination_paths}")
196 |
197 | except Exception as e:
198 | print(f"Error testing {api_name}: {str(e)}")
199 | pagination_structures[api_name] = f"ERROR: {str(e)}"
200 |
201 | print(f"\n=== Pagination Structure Summary ===")
202 | for api_name, paths in pagination_structures.items():
203 | print(f"{api_name}: {paths}")
204 |
205 | # Test passes if we collected pagination info from all APIs
206 | assert len(pagination_structures) == len(apis_to_test), "Should test all APIs"
207 |
208 |
209 | if __name__ == '__main__':
210 | pytest.main([__file__, '-v', '-s']) # -s to show print statements
```
--------------------------------------------------------------------------------
/utils/cleanup_manager.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Cleanup Manager for CFM Tips MCP Server
3 |
4 | Handles automatic cleanup of sessions, results, and temporary data.
5 | """
6 |
7 | import logging
8 | import threading
9 | import time
10 | import os
11 | import glob
12 | from datetime import datetime, timedelta
13 | from typing import Dict, Any, Optional
14 |
15 | logger = logging.getLogger(__name__)
16 |
17 | class CleanupManager:
18 | """Manages automatic cleanup of sessions and temporary data."""
19 |
20 | def __init__(self,
21 | session_timeout_minutes: int = 60,
22 | result_retention_minutes: int = 120,
23 | cleanup_interval_minutes: int = 15):
24 | self.session_timeout_minutes = session_timeout_minutes
25 | self.result_retention_minutes = result_retention_minutes
26 | self.cleanup_interval_minutes = cleanup_interval_minutes
27 | self._shutdown = False
28 | self._cleanup_thread = None
29 |
30 | # Start cleanup thread
31 | self._start_cleanup_thread()
32 |
33 | def _start_cleanup_thread(self):
34 | """Start the background cleanup thread."""
35 | if self._cleanup_thread is None or not self._cleanup_thread.is_alive():
36 | self._cleanup_thread = threading.Thread(
37 | target=self._cleanup_worker,
38 | daemon=True,
39 | name="CleanupManager"
40 | )
41 | self._cleanup_thread.start()
42 | logger.info(f"Cleanup manager started (session timeout: {self.session_timeout_minutes}min)")
43 |
44 | def _cleanup_worker(self):
45 | """Background worker for periodic cleanup."""
46 | while not self._shutdown:
47 | try:
48 | self._perform_cleanup()
49 | time.sleep(self.cleanup_interval_minutes * 60)
50 | except Exception as e:
51 | logger.error(f"Error in cleanup worker: {e}")
52 | time.sleep(60) # Wait 1 minute on error
53 |
54 | def _perform_cleanup(self):
55 | """Perform all cleanup operations."""
56 | logger.debug("Starting periodic cleanup")
57 |
58 | # Clean up session files
59 | self._cleanup_session_files()
60 |
61 | # Clean up temporary files
62 | self._cleanup_temp_files()
63 |
64 | # Clean up old log files
65 | self._cleanup_log_files()
66 |
67 | logger.debug("Periodic cleanup completed")
68 |
69 | def _cleanup_session_files(self):
70 | """Clean up old session database files."""
71 | try:
72 | sessions_dir = "sessions"
73 | if not os.path.exists(sessions_dir):
74 | return
75 |
76 | cutoff_time = datetime.now() - timedelta(minutes=self.session_timeout_minutes)
77 | cleaned_count = 0
78 |
79 | # Find all session database files
80 | session_files = glob.glob(os.path.join(sessions_dir, "session_*.db"))
81 |
82 | for session_file in session_files:
83 | try:
84 | # Check file modification time
85 | file_mtime = datetime.fromtimestamp(os.path.getmtime(session_file))
86 |
87 | if file_mtime < cutoff_time:
88 | # Remove old session file
89 | os.remove(session_file)
90 | cleaned_count += 1
91 | logger.debug(f"Cleaned up old session file: {session_file}")
92 |
93 | # Also remove any associated WAL and SHM files
94 | for ext in ['-wal', '-shm']:
95 | wal_file = session_file + ext
96 | if os.path.exists(wal_file):
97 | os.remove(wal_file)
98 |
99 | except Exception as e:
100 | logger.warning(f"Error cleaning session file {session_file}: {e}")
101 |
102 | if cleaned_count > 0:
103 | logger.info(f"Cleaned up {cleaned_count} old session files")
104 |
105 | except Exception as e:
106 | logger.error(f"Error in session file cleanup: {e}")
107 |
108 | def _cleanup_temp_files(self):
109 | """Clean up temporary files and directories."""
110 | try:
111 | temp_patterns = [
112 | "*.tmp",
113 | "*.temp",
114 | "__pycache__/*.pyc",
115 | "*.log.old"
116 | ]
117 |
118 | cleaned_count = 0
119 |
120 | for pattern in temp_patterns:
121 | temp_files = glob.glob(pattern, recursive=True)
122 |
123 | for temp_file in temp_files:
124 | try:
125 | # Check if file is older than retention period
126 | file_mtime = datetime.fromtimestamp(os.path.getmtime(temp_file))
127 | cutoff_time = datetime.now() - timedelta(minutes=self.result_retention_minutes)
128 |
129 | if file_mtime < cutoff_time:
130 | if os.path.isfile(temp_file):
131 | os.remove(temp_file)
132 | cleaned_count += 1
133 | elif os.path.isdir(temp_file):
134 | import shutil
135 | shutil.rmtree(temp_file)
136 | cleaned_count += 1
137 |
138 | except Exception as e:
139 | logger.warning(f"Error cleaning temp file {temp_file}: {e}")
140 |
141 | if cleaned_count > 0:
142 | logger.info(f"Cleaned up {cleaned_count} temporary files")
143 |
144 | except Exception as e:
145 | logger.error(f"Error in temp file cleanup: {e}")
146 |
147 | def _cleanup_log_files(self):
148 | """Clean up old log files."""
149 | try:
150 | logs_dir = "logs"
151 | if not os.path.exists(logs_dir):
152 | return
153 |
154 | # Keep logs for 7 days
155 | cutoff_time = datetime.now() - timedelta(days=7)
156 | cleaned_count = 0
157 |
158 | log_files = glob.glob(os.path.join(logs_dir, "*.log.*"))
159 |
160 | for log_file in log_files:
161 | try:
162 | file_mtime = datetime.fromtimestamp(os.path.getmtime(log_file))
163 |
164 | if file_mtime < cutoff_time:
165 | os.remove(log_file)
166 | cleaned_count += 1
167 | logger.debug(f"Cleaned up old log file: {log_file}")
168 |
169 | except Exception as e:
170 | logger.warning(f"Error cleaning log file {log_file}: {e}")
171 |
172 | if cleaned_count > 0:
173 | logger.info(f"Cleaned up {cleaned_count} old log files")
174 |
175 | except Exception as e:
176 | logger.error(f"Error in log file cleanup: {e}")
177 |
178 | def force_cleanup(self):
179 | """Force immediate cleanup of all resources."""
180 | logger.info("Forcing immediate cleanup")
181 | self._perform_cleanup()
182 |
183 | # Also clean up from session manager and parallel executor
184 | try:
185 | from . import get_session_manager, get_parallel_executor
186 |
187 | # Clean up session manager
188 | session_manager = get_session_manager()
189 | session_manager._cleanup_expired_sessions()
190 |
191 | # Clean up parallel executor results
192 | executor = get_parallel_executor()
193 | executor.clear_results(older_than_minutes=self.result_retention_minutes)
194 |
195 | logger.info("Force cleanup completed")
196 |
197 | except Exception as e:
198 | logger.error(f"Error in force cleanup: {e}")
199 |
200 | def get_cleanup_stats(self) -> Dict[str, Any]:
201 | """Get statistics about cleanup operations."""
202 | try:
203 | stats = {
204 | 'session_timeout_minutes': self.session_timeout_minutes,
205 | 'result_retention_minutes': self.result_retention_minutes,
206 | 'cleanup_interval_minutes': self.cleanup_interval_minutes,
207 | 'cleanup_thread_alive': self._cleanup_thread.is_alive() if self._cleanup_thread else False,
208 | 'sessions_directory_exists': os.path.exists('sessions'),
209 | 'logs_directory_exists': os.path.exists('logs')
210 | }
211 |
212 | # Count current files
213 | if os.path.exists('sessions'):
214 | session_files = glob.glob('sessions/session_*.db')
215 | stats['active_session_files'] = len(session_files)
216 | else:
217 | stats['active_session_files'] = 0
218 |
219 | if os.path.exists('logs'):
220 | log_files = glob.glob('logs/*.log*')
221 | stats['log_files'] = len(log_files)
222 | else:
223 | stats['log_files'] = 0
224 |
225 | return stats
226 |
227 | except Exception as e:
228 | logger.error(f"Error getting cleanup stats: {e}")
229 | return {'error': str(e)}
230 |
231 | def update_settings(self,
232 | session_timeout_minutes: Optional[int] = None,
233 | result_retention_minutes: Optional[int] = None,
234 | cleanup_interval_minutes: Optional[int] = None):
235 | """Update cleanup settings."""
236 | if session_timeout_minutes is not None:
237 | self.session_timeout_minutes = session_timeout_minutes
238 | logger.info(f"Updated session timeout to {session_timeout_minutes} minutes")
239 |
240 | if result_retention_minutes is not None:
241 | self.result_retention_minutes = result_retention_minutes
242 | logger.info(f"Updated result retention to {result_retention_minutes} minutes")
243 |
244 | if cleanup_interval_minutes is not None:
245 | self.cleanup_interval_minutes = cleanup_interval_minutes
246 | logger.info(f"Updated cleanup interval to {cleanup_interval_minutes} minutes")
247 |
248 | def shutdown(self):
249 | """Shutdown the cleanup manager."""
250 | logger.info("Shutting down cleanup manager")
251 | self._shutdown = True
252 |
253 | # Perform final cleanup
254 | try:
255 | self._perform_cleanup()
256 | except Exception as e:
257 | logger.error(f"Error in final cleanup: {e}")
258 |
259 | # Wait for cleanup thread to finish
260 | if self._cleanup_thread and self._cleanup_thread.is_alive():
261 | self._cleanup_thread.join(timeout=10)
262 |
263 | # Global cleanup manager instance
264 | _cleanup_manager = None
265 |
266 | def get_cleanup_manager() -> CleanupManager:
267 | """Get the global cleanup manager instance."""
268 | global _cleanup_manager
269 | if _cleanup_manager is None:
270 | _cleanup_manager = CleanupManager()
271 | return _cleanup_manager
```
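A usage sketch for the cleanup manager above: every call below is a method defined in this module, and the timing values are illustrative. The singleton starts its background cleanup thread the first time it is requested.

```python
# Usage sketch for utils/cleanup_manager.py; retention values are illustrative.
from utils.cleanup_manager import get_cleanup_manager

manager = get_cleanup_manager()          # starts the background thread on first use
manager.update_settings(
    session_timeout_minutes=30,          # expire sessions/session_*.db after 30 min
    result_retention_minutes=60,         # drop temp files older than 1 hour
)
manager.force_cleanup()                  # run a cleanup pass immediately
print(manager.get_cleanup_stats())       # session/log file counts, thread status

# On shutdown (e.g. from the MCP server's exit hook):
manager.shutdown()
```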
--------------------------------------------------------------------------------
/tests/unit/cloudwatch/test_cloudwatch_pagination_architecture.py:
--------------------------------------------------------------------------------
```python
1 | #!/usr/bin/env python3
2 | """
3 | Test CloudWatch Pagination Architecture
4 |
5 | This test validates that the CloudWatch pagination system works correctly:
6 | 1. Fetches ALL data from AWS using proper NextToken pagination
7 | 2. Sorts client-side by estimated cost (descending)
8 | 3. Applies client-side pagination for MCP responses
9 |
10 | The key insight: "Pagination breaking" isn't an API error - it's the correct
11 | architecture handling large datasets that may cause performance issues.
12 | """
13 |
14 | import pytest
15 | import asyncio
16 | from unittest.mock import Mock, patch, AsyncMock
17 | from playbooks.cloudwatch.result_processor import CloudWatchResultProcessor
18 | from services.cloudwatch_service import CloudWatchService, CloudWatchOperationResult
19 |
20 |
21 | class TestCloudWatchPaginationArchitecture:
22 | """Test the CloudWatch pagination architecture end-to-end."""
23 |
24 | def setup_method(self):
25 | """Set up test fixtures."""
26 | self.result_processor = CloudWatchResultProcessor()
27 |
28 | def test_pagination_metadata_calculation(self):
29 | """Test that pagination metadata is calculated correctly."""
30 | # Test with 25 items, page size 10
31 | total_items = 25
32 |
33 | # Page 1: items 0-9
34 | metadata_p1 = self.result_processor.create_pagination_metadata(total_items, 1)
35 | assert metadata_p1.current_page == 1
36 | assert metadata_p1.page_size == 10
37 | assert metadata_p1.total_items == 25
38 | assert metadata_p1.total_pages == 3
39 | assert metadata_p1.has_next_page == True
40 | assert metadata_p1.has_previous_page == False
41 |
42 | # Page 2: items 10-19
43 | metadata_p2 = self.result_processor.create_pagination_metadata(total_items, 2)
44 | assert metadata_p2.current_page == 2
45 | assert metadata_p2.has_next_page == True
46 | assert metadata_p2.has_previous_page == True
47 |
48 | # Page 3: items 20-24 (partial page)
49 | metadata_p3 = self.result_processor.create_pagination_metadata(total_items, 3)
50 | assert metadata_p3.current_page == 3
51 | assert metadata_p3.has_next_page == False
52 | assert metadata_p3.has_previous_page == True
53 |
54 | # Page 4: beyond data (empty)
55 | metadata_p4 = self.result_processor.create_pagination_metadata(total_items, 4)
56 | assert metadata_p4.current_page == 4
57 | assert metadata_p4.has_next_page == False
58 | assert metadata_p4.has_previous_page == True
59 |
60 | def test_client_side_pagination_slicing(self):
61 | """Test that client-side pagination slices data correctly."""
62 | # Create test data
63 | items = [{'id': i, 'name': f'item_{i}'} for i in range(25)]
64 |
65 | # Test page 1 (items 0-9)
66 | result_p1 = self.result_processor.paginate_results(items, 1)
67 | assert len(result_p1['items']) == 10
68 | assert result_p1['items'][0]['id'] == 0
69 | assert result_p1['items'][9]['id'] == 9
70 | assert result_p1['pagination']['current_page'] == 1
71 | assert result_p1['pagination']['total_pages'] == 3
72 |
73 | # Test page 2 (items 10-19)
74 | result_p2 = self.result_processor.paginate_results(items, 2)
75 | assert len(result_p2['items']) == 10
76 | assert result_p2['items'][0]['id'] == 10
77 | assert result_p2['items'][9]['id'] == 19
78 |
79 | # Test page 3 (items 20-24, partial page)
80 | result_p3 = self.result_processor.paginate_results(items, 3)
81 | assert len(result_p3['items']) == 5
82 | assert result_p3['items'][0]['id'] == 20
83 | assert result_p3['items'][4]['id'] == 24
84 |
85 | # Test page 4 (beyond data, empty)
86 | result_p4 = self.result_processor.paginate_results(items, 4)
87 | assert len(result_p4['items']) == 0
88 | assert result_p4['pagination']['current_page'] == 4
89 |
90 | def test_cost_based_sorting_before_pagination(self):
91 | """Test that items are sorted by cost before pagination."""
92 | # Create test metrics with different estimated costs
93 | metrics = [
94 | {'MetricName': 'LowCost', 'Namespace': 'AWS/EC2', 'Dimensions': []},
95 | {'MetricName': 'HighCost', 'Namespace': 'Custom/App', 'Dimensions': [{'Name': 'Instance', 'Value': 'i-123'}]},
96 | {'MetricName': 'MediumCost', 'Namespace': 'AWS/Lambda', 'Dimensions': [{'Name': 'Function', 'Value': 'test'}]},
97 | ]
98 |
99 | # Process with cost enrichment and sorting
100 | enriched = self.result_processor.enrich_items_with_cost_estimates(metrics, 'metrics')
101 | sorted_metrics = self.result_processor.sort_by_cost_descending(enriched)
102 |
103 | # Verify that enrichment adds cost estimates
104 | assert all('estimated_monthly_cost' in metric for metric in sorted_metrics)
105 |
106 | # Verify sorting works (items are in descending cost order)
107 | costs = [metric['estimated_monthly_cost'] for metric in sorted_metrics]
108 | assert costs == sorted(costs, reverse=True) # Should be in descending order
109 |
110 | # Verify custom namespace gets higher cost estimate than AWS namespaces
111 | custom_metrics = [m for m in sorted_metrics if not m['Namespace'].startswith('AWS/')]
112 | aws_metrics = [m for m in sorted_metrics if m['Namespace'].startswith('AWS/')]
113 |
114 | if custom_metrics and aws_metrics:
115 | # Custom metrics should generally have higher costs than AWS metrics
116 | max_custom_cost = max(m['estimated_monthly_cost'] for m in custom_metrics)
117 | max_aws_cost = max(m['estimated_monthly_cost'] for m in aws_metrics)
118 | # Note: This might be 0.0 for both in test environment, which is fine
119 |
120 | @pytest.mark.skip(reason="Test needs refactoring - mock setup is incorrect")
121 | @pytest.mark.asyncio
122 | async def test_aws_pagination_architecture(self):
123 | """Test that AWS API pagination works correctly (NextToken only)."""
124 |
125 | # Mock CloudWatch service
126 | mock_cloudwatch_service = Mock(spec=CloudWatchService)
127 |
128 | # Mock paginated response from AWS
129 | mock_response_page1 = CloudWatchOperationResult(
130 | success=True,
131 | data={
132 | 'metrics': [{'MetricName': f'Metric_{i}', 'Namespace': 'AWS/EC2'} for i in range(500)],
133 | 'total_count': 500,
134 | 'filtered': False
135 | },
136 | operation_name='list_metrics'
137 | )
138 |
139 | mock_cloudwatch_service.list_metrics.return_value = mock_response_page1
140 |
141 | # Test that service is called correctly (no MaxRecords parameter)
142 | result = await mock_cloudwatch_service.list_metrics(namespace='AWS/EC2')
143 |
144 | # Verify the call was made without MaxRecords
145 | mock_cloudwatch_service.list_metrics.assert_called_once_with(namespace='AWS/EC2')
146 |
147 | # Verify we got the expected data structure
148 | assert result.success == True
149 | assert len(result.data['metrics']) == 500
150 | assert result.data['total_count'] == 500
151 |
152 | def test_pagination_architecture_documentation(self):
153 | """Document the pagination architecture for future reference."""
154 |
155 | architecture_doc = {
156 | "cloudwatch_pagination_architecture": {
157 | "step_1_aws_fetch": {
158 | "description": "Fetch ALL data from AWS using proper NextToken pagination",
159 | "method": "AWS paginator with NextToken (no MaxRecords)",
160 | "apis_used": ["list_metrics", "describe_alarms", "describe_log_groups"],
161 | "result": "Complete dataset in arbitrary AWS order"
162 | },
163 | "step_2_client_sort": {
164 | "description": "Sort client-side by estimated cost (descending)",
165 | "method": "Cost estimation using free metadata + sorting",
166 | "cost": "Zero additional API calls",
167 | "result": "Dataset ordered by cost (highest first)"
168 | },
169 | "step_3_client_paginate": {
170 | "description": "Apply client-side pagination for MCP response",
171 | "method": "Array slicing with 10 items per page",
172 | "page_size": 10,
173 | "result": "Paginated response with metadata"
174 | }
175 | },
176 | "why_this_architecture": {
177 | "aws_limitation": "AWS APIs return data in arbitrary order, not by cost",
178 | "sorting_requirement": "Users want to see highest-cost items first",
179 | "solution": "Fetch all, sort by cost, then paginate for display"
180 | },
181 | "performance_considerations": {
182 | "large_datasets": "4000+ metrics may cause timeouts",
183 | "memory_usage": "All data loaded into memory for sorting",
184 | "optimization": "Caching and progressive loading implemented"
185 | }
186 | }
187 |
188 | # This test passes if the architecture is documented
189 | assert architecture_doc["cloudwatch_pagination_architecture"]["step_1_aws_fetch"]["method"] == "AWS paginator with NextToken (no MaxRecords)"
190 | assert architecture_doc["cloudwatch_pagination_architecture"]["step_3_client_paginate"]["page_size"] == 10
191 |
192 | def test_edge_cases_pagination(self):
193 | """Test edge cases in pagination."""
194 |
195 | # Empty dataset
196 | empty_result = self.result_processor.paginate_results([], 1)
197 | assert len(empty_result['items']) == 0
198 | assert empty_result['pagination']['total_pages'] == 0
199 | assert empty_result['pagination']['has_next_page'] == False
200 |
201 | # Single item
202 | single_item = [{'id': 1}]
203 | single_result = self.result_processor.paginate_results(single_item, 1)
204 | assert len(single_result['items']) == 1
205 | assert single_result['pagination']['total_pages'] == 1
206 |
207 | # Exactly page size (10 items)
208 | exact_page = [{'id': i} for i in range(10)]
209 | exact_result = self.result_processor.paginate_results(exact_page, 1)
210 | assert len(exact_result['items']) == 10
211 | assert exact_result['pagination']['total_pages'] == 1
212 | assert exact_result['pagination']['has_next_page'] == False
213 |
214 | # Invalid page numbers
215 | items = [{'id': i} for i in range(5)]
216 |
217 | # Page 0 should default to page 1
218 | page_0_result = self.result_processor.paginate_results(items, 0)
219 | assert page_0_result['pagination']['current_page'] == 1
220 |
221 | # Negative page should default to page 1
222 | negative_result = self.result_processor.paginate_results(items, -1)
223 | assert negative_result['pagination']['current_page'] == 1
224 |
225 |
226 | if __name__ == "__main__":
227 | pytest.main([__file__, "-v"])
```
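The tests above pin down the expected behavior of `paginate_results` without showing its implementation. A minimal sketch that satisfies those assertions (fixed page size of 10, `total_pages` of 0 for an empty dataset, page numbers below 1 clamped to 1) could look like the following; the real `CloudWatchResultProcessor` may differ in detail.

```python
import math
from typing import Any, Dict, List

PAGE_SIZE = 10  # page size asserted throughout the tests above


def paginate_results(items: List[Dict[str, Any]], page: int) -> Dict[str, Any]:
    """Client-side pagination sketch: slice an already cost-sorted list into pages of 10."""
    page = max(page, 1)  # page 0 and negative pages fall back to page 1
    total_items = len(items)
    total_pages = math.ceil(total_items / PAGE_SIZE) if total_items else 0

    start = (page - 1) * PAGE_SIZE
    return {
        'items': items[start:start + PAGE_SIZE],
        'pagination': {
            'current_page': page,
            'page_size': PAGE_SIZE,
            'total_items': total_items,
            'total_pages': total_pages,
            'has_next_page': page < total_pages,
            'has_previous_page': page > 1,
        },
    }
```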
--------------------------------------------------------------------------------
/tests/unit/cloudwatch/test_mcp_pagination_bug.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Unit tests to identify MCP pagination bug in CloudWatch metrics optimization.
3 |
4 | Tests the complete flow from MCP tool call through orchestrator to result processor
5 | to identify where pagination is being bypassed.
6 | """
7 |
8 | import pytest
9 | import asyncio
10 | import json
11 | from unittest.mock import patch, AsyncMock, MagicMock
12 | from mcp.types import TextContent
13 |
14 | # Import the functions under test
15 | from runbook_functions import run_cloudwatch_metrics_optimization
16 |
17 |
18 | @pytest.mark.skip(reason="Tests need refactoring to match actual API structure")
19 | class TestMCPPaginationBug:
20 | """Test suite to identify where pagination is failing in the MCP flow."""
21 |
22 | @pytest.fixture
23 | def mock_large_metrics_dataset(self):
24 | """Create a large dataset of 25 metrics for pagination testing."""
25 | return [
26 | {
27 | 'MetricName': f'CustomMetric{i:02d}',
28 | 'Namespace': 'Custom/Application' if i % 2 == 0 else 'AWS/EC2',
29 | 'Dimensions': [{'Name': 'InstanceId', 'Value': f'i-{i:08x}'}],
30 | 'estimated_monthly_cost': 10.0 - (i * 0.2) # Decreasing cost
31 | }
32 | for i in range(25)
33 | ]
34 |
35 | @pytest.fixture
36 | def mock_orchestrator_response(self, mock_large_metrics_dataset):
37 | """Mock orchestrator response with proper structure."""
38 | def create_response(page=1):
39 | # Simulate orchestrator pagination - should return only 10 items per page
40 | start_idx = (page - 1) * 10
41 | end_idx = start_idx + 10
42 | page_metrics = mock_large_metrics_dataset[start_idx:end_idx]
43 |
44 | return {
45 | 'status': 'success',
46 | 'data': {
47 | 'metrics_configuration_analysis': {
48 | 'metrics': {
49 | 'metrics': page_metrics,
50 | 'pagination': {
51 | 'current_page': page,
52 | 'page_size': 10,
53 | 'total_items': 25,
54 | 'total_pages': 3,
55 | 'has_next_page': page < 3,
56 | 'has_previous_page': page > 1
57 | },
58 | 'total_count': 25,
59 | 'namespace': 'all',
60 | 'filtered': False
61 | }
62 | }
63 | },
64 | 'orchestrator_metadata': {
65 | 'session_id': 'test-session',
66 | 'region': 'us-east-1'
67 | }
68 | }
69 | return create_response
70 |
71 | @pytest.mark.asyncio
72 | async def test_orchestrator_pagination_works(self, mock_orchestrator_response):
73 | """Test that orchestrator correctly applies pagination."""
74 |
75 | with patch('runbook_functions.CloudWatchOptimizationOrchestrator') as mock_orchestrator_class:
76 | # Setup mock orchestrator instance
77 | mock_orchestrator = AsyncMock()
78 | mock_orchestrator_class.return_value = mock_orchestrator
79 |
80 | # Mock execute_analysis to return paginated responses
81 | def mock_execute_analysis(analysis_type, **kwargs):
82 | page = kwargs.get('page', 1)
83 | return mock_orchestrator_response(page)
84 |
85 | mock_orchestrator.execute_analysis = AsyncMock(side_effect=mock_execute_analysis)
86 |
87 | # Test page 1
88 | result_p1 = await run_cloudwatch_metrics_optimization({
89 | 'region': 'us-east-1',
90 | 'page': 1,
91 | 'lookback_days': 30
92 | })
93 |
94 | # Verify result structure
95 | assert len(result_p1) == 1
96 | assert isinstance(result_p1[0], TextContent)
97 |
98 | # Parse JSON response
99 | data_p1 = json.loads(result_p1[0].text)
100 | assert data_p1['status'] == 'success'
101 |
102 | # Check metrics data
103 | metrics_data_p1 = data_p1['data']['metrics_configuration_analysis']['metrics']
104 | assert len(metrics_data_p1['metrics']) == 10, "Page 1 should have 10 metrics"
105 | assert metrics_data_p1['pagination']['current_page'] == 1
106 | assert metrics_data_p1['pagination']['total_items'] == 25
107 |
108 | # Test page 2
109 | result_p2 = await run_cloudwatch_metrics_optimization({
110 | 'region': 'us-east-1',
111 | 'page': 2,
112 | 'lookback_days': 30
113 | })
114 |
115 | data_p2 = json.loads(result_p2[0].text)
116 | metrics_data_p2 = data_p2['data']['metrics_configuration_analysis']['metrics']
117 | assert len(metrics_data_p2['metrics']) == 10, "Page 2 should have 10 metrics"
118 | assert metrics_data_p2['pagination']['current_page'] == 2
119 |
120 | # Verify different metrics on different pages
121 | p1_names = [m['MetricName'] for m in metrics_data_p1['metrics']]
122 | p2_names = [m['MetricName'] for m in metrics_data_p2['metrics']]
123 | assert p1_names != p2_names, "Page 1 and Page 2 should have different metrics"
124 |
125 | # Verify orchestrator was called with correct parameters
126 | assert mock_orchestrator.execute_analysis.call_count == 2
127 |
128 | # Check first call (page 1)
129 | first_call_args = mock_orchestrator.execute_analysis.call_args_list[0]
130 | assert first_call_args[0][0] == 'metrics_optimization'
131 | assert first_call_args[1]['page'] == 1
132 |
133 | # Check second call (page 2)
134 | second_call_args = mock_orchestrator.execute_analysis.call_args_list[1]
135 | assert second_call_args[1]['page'] == 2
136 |
137 | @pytest.mark.asyncio
138 | async def test_mcp_tool_bypasses_pagination(self):
139 | """Test to identify if MCP tool is bypassing orchestrator pagination."""
140 |
141 | # This test will help identify if there's a direct MCP call bypassing the orchestrator
142 | with patch('runbook_functions.CloudWatchOptimizationOrchestrator') as mock_orchestrator_class:
143 | mock_orchestrator = AsyncMock()
144 | mock_orchestrator_class.return_value = mock_orchestrator
145 |
146 | # Mock orchestrator to return a response indicating it was called
147 | mock_orchestrator.execute_analysis = AsyncMock(return_value={
148 | 'status': 'success',
149 | 'data': {
150 | 'metrics_configuration_analysis': {
151 | 'metrics': {
152 | 'metrics': [],
153 | 'pagination': {'current_page': 1, 'total_items': 0},
154 | 'orchestrator_called': True # Flag to verify orchestrator was used
155 | }
156 | }
157 | }
158 | })
159 |
160 | # Also patch any potential direct MCP calls
161 | with patch('runbook_functions.mcp_cfm_tips_cloudwatch_metrics_optimization') as mock_mcp:
162 | mock_mcp.return_value = {
163 | 'status': 'success',
164 | 'data': {'direct_mcp_call': True} # Flag to identify direct MCP call
165 | }
166 |
167 | result = await run_cloudwatch_metrics_optimization({
168 | 'region': 'us-east-1',
169 | 'page': 1
170 | })
171 |
172 | # Parse result
173 | data = json.loads(result[0].text)
174 |
175 | # Check if orchestrator was used (expected behavior)
176 | if 'orchestrator_called' in str(data):
177 | print("✅ Orchestrator was called - pagination should work")
178 | assert mock_orchestrator.execute_analysis.called
179 | assert not mock_mcp.called, "Direct MCP call should not be made"
180 |
181 | # Check if direct MCP call was made (bug scenario)
182 | elif 'direct_mcp_call' in str(data):
183 | pytest.fail("❌ BUG IDENTIFIED: Direct MCP call bypassing orchestrator pagination")
184 |
185 | else:
186 | pytest.fail("❌ Unable to determine call path - check test setup")
187 |
188 | @pytest.mark.asyncio
189 | async def test_result_processor_pagination(self):
190 | """Test that result processor correctly paginates metrics."""
191 |
192 | from playbooks.cloudwatch.result_processor import CloudWatchResultProcessor
193 |
194 | # Create test metrics
195 | test_metrics = [
196 | {'MetricName': f'Metric{i}', 'estimated_monthly_cost': 10 - i}
197 | for i in range(25)
198 | ]
199 |
200 | processor = CloudWatchResultProcessor()
201 |
202 | # Test page 1
203 | result_p1 = processor.process_metrics_results(test_metrics, page=1)
204 | assert len(result_p1['items']) == 10
205 | assert result_p1['pagination']['current_page'] == 1
206 | assert result_p1['pagination']['total_items'] == 25
207 |
208 | # Test page 2
209 | result_p2 = processor.process_metrics_results(test_metrics, page=2)
210 | assert len(result_p2['items']) == 10
211 | assert result_p2['pagination']['current_page'] == 2
212 |
213 | # Verify different items
214 | p1_names = [item['MetricName'] for item in result_p1['items']]
215 | p2_names = [item['MetricName'] for item in result_p2['items']]
216 | assert p1_names != p2_names
217 |
218 | @pytest.mark.asyncio
219 | async def test_orchestrator_apply_result_processing(self):
220 | """Test that orchestrator's _apply_result_processing works correctly."""
221 |
222 | from playbooks.cloudwatch.optimization_orchestrator import CloudWatchOptimizationOrchestrator
223 |
224 | # Create mock result with metrics
225 | mock_result = {
226 | 'status': 'success',
227 | 'data': {
228 | 'metrics_configuration_analysis': {
229 | 'metrics': {
230 | 'metrics': [
231 | {'MetricName': f'Metric{i}', 'estimated_monthly_cost': 10 - i}
232 | for i in range(25)
233 | ],
234 | 'total_count': 25
235 | }
236 | }
237 | }
238 | }
239 |
240 | orchestrator = CloudWatchOptimizationOrchestrator(region='us-east-1')
241 |
242 | # Test pagination application
243 | processed_result = orchestrator._apply_result_processing(mock_result, page=1)
244 |
245 | metrics_data = processed_result['data']['metrics_configuration_analysis']['metrics']
246 | assert len(metrics_data['metrics']) == 10, "Should be paginated to 10 items"
247 | assert 'pagination' in metrics_data, "Should have pagination metadata"
248 | assert metrics_data['pagination']['current_page'] == 1
249 | assert metrics_data['pagination']['total_items'] == 25
```
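The assertions above all reach into the same nested key path (`data -> metrics_configuration_analysis -> metrics`) to get the paginated metrics. A small helper for that traversal is sketched here, with the field names taken from the mocks in these tests (the real tool response may be shaped differently).

```python
import json
from typing import Any, Dict, List, Tuple


def extract_metrics_page(mcp_text: str) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
    """Return (metrics, pagination) from a run_cloudwatch_metrics_optimization response.

    Key names mirror the mock structure used in the tests above; they are an
    assumption about the real response shape.
    """
    payload = json.loads(mcp_text)
    block = payload['data']['metrics_configuration_analysis']['metrics']
    return block.get('metrics', []), block.get('pagination', {})


# Usage inside an async test or tool:
#   result = await run_cloudwatch_metrics_optimization({'region': 'us-east-1', 'page': 1})
#   metrics, pagination = extract_metrics_page(result[0].text)
#   # keep requesting page + 1 while pagination.get('has_next_page') is True
```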
--------------------------------------------------------------------------------
/diagnose_cost_optimization_hub_v2.py:
--------------------------------------------------------------------------------
```python
1 | #!/usr/bin/env python3
2 | """
3 | Diagnostic script for Cost Optimization Hub issues - Updated with correct permissions
4 | """
5 |
6 | import boto3
7 | import json
8 | from botocore.exceptions import ClientError
9 |
10 | def check_cost_optimization_hub():
11 | """Check Cost Optimization Hub status and common issues."""
12 | print("🔍 Diagnosing Cost Optimization Hub Issues")
13 | print("=" * 50)
14 |
15 | # Test different regions where Cost Optimization Hub is available
16 | regions_to_test = ['us-east-1', 'us-west-2', 'eu-west-1', 'ap-southeast-1']
17 |
18 | for region in regions_to_test:
19 | print(f"\n📍 Testing region: {region}")
20 | try:
21 | client = boto3.client('cost-optimization-hub', region_name=region)
22 |
23 | # Test 1: Check enrollment statuses (correct API call)
24 | print(" ✅ Testing enrollment statuses...")
25 | try:
26 | enrollment_response = client.list_enrollment_statuses()
27 | print(f" 📊 Enrollment Response: {json.dumps(enrollment_response, indent=2, default=str)}")
28 |
29 | # Check if any accounts are enrolled
30 | items = enrollment_response.get('items', [])
31 | if items:
32 | active_enrollments = [item for item in items if item.get('status') == 'Active']
33 | if active_enrollments:
34 | print(f" ✅ Found {len(active_enrollments)} active enrollments")
35 |
36 | # Test 2: Try to list recommendations
37 | print(" ✅ Testing list recommendations...")
38 | try:
39 | recommendations = client.list_recommendations(maxResults=5)
40 | print(f" 📊 Found {len(recommendations.get('items', []))} recommendations")
41 | print(" ✅ Cost Optimization Hub is working correctly!")
42 | return True
43 |
44 | except ClientError as rec_error:
45 | print(f" ❌ Error listing recommendations: {rec_error.response['Error']['Code']} - {rec_error.response['Error']['Message']}")
46 | else:
47 | print(" ⚠️ No active enrollments found")
48 | print(" 💡 You need to enable Cost Optimization Hub in the AWS Console")
49 | else:
50 | print(" ⚠️ No enrollment information found")
51 | print(" 💡 Cost Optimization Hub may not be set up for this account")
52 |
53 | except ClientError as enrollment_error:
54 | error_code = enrollment_error.response['Error']['Code']
55 | error_message = enrollment_error.response['Error']['Message']
56 |
57 | if error_code == 'AccessDeniedException':
58 | print(" ❌ Access denied - check IAM permissions")
59 | print(" 💡 Required permissions: cost-optimization-hub:ListEnrollmentStatuses")
60 | elif error_code == 'ValidationException':
61 | print(f" ❌ Validation error: {error_message}")
62 | else:
63 | print(f" ❌ Error: {error_code} - {error_message}")
64 |
65 | except Exception as e:
66 | print(f" ❌ Failed to create client for region {region}: {str(e)}")
67 |
68 | return False
69 |
70 | def check_iam_permissions():
71 | """Check IAM permissions for Cost Optimization Hub."""
72 | print("\n🔐 Checking IAM Permissions")
73 | print("=" * 30)
74 |
75 | try:
76 | # Get current user/role
77 | sts_client = boto3.client('sts')
78 | identity = sts_client.get_caller_identity()
79 | print(f"Current identity: {identity.get('Arn', 'Unknown')}")
80 |
81 | # Correct required actions for Cost Optimization Hub
82 | required_actions = [
83 | 'cost-optimization-hub:ListEnrollmentStatuses',
84 | 'cost-optimization-hub:UpdateEnrollmentStatus',
85 | 'cost-optimization-hub:GetPreferences',
86 | 'cost-optimization-hub:UpdatePreferences',
87 | 'cost-optimization-hub:GetRecommendation',
88 | 'cost-optimization-hub:ListRecommendations',
89 | 'cost-optimization-hub:ListRecommendationSummaries'
90 | ]
91 |
92 | print("\nRequired permissions for Cost Optimization Hub:")
93 | for action in required_actions:
94 | print(f" - {action}")
95 |
96 | print("\nMinimal permissions for read-only access:")
97 | minimal_actions = [
98 | 'cost-optimization-hub:ListEnrollmentStatuses',
99 | 'cost-optimization-hub:ListRecommendations',
100 | 'cost-optimization-hub:GetRecommendation',
101 | 'cost-optimization-hub:ListRecommendationSummaries'
102 | ]
103 | for action in minimal_actions:
104 | print(f" - {action}")
105 |
106 | except Exception as e:
107 | print(f"Error checking IAM: {str(e)}")
108 |
109 | def test_individual_apis():
110 | """Test individual Cost Optimization Hub APIs."""
111 | print("\n🧪 Testing Individual APIs")
112 | print("=" * 30)
113 |
114 | try:
115 | client = boto3.client('cost-optimization-hub', region_name='us-east-1')
116 |
117 | # Test 1: List Enrollment Statuses
118 | print("\n1. Testing list_enrollment_statuses...")
119 | try:
120 | response = client.list_enrollment_statuses()
121 | print(f" ✅ Success: Found {len(response.get('items', []))} enrollment records")
122 | except ClientError as e:
123 | print(f" ❌ Failed: {e.response['Error']['Code']} - {e.response['Error']['Message']}")
124 |
125 | # Test 2: List Recommendations
126 | print("\n2. Testing list_recommendations...")
127 | try:
128 | response = client.list_recommendations(maxResults=5)
129 | print(f" ✅ Success: Found {len(response.get('items', []))} recommendations")
130 | except ClientError as e:
131 | print(f" ❌ Failed: {e.response['Error']['Code']} - {e.response['Error']['Message']}")
132 |
133 | # Test 3: List Recommendation Summaries
134 | print("\n3. Testing list_recommendation_summaries...")
135 | try:
136 | response = client.list_recommendation_summaries(maxResults=5)
137 | print(f" ✅ Success: Found {len(response.get('items', []))} summaries")
138 | except ClientError as e:
139 | print(f" ❌ Failed: {e.response['Error']['Code']} - {e.response['Error']['Message']}")
140 |
141 | except Exception as e:
142 | print(f"Error testing APIs: {str(e)}")
143 |
144 | def provide_correct_iam_policy():
145 | """Provide the correct IAM policy for Cost Optimization Hub."""
146 | print("\n📋 Correct IAM Policy")
147 | print("=" * 25)
148 |
149 | policy = {
150 | "Version": "2012-10-17",
151 | "Statement": [
152 | {
153 | "Effect": "Allow",
154 | "Action": [
155 | "cost-optimization-hub:ListEnrollmentStatuses",
156 | "cost-optimization-hub:UpdateEnrollmentStatus",
157 | "cost-optimization-hub:GetPreferences",
158 | "cost-optimization-hub:UpdatePreferences",
159 | "cost-optimization-hub:GetRecommendation",
160 | "cost-optimization-hub:ListRecommendations",
161 | "cost-optimization-hub:ListRecommendationSummaries"
162 | ],
163 | "Resource": "*"
164 | }
165 | ]
166 | }
167 |
168 | print("Full IAM Policy:")
169 | print(json.dumps(policy, indent=2))
170 |
171 | minimal_policy = {
172 | "Version": "2012-10-17",
173 | "Statement": [
174 | {
175 | "Effect": "Allow",
176 | "Action": [
177 | "cost-optimization-hub:ListEnrollmentStatuses",
178 | "cost-optimization-hub:ListRecommendations",
179 | "cost-optimization-hub:GetRecommendation",
180 | "cost-optimization-hub:ListRecommendationSummaries"
181 | ],
182 | "Resource": "*"
183 | }
184 | ]
185 | }
186 |
187 | print("\nMinimal Read-Only Policy:")
188 | print(json.dumps(minimal_policy, indent=2))
189 |
190 | def provide_solutions():
191 | """Provide solutions for common Cost Optimization Hub issues."""
192 | print("\n🛠️ Updated Solutions")
193 | print("=" * 20)
194 |
195 | solutions = [
196 | {
197 | "issue": "AccessDeniedException",
198 | "solution": [
199 | "1. Add the correct IAM permissions (see policy above)",
200 | "2. The service uses different permission names than other AWS services",
201 | "3. Use 'cost-optimization-hub:ListEnrollmentStatuses' not 'GetEnrollmentStatus'",
202 | "4. Attach the policy to your IAM user/role"
203 | ]
204 | },
205 | {
206 | "issue": "No enrollment found",
207 | "solution": [
208 | "1. Go to AWS Console → Cost Optimization Hub",
209 | "2. Enable the service for your account",
210 | "3. Wait for enrollment to complete",
211 | "4. URL: https://console.aws.amazon.com/cost-optimization-hub/"
212 | ]
213 | },
214 | {
215 | "issue": "Service not available",
216 | "solution": [
217 | "1. Cost Optimization Hub is only available in specific regions",
218 | "2. Use us-east-1, us-west-2, eu-west-1, or ap-southeast-1",
219 | "3. The service may not be available in your region yet"
220 | ]
221 | },
222 | {
223 | "issue": "No recommendations found",
224 | "solution": [
225 | "1. Cost Optimization Hub needs time to analyze your resources",
226 | "2. Ensure you have resources running for at least 14 days",
227 | "3. The service needs sufficient usage data to generate recommendations",
228 | "4. Check if you have any EC2, RDS, or other supported resources"
229 | ]
230 | }
231 | ]
232 |
233 | for solution in solutions:
234 | print(f"\n🔧 {solution['issue']}:")
235 | for step in solution['solution']:
236 | print(f" {step}")
237 |
238 | def main():
239 | """Main diagnostic function."""
240 | print("AWS Cost Optimization Hub Diagnostic Tool v2")
241 | print("=" * 50)
242 |
243 | try:
244 | # Run diagnostics
245 | hub_working = check_cost_optimization_hub()
246 | check_iam_permissions()
247 | test_individual_apis()
248 | provide_correct_iam_policy()
249 | provide_solutions()
250 |
251 | print("\n" + "=" * 60)
252 | if hub_working:
253 | print("✅ DIAGNOSIS: Cost Optimization Hub appears to be working!")
254 | else:
255 | print("❌ DIAGNOSIS: Cost Optimization Hub needs to be set up.")
256 |
257 | print("\n📝 Next Steps:")
258 | print("1. Apply the correct IAM policy shown above")
259 | print("2. Enable Cost Optimization Hub in the AWS Console if needed")
260 | print("3. Use the updated MCP server (mcp_server_fixed_v3.py)")
261 | print("4. Test with the enrollment status tool first")
262 |
263 | except Exception as e:
264 | print(f"\n❌ Diagnostic failed: {str(e)}")
265 | print("Please check your AWS credentials and try again.")
266 |
267 | if __name__ == "__main__":
268 | main()
269 |
```
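The script directs users to the console to enable the hub; the same API family also allows enrollment to be toggled programmatically. A hedged sketch, assuming the `update_enrollment_status` operation with a `status` parameter (the corresponding `cost-optimization-hub:UpdateEnrollmentStatus` permission is already listed in the policy the script prints):

```python
import boto3
from botocore.exceptions import ClientError


def enable_cost_optimization_hub(region: str = 'us-east-1', include_members: bool = False) -> None:
    """Programmatic alternative to enabling the hub in the console (assumed API shape)."""
    client = boto3.client('cost-optimization-hub', region_name=region)
    try:
        response = client.update_enrollment_status(
            status='Active',
            includeMemberAccounts=include_members,
        )
        print(f"Enrollment status is now: {response.get('status')}")
    except ClientError as error:
        code = error.response['Error']['Code']
        message = error.response['Error']['Message']
        print(f"Could not update enrollment: {code} - {message}")


if __name__ == '__main__':
    enable_cost_optimization_hub()
```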
--------------------------------------------------------------------------------
/utils/parallel_executor.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Parallel Execution Engine for CFM Tips MCP Server
3 |
4 | Provides optimized parallel execution of AWS service calls with session integration.
5 | """
6 |
7 | import logging
8 | import time
9 | import threading
10 | from concurrent.futures import ThreadPoolExecutor, as_completed, Future
11 | from typing import Dict, List, Any, Optional, Callable, Union
12 | from dataclasses import dataclass
13 | from datetime import datetime
14 |
15 | logger = logging.getLogger(__name__)
16 |
17 | @dataclass
18 | class TaskResult:
19 | """Result of a parallel task execution."""
20 | task_id: str
21 | service: str
22 | operation: str
23 | status: str # 'success', 'error', 'timeout'
24 | data: Any = None
25 | error: Optional[str] = None
26 | execution_time: float = 0.0
27 |     timestamp: Optional[datetime] = None
28 |
29 | def __post_init__(self):
30 | if self.timestamp is None:
31 | self.timestamp = datetime.now()
32 |
33 | @dataclass
34 | class ParallelTask:
35 | """Definition of a task to be executed in parallel."""
36 | task_id: str
37 | service: str
38 | operation: str
39 | function: Callable
40 | args: tuple = ()
41 |     kwargs: Optional[Dict[str, Any]] = None
42 | timeout: float = 30.0
43 | priority: int = 1 # Higher number = higher priority
44 |
45 | def __post_init__(self):
46 | if self.kwargs is None:
47 | self.kwargs = {}
48 |
49 | class ParallelExecutor:
50 | """Executes AWS service calls in parallel with optimized resource management."""
51 |
52 | def __init__(self, max_workers: int = 10, default_timeout: float = 30.0):
53 | self.max_workers = max_workers
54 | self.default_timeout = default_timeout
55 | self.executor = ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="CFM-Worker")
56 | self._active_tasks: Dict[str, Future] = {}
57 | self._results: Dict[str, TaskResult] = {}
58 | self._lock = threading.RLock()
59 |
60 | logger.info(f"Initialized ParallelExecutor with {max_workers} workers")
61 |
62 | def submit_task(self, task: ParallelTask) -> str:
63 | """Submit a task for parallel execution."""
64 | with self._lock:
65 | if task.task_id in self._active_tasks:
66 | raise ValueError(f"Task {task.task_id} already exists")
67 |
68 | future = self.executor.submit(self._execute_task, task)
69 | self._active_tasks[task.task_id] = future
70 |
71 | logger.debug(f"Submitted task {task.task_id} ({task.service}.{task.operation})")
72 | return task.task_id
73 |
74 | def submit_batch(self, tasks: List[ParallelTask]) -> List[str]:
75 | """Submit multiple tasks for parallel execution."""
76 | # Sort by priority (higher first)
77 | sorted_tasks = sorted(tasks, key=lambda t: t.priority, reverse=True)
78 |
79 | task_ids = []
80 | for task in sorted_tasks:
81 | try:
82 | task_id = self.submit_task(task)
83 | task_ids.append(task_id)
84 | except Exception as e:
85 | logger.error(f"Error submitting task {task.task_id}: {e}")
86 | # Create error result
87 | error_result = TaskResult(
88 | task_id=task.task_id,
89 | service=task.service,
90 | operation=task.operation,
91 | status='error',
92 | error=str(e)
93 | )
94 | with self._lock:
95 | self._results[task.task_id] = error_result
96 |
97 | logger.info(f"Submitted batch of {len(task_ids)} tasks")
98 | return task_ids
99 |
100 | def _execute_task(self, task: ParallelTask) -> TaskResult:
101 | """Execute a single task with timeout and error handling."""
102 | start_time = time.time()
103 |
104 | try:
105 | logger.debug(f"Executing task {task.task_id}")
106 |
107 | # Execute the function with timeout
108 | result_data = task.function(*task.args, **task.kwargs)
109 |
110 | execution_time = time.time() - start_time
111 |
112 | result = TaskResult(
113 | task_id=task.task_id,
114 | service=task.service,
115 | operation=task.operation,
116 | status='success',
117 | data=result_data,
118 | execution_time=execution_time
119 | )
120 |
121 | logger.debug(f"Task {task.task_id} completed in {execution_time:.2f}s")
122 |
123 | except Exception as e:
124 | execution_time = time.time() - start_time
125 | error_msg = str(e)
126 |
127 | result = TaskResult(
128 | task_id=task.task_id,
129 | service=task.service,
130 | operation=task.operation,
131 | status='error',
132 | error=error_msg,
133 | execution_time=execution_time
134 | )
135 |
136 | logger.error(f"Task {task.task_id} failed after {execution_time:.2f}s: {error_msg}")
137 |
138 | # Store result
139 | with self._lock:
140 | self._results[task.task_id] = result
141 | if task.task_id in self._active_tasks:
142 | del self._active_tasks[task.task_id]
143 |
144 | return result
145 |
146 | def wait_for_tasks(self, task_ids: List[str], timeout: Optional[float] = None) -> Dict[str, TaskResult]:
147 | """Wait for specific tasks to complete."""
148 | if timeout is None:
149 | timeout = self.default_timeout
150 |
151 | results = {}
152 | remaining_tasks = set(task_ids)
153 | start_time = time.time()
154 |
155 | while remaining_tasks and (time.time() - start_time) < timeout:
156 | completed_tasks = set()
157 |
158 | with self._lock:
159 | for task_id in remaining_tasks:
160 | if task_id in self._results:
161 | results[task_id] = self._results[task_id]
162 | completed_tasks.add(task_id)
163 | elif task_id not in self._active_tasks:
164 | # Task not found, create error result
165 | error_result = TaskResult(
166 | task_id=task_id,
167 | service='unknown',
168 | operation='unknown',
169 | status='error',
170 | error='Task not found'
171 | )
172 | results[task_id] = error_result
173 | completed_tasks.add(task_id)
174 |
175 | remaining_tasks -= completed_tasks
176 |
177 | if remaining_tasks:
178 | time.sleep(0.1) # Small delay to avoid busy waiting
179 |
180 | # Handle timeout for remaining tasks
181 | for task_id in remaining_tasks:
182 | timeout_result = TaskResult(
183 | task_id=task_id,
184 | service='unknown',
185 | operation='unknown',
186 | status='timeout',
187 | error=f'Task timed out after {timeout}s'
188 | )
189 | results[task_id] = timeout_result
190 |
191 | logger.info(f"Completed waiting for {len(task_ids)} tasks, {len(results)} results")
192 | return results
193 |
194 | def wait_for_all(self, timeout: Optional[float] = None) -> Dict[str, TaskResult]:
195 | """Wait for all active tasks to complete."""
196 | with self._lock:
197 | active_task_ids = list(self._active_tasks.keys())
198 |
199 | if not active_task_ids:
200 | return {}
201 |
202 | return self.wait_for_tasks(active_task_ids, timeout)
203 |
204 | def get_result(self, task_id: str) -> Optional[TaskResult]:
205 | """Get result for a specific task."""
206 | with self._lock:
207 | return self._results.get(task_id)
208 |
209 | def get_all_results(self) -> Dict[str, TaskResult]:
210 | """Get all available results."""
211 | with self._lock:
212 | return self._results.copy()
213 |
214 | def cancel_task(self, task_id: str) -> bool:
215 | """Cancel a running task."""
216 | with self._lock:
217 | if task_id in self._active_tasks:
218 | future = self._active_tasks[task_id]
219 | cancelled = future.cancel()
220 | if cancelled:
221 | del self._active_tasks[task_id]
222 | # Create cancelled result
223 | cancel_result = TaskResult(
224 | task_id=task_id,
225 | service='unknown',
226 | operation='unknown',
227 | status='error',
228 | error='Task cancelled'
229 | )
230 | self._results[task_id] = cancel_result
231 | return cancelled
232 | return False
233 |
234 | def get_status(self) -> Dict[str, Any]:
235 | """Get executor status information."""
236 | with self._lock:
237 | active_count = len(self._active_tasks)
238 | completed_count = len(self._results)
239 |
240 | # Count results by status
241 | status_counts = {}
242 | for result in self._results.values():
243 | status_counts[result.status] = status_counts.get(result.status, 0) + 1
244 |
245 | return {
246 | 'max_workers': self.max_workers,
247 | 'active_tasks': active_count,
248 | 'completed_tasks': completed_count,
249 | 'status_breakdown': status_counts,
250 | 'executor_alive': not self.executor._shutdown
251 | }
252 |
253 | def clear_results(self, older_than_minutes: int = 60):
254 | """Clear old results to free memory."""
255 | cutoff_time = datetime.now().timestamp() - (older_than_minutes * 60)
256 |
257 | with self._lock:
258 | old_task_ids = []
259 | for task_id, result in self._results.items():
260 | if result.timestamp.timestamp() < cutoff_time:
261 | old_task_ids.append(task_id)
262 |
263 | for task_id in old_task_ids:
264 | del self._results[task_id]
265 |
266 | logger.info(f"Cleared {len(old_task_ids)} old results")
267 |
268 | def shutdown(self, wait: bool = True):
269 | """Shutdown the executor and clean up resources."""
270 | logger.info("Shutting down ParallelExecutor")
271 |
272 | with self._lock:
273 | # Cancel all active tasks
274 | for task_id, future in self._active_tasks.items():
275 | future.cancel()
276 | self._active_tasks.clear()
277 |
278 | # Shutdown executor
279 | self.executor.shutdown(wait=wait)
280 | logger.info("ParallelExecutor shutdown complete")
281 |
282 | # Global executor instance
283 | _parallel_executor = None
284 |
285 | def get_parallel_executor() -> ParallelExecutor:
286 | """Get the global parallel executor instance."""
287 | global _parallel_executor
288 | if _parallel_executor is None:
289 | _parallel_executor = ParallelExecutor()
290 | return _parallel_executor
291 |
292 | def create_task(task_id: str, service: str, operation: str, function: Callable,
293 |                 args: tuple = (), kwargs: Optional[Dict[str, Any]] = None,
294 | timeout: float = 30.0, priority: int = 1) -> ParallelTask:
295 | """Helper function to create a ParallelTask."""
296 | return ParallelTask(
297 | task_id=task_id,
298 | service=service,
299 | operation=operation,
300 | function=function,
301 | args=args,
302 | kwargs=kwargs or {},
303 | timeout=timeout,
304 | priority=priority
305 | )
```
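A short usage sketch for the executor above: build tasks with `create_task`, submit them as a batch, and collect the `TaskResult` objects. The wrapped boto3 call and the regions are placeholders; any callable can be scheduled this way.

```python
import boto3

from utils.parallel_executor import create_task, get_parallel_executor


def describe_volumes(region: str):
    """Placeholder AWS call; any callable can be wrapped in a ParallelTask."""
    ec2 = boto3.client('ec2', region_name=region)
    return ec2.describe_volumes()['Volumes']


executor = get_parallel_executor()

tasks = [
    create_task(
        task_id=f'ebs-volumes-{region}',
        service='ec2',
        operation='describe_volumes',
        function=describe_volumes,
        kwargs={'region': region},
        timeout=30.0,
        priority=2,
    )
    for region in ('us-east-1', 'us-west-2')
]

task_ids = executor.submit_batch(tasks)
results = executor.wait_for_tasks(task_ids, timeout=60.0)

for task_id, result in results.items():
    if result.status == 'success':
        print(f"{task_id}: {len(result.data)} volumes in {result.execution_time:.2f}s")
    else:
        print(f"{task_id}: {result.status} - {result.error}")
```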
--------------------------------------------------------------------------------
/utils/logging_config.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Centralized logging configuration for CFM Tips MCP Server
3 |
4 | """
5 |
6 | import logging
7 | import sys
8 | import os
9 | import json
10 | import tempfile
11 | from datetime import datetime
12 | from typing import Dict, Any, Optional, List
13 |
14 |
15 | class StructuredFormatter(logging.Formatter):
16 | """Custom formatter for structured logging with JSON output."""
17 |
18 | def format(self, record):
19 | """Format log record as structured JSON."""
20 | log_entry = {
21 | 'timestamp': datetime.fromtimestamp(record.created).isoformat(),
22 | 'level': record.levelname,
23 | 'logger': record.name,
24 | 'module': record.module,
25 | 'function': record.funcName,
26 | 'line': record.lineno,
27 | 'message': record.getMessage(),
28 | 'thread': record.thread,
29 | 'thread_name': record.threadName
30 | }
31 |
32 | # Add exception information if present
33 | if record.exc_info:
34 | log_entry['exception'] = self.formatException(record.exc_info)
35 |
36 | # Add extra fields from record
37 | for key, value in record.__dict__.items():
38 | if key not in ['name', 'msg', 'args', 'levelname', 'levelno', 'pathname',
39 | 'filename', 'module', 'lineno', 'funcName', 'created',
40 | 'msecs', 'relativeCreated', 'thread', 'threadName',
41 | 'processName', 'process', 'getMessage', 'exc_info',
42 | 'exc_text', 'stack_info']:
43 | log_entry[key] = value
44 |
45 | return json.dumps(log_entry)
46 |
47 |
48 | class StandardFormatter(logging.Formatter):
49 | """Enhanced standard formatter with more context."""
50 |
51 | def __init__(self):
52 | super().__init__(
53 | '%(asctime)s - %(name)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s'
54 | )
55 |
56 |
57 | def setup_logging(structured: bool = False, log_level: str = "INFO"):
58 | """
59 | Configure comprehensive logging for the application.
60 |
61 | Args:
62 | structured: Whether to use structured JSON logging
63 | log_level: Logging level (DEBUG, INFO, WARNING, ERROR)
64 | """
65 |
66 | # Create appropriate formatter
67 | if structured:
68 | formatter = StructuredFormatter()
69 | else:
70 | formatter = StandardFormatter()
71 |
72 | # Configure root logger
73 | root_logger = logging.getLogger()
74 | root_logger.setLevel(getattr(logging, log_level.upper(), logging.INFO))
75 |
76 | # Remove existing handlers
77 | for handler in root_logger.handlers[:]:
78 | root_logger.removeHandler(handler)
79 |
80 | # Add file handlers
81 | try:
82 | # Try to create logs directory if it doesn't exist
83 | log_dir = 'logs'
84 | if not os.path.exists(log_dir):
85 | os.makedirs(log_dir, exist_ok=True)
86 |
87 | # Try main log file in logs directory first
88 | log_file = os.path.join(log_dir, 'cfm_tips_mcp.log')
89 | file_handler = logging.FileHandler(log_file)
90 | file_handler.setLevel(logging.INFO)
91 | file_handler.setFormatter(formatter)
92 | root_logger.addHandler(file_handler)
93 |
94 | # Try error log file
95 | error_file = os.path.join(log_dir, 'cfm_tips_mcp_errors.log')
96 | error_handler = logging.FileHandler(error_file)
97 | error_handler.setLevel(logging.ERROR)
98 | error_handler.setFormatter(formatter)
99 | root_logger.addHandler(error_handler)
100 |
101 | except (OSError, PermissionError) as e:
102 | # If we can't write to logs directory, try current directory
103 | try:
104 | file_handler = logging.FileHandler('cfm_tips_mcp.log')
105 | file_handler.setLevel(logging.INFO)
106 | file_handler.setFormatter(formatter)
107 | root_logger.addHandler(file_handler)
108 |
109 | error_handler = logging.FileHandler('cfm_tips_mcp_errors.log')
110 | error_handler.setLevel(logging.ERROR)
111 | error_handler.setFormatter(formatter)
112 | root_logger.addHandler(error_handler)
113 |
114 | except (OSError, PermissionError):
115 | # If we can't write anywhere, try temp directory
116 | try:
117 | temp_dir = tempfile.gettempdir()
118 | temp_log = os.path.join(temp_dir, 'cfm_tips_mcp.log')
119 | file_handler = logging.FileHandler(temp_log)
120 | file_handler.setLevel(logging.INFO)
121 | file_handler.setFormatter(formatter)
122 | root_logger.addHandler(file_handler)
123 |
124 | temp_error = os.path.join(temp_dir, 'cfm_tips_mcp_errors.log')
125 | error_handler = logging.FileHandler(temp_error)
126 | error_handler.setLevel(logging.ERROR)
127 | error_handler.setFormatter(formatter)
128 | root_logger.addHandler(error_handler)
129 |
130 | # Log where we're writing files
131 | print(f"Warning: Using temp directory for logs: {temp_dir}")
132 |
133 | except (OSError, PermissionError):
134 | # If all else fails, raise error since we need file logging
135 | raise RuntimeError("Could not create log files in any location")
136 |
137 | return logging.getLogger(__name__)
138 |
139 | def log_function_entry(logger, func_name, **kwargs):
140 | """Log function entry with parameters."""
141 | logger.info(f"Entering {func_name} with params: {kwargs}")
142 |
143 | def log_function_exit(logger, func_name, result_status=None, execution_time=None):
144 | """Log function exit with results."""
145 | msg = f"Exiting {func_name}"
146 | if result_status:
147 | msg += f" - Status: {result_status}"
148 | if execution_time:
149 | msg += f" - Time: {execution_time:.2f}s"
150 | logger.info(msg)
151 |
152 | def log_aws_api_call(logger, service, operation, **params):
153 | """Log AWS API calls."""
154 | logger.info(f"AWS API Call: {service}.{operation} with params: {params}")
155 |
156 | def log_aws_api_error(logger, service, operation, error):
157 | """Log AWS API errors."""
158 | logger.error(f"AWS API Error: {service}.{operation} - {str(error)}")
159 |
160 |
161 | def create_structured_logger(name: str, extra_fields: Optional[Dict[str, Any]] = None) -> logging.Logger:
162 | """
163 | Create a logger with structured logging capabilities.
164 |
165 | Args:
166 | name: Logger name
167 | extra_fields: Additional fields to include in all log messages
168 |
169 | Returns:
170 | Configured logger instance
171 | """
172 | logger = logging.getLogger(name)
173 |
174 | if extra_fields:
175 | # Create adapter to add extra fields
176 | logger = logging.LoggerAdapter(logger, extra_fields)
177 |
178 | return logger
179 |
180 |
181 | def log_s3_operation(logger, operation: str, bucket_name: Optional[str] = None,
182 | object_key: Optional[str] = None, **kwargs):
183 | """
184 | Log S3 operations with structured data.
185 |
186 | Args:
187 | logger: Logger instance
188 | operation: S3 operation name
189 | bucket_name: S3 bucket name
190 | object_key: S3 object key
191 | **kwargs: Additional operation parameters
192 | """
193 | log_data = {
194 | 'operation_type': 's3_operation',
195 | 'operation': operation,
196 | 'bucket_name': bucket_name,
197 | 'object_key': object_key
198 | }
199 | log_data.update(kwargs)
200 |
201 | logger.info(f"S3 Operation: {operation}", extra=log_data)
202 |
203 |
204 | def log_analysis_start(logger, analysis_type: str, session_id: Optional[str] = None, **kwargs):
205 | """
206 | Log analysis start with structured data.
207 |
208 | Args:
209 | logger: Logger instance
210 | analysis_type: Type of analysis
211 | session_id: Session identifier
212 | **kwargs: Additional analysis parameters
213 | """
214 | log_data = {
215 | 'event_type': 'analysis_start',
216 | 'analysis_type': analysis_type,
217 | 'session_id': session_id
218 | }
219 | log_data.update(kwargs)
220 |
221 | logger.info(f"Starting analysis: {analysis_type}", extra=log_data)
222 |
223 |
224 | def log_analysis_complete(logger, analysis_type: str, status: str, execution_time: float,
225 | session_id: Optional[str] = None, **kwargs):
226 | """
227 | Log analysis completion with structured data.
228 |
229 | Args:
230 | logger: Logger instance
231 | analysis_type: Type of analysis
232 | status: Analysis status
233 | execution_time: Execution time in seconds
234 | session_id: Session identifier
235 | **kwargs: Additional analysis results
236 | """
237 | log_data = {
238 | 'event_type': 'analysis_complete',
239 | 'analysis_type': analysis_type,
240 | 'status': status,
241 | 'execution_time': execution_time,
242 | 'session_id': session_id
243 | }
244 | log_data.update(kwargs)
245 |
246 | logger.info(f"Completed analysis: {analysis_type} - Status: {status}", extra=log_data)
247 |
248 |
249 | def log_cost_optimization_finding(logger, finding_type: str, resource_id: str,
250 | potential_savings: Optional[float] = None, **kwargs):
251 | """
252 | Log cost optimization findings with structured data.
253 |
254 | Args:
255 | logger: Logger instance
256 | finding_type: Type of optimization finding
257 | resource_id: Resource identifier
258 | potential_savings: Estimated cost savings
259 | **kwargs: Additional finding details
260 | """
261 | log_data = {
262 | 'event_type': 'cost_optimization_finding',
263 | 'finding_type': finding_type,
264 | 'resource_id': resource_id,
265 | 'potential_savings': potential_savings
266 | }
267 | log_data.update(kwargs)
268 |
269 | logger.info(f"Cost optimization finding: {finding_type} for {resource_id}", extra=log_data)
270 |
271 |
272 | def log_session_operation(logger, operation: str, session_id: str, **kwargs):
273 | """
274 | Log session operations with structured data.
275 |
276 | Args:
277 | logger: Logger instance
278 | operation: Session operation
279 | session_id: Session identifier
280 | **kwargs: Additional operation details
281 | """
282 | log_data = {
283 | 'event_type': 'session_operation',
284 | 'operation': operation,
285 | 'session_id': session_id
286 | }
287 | log_data.update(kwargs)
288 |
289 | logger.info(f"Session operation: {operation} for session {session_id}", extra=log_data)
290 |
291 |
292 | def log_cloudwatch_operation(logger, operation: str, component: Optional[str] = None,
293 | cost_incurred: bool = False, **kwargs):
294 | """
295 | Log CloudWatch operations with structured data and cost tracking.
296 |
297 | Args:
298 | logger: Logger instance
299 | operation: CloudWatch operation name
300 | component: CloudWatch component (logs, metrics, alarms, dashboards)
301 | cost_incurred: Whether the operation incurred costs
302 | **kwargs: Additional operation parameters
303 | """
304 | log_data = {
305 | 'operation_type': 'cloudwatch_operation',
306 | 'operation': operation,
307 | 'component': component,
308 | 'cost_incurred': cost_incurred
309 | }
310 | log_data.update(kwargs)
311 |
312 | if cost_incurred:
313 | logger.warning(f"CloudWatch Operation (COST INCURRED): {operation}", extra=log_data)
314 | else:
315 | logger.info(f"CloudWatch Operation: {operation}", extra=log_data)
316 |
317 |
318 | # CloudWatch-specific logging methods consolidated into log_cloudwatch_operation
319 | # These specialized methods have been removed in favor of the generic log_cloudwatch_operation method
320 |
321 |
322 | # Removed setup_cloudwatch_logging - use setup_logging instead with log_cloudwatch_operation for CloudWatch-specific events
```
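A minimal usage sketch for the logging helpers above: configure structured JSON logging once at startup, then use the domain-specific helpers so cost-incurring CloudWatch calls surface at WARNING level.

```python
import logging
import time

from utils.logging_config import (
    log_analysis_complete,
    log_analysis_start,
    log_cloudwatch_operation,
    setup_logging,
)

setup_logging(structured=True, log_level="INFO")
logger = logging.getLogger("playbooks.cloudwatch")

start = time.time()
log_analysis_start(logger, "metrics_optimization", session_id="demo-session", region="us-east-1")

# Free metadata call: logged at INFO with cost_incurred=False.
log_cloudwatch_operation(logger, "list_metrics", component="metrics", cost_incurred=False)

# A call that would incur cost (e.g. GetMetricData) is logged at WARNING.
log_cloudwatch_operation(logger, "get_metric_data", component="metrics", cost_incurred=True)

log_analysis_complete(
    logger,
    "metrics_optimization",
    status="success",
    execution_time=time.time() - start,
    session_id="demo-session",
)
```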