#
tokens: 47285/50000 18/161 files (page 2/19)
lines: on (toggle) GitHub
raw markdown copy reset
This is page 2 of 19. Use http://codebase.md/aws-samples/sample-cfm-tips-mcp?lines=true&page={x} to view the full context.

# Directory Structure

```
├── .gitignore
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── diagnose_cost_optimization_hub_v2.py
├── LICENSE
├── logging_config.py
├── mcp_runbooks.json
├── mcp_server_with_runbooks.py
├── playbooks
│   ├── __init__.py
│   ├── aws_lambda
│   │   ├── __init__.py
│   │   └── lambda_optimization.py
│   ├── cloudtrail
│   │   ├── __init__.py
│   │   └── cloudtrail_optimization.py
│   ├── cloudtrail_optimization.py
│   ├── cloudwatch
│   │   ├── __init__.py
│   │   ├── aggregation_queries.py
│   │   ├── alarms_and_dashboards_analyzer.py
│   │   ├── analysis_engine.py
│   │   ├── base_analyzer.py
│   │   ├── cloudwatch_optimization_analyzer.py
│   │   ├── cloudwatch_optimization_tool.py
│   │   ├── cloudwatch_optimization.py
│   │   ├── cost_controller.py
│   │   ├── general_spend_analyzer.py
│   │   ├── logs_optimization_analyzer.py
│   │   ├── metrics_optimization_analyzer.py
│   │   ├── optimization_orchestrator.py
│   │   └── result_processor.py
│   ├── comprehensive_optimization.py
│   ├── ebs
│   │   ├── __init__.py
│   │   └── ebs_optimization.py
│   ├── ebs_optimization.py
│   ├── ec2
│   │   ├── __init__.py
│   │   └── ec2_optimization.py
│   ├── ec2_optimization.py
│   ├── lambda_optimization.py
│   ├── rds
│   │   ├── __init__.py
│   │   └── rds_optimization.py
│   ├── rds_optimization.py
│   └── s3
│       ├── __init__.py
│       ├── analyzers
│       │   ├── __init__.py
│       │   ├── api_cost_analyzer.py
│       │   ├── archive_optimization_analyzer.py
│       │   ├── general_spend_analyzer.py
│       │   ├── governance_analyzer.py
│       │   ├── multipart_cleanup_analyzer.py
│       │   └── storage_class_analyzer.py
│       ├── base_analyzer.py
│       ├── s3_aggregation_queries.py
│       ├── s3_analysis_engine.py
│       ├── s3_comprehensive_optimization_tool.py
│       ├── s3_optimization_orchestrator.py
│       └── s3_optimization.py
├── README.md
├── requirements.txt
├── runbook_functions_extended.py
├── runbook_functions.py
├── RUNBOOKS_GUIDE.md
├── services
│   ├── __init__.py
│   ├── cloudwatch_pricing.py
│   ├── cloudwatch_service_vended_log.py
│   ├── cloudwatch_service.py
│   ├── compute_optimizer.py
│   ├── cost_explorer.py
│   ├── optimization_hub.py
│   ├── performance_insights.py
│   ├── pricing.py
│   ├── s3_pricing.py
│   ├── s3_service.py
│   ├── storage_lens_service.py
│   └── trusted_advisor.py
├── setup.py
├── test_runbooks.py
├── tests
│   ├── __init__.py
│   ├── conftest.py
│   ├── integration
│   │   ├── __init__.py
│   │   ├── cloudwatch
│   │   │   └── test_cloudwatch_integration.py
│   │   ├── test_cloudwatch_comprehensive_tool_integration.py
│   │   ├── test_cloudwatch_orchestrator_integration.py
│   │   ├── test_integration_suite.py
│   │   └── test_orchestrator_integration.py
│   ├── legacy
│   │   ├── example_output_with_docs.py
│   │   ├── example_wellarchitected_output.py
│   │   ├── test_aws_session_management.py
│   │   ├── test_cloudwatch_orchestrator_pagination.py
│   │   ├── test_cloudwatch_pagination_integration.py
│   │   ├── test_cloudwatch_performance_optimizations.py
│   │   ├── test_cloudwatch_result_processor.py
│   │   ├── test_cloudwatch_timeout_issue.py
│   │   ├── test_documentation_links.py
│   │   ├── test_metrics_pagination_count.py
│   │   ├── test_orchestrator_integration.py
│   │   ├── test_pricing_cache_fix_moved.py
│   │   ├── test_pricing_cache_fix.py
│   │   ├── test_runbook_integration.py
│   │   ├── test_runbooks.py
│   │   ├── test_setup_verification.py
│   │   └── test_stack_trace_fix.py
│   ├── performance
│   │   ├── __init__.py
│   │   ├── cloudwatch
│   │   │   └── test_cloudwatch_performance.py
│   │   ├── test_cloudwatch_parallel_execution.py
│   │   ├── test_parallel_execution.py
│   │   └── test_performance_suite.py
│   ├── pytest-cloudwatch.ini
│   ├── pytest.ini
│   ├── README.md
│   ├── requirements-test.txt
│   ├── run_cloudwatch_tests.py
│   ├── run_tests.py
│   ├── test_setup_verification.py
│   ├── test_suite_main.py
│   └── unit
│       ├── __init__.py
│       ├── analyzers
│       │   ├── __init__.py
│       │   ├── conftest_cloudwatch.py
│       │   ├── test_alarms_and_dashboards_analyzer.py
│       │   ├── test_base_analyzer.py
│       │   ├── test_cloudwatch_base_analyzer.py
│       │   ├── test_cloudwatch_cost_constraints.py
│       │   ├── test_cloudwatch_general_spend_analyzer.py
│       │   ├── test_general_spend_analyzer.py
│       │   ├── test_logs_optimization_analyzer.py
│       │   └── test_metrics_optimization_analyzer.py
│       ├── cloudwatch
│       │   ├── test_cache_control.py
│       │   ├── test_cloudwatch_api_mocking.py
│       │   ├── test_cloudwatch_metrics_pagination.py
│       │   ├── test_cloudwatch_pagination_architecture.py
│       │   ├── test_cloudwatch_pagination_comprehensive_fixed.py
│       │   ├── test_cloudwatch_pagination_comprehensive.py
│       │   ├── test_cloudwatch_pagination_fixed.py
│       │   ├── test_cloudwatch_pagination_real_format.py
│       │   ├── test_cloudwatch_pagination_simple.py
│       │   ├── test_cloudwatch_query_pagination.py
│       │   ├── test_cloudwatch_unit_suite.py
│       │   ├── test_general_spend_tips_refactor.py
│       │   ├── test_import_error.py
│       │   ├── test_mcp_pagination_bug.py
│       │   └── test_mcp_surface_pagination.py
│       ├── s3
│       │   └── live
│       │       ├── test_bucket_listing.py
│       │       ├── test_s3_governance_bucket_discovery.py
│       │       └── test_top_buckets.py
│       ├── services
│       │   ├── __init__.py
│       │   ├── test_cloudwatch_cost_controller.py
│       │   ├── test_cloudwatch_query_service.py
│       │   ├── test_cloudwatch_service.py
│       │   ├── test_cost_control_routing.py
│       │   └── test_s3_service.py
│       └── test_unit_suite.py
└── utils
    ├── __init__.py
    ├── aws_client_factory.py
    ├── cache_decorator.py
    ├── cleanup_manager.py
    ├── cloudwatch_cache.py
    ├── documentation_links.py
    ├── error_handler.py
    ├── intelligent_cache.py
    ├── logging_config.py
    ├── memory_manager.py
    ├── parallel_executor.py
    ├── performance_monitor.py
    ├── progressive_timeout.py
    ├── service_orchestrator.py
    └── session_manager.py
```

# Files

--------------------------------------------------------------------------------
/tests/unit/cloudwatch/test_cloudwatch_metrics_pagination.py:
--------------------------------------------------------------------------------

```python
  1 | #!/usr/bin/env python3
  2 | """
  3 | Unit tests for CloudWatch metrics pagination functionality.
  4 | """
  5 | 
  6 | import pytest
  7 | import sys
  8 | import os
  9 | from unittest.mock import patch, MagicMock
 10 | 
 11 | # Add the project root to the path
 12 | sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../../..'))
 13 | 
 14 | from playbooks.cloudwatch.result_processor import CloudWatchResultProcessor
 15 | 
 16 | 
 17 | class TestCloudWatchMetricsPagination:
 18 |     """Unit tests for CloudWatch metrics pagination functionality."""
 19 |     
 20 |     def test_metrics_pagination_with_large_dataset(self):
 21 |         """Test that metrics pagination correctly limits results to 10 items per page."""
 22 |         processor = CloudWatchResultProcessor()
 23 |         
 24 |         # Create 25 metrics to test pagination (should result in 3 pages)
 25 |         metrics = []
 26 |         for i in range(25):
 27 |             metric = {
 28 |                 'MetricName': f'CustomMetric{i:02d}',
 29 |                 'Namespace': f'MyApp/Service{i % 3}',  # Mix of namespaces
 30 |                 'Dimensions': [
 31 |                     {'Name': 'InstanceId', 'Value': f'i-{i:010d}'},
 32 |                     {'Name': 'Environment', 'Value': 'production' if i % 2 == 0 else 'staging'}
 33 |                 ]
 34 |             }
 35 |             metrics.append(metric)
 36 |         
 37 |         # Test page 1 - should have exactly 10 items
 38 |         result_p1 = processor.process_metrics_results(metrics, page=1)
 39 |         
 40 |         assert len(result_p1['items']) == 10, f"Page 1 should have exactly 10 items, got {len(result_p1['items'])}"
 41 |         assert result_p1['pagination']['current_page'] == 1
 42 |         assert result_p1['pagination']['total_items'] == 25
 43 |         assert result_p1['pagination']['total_pages'] == 3
 44 |         assert result_p1['pagination']['has_next_page'] is True
 45 |         assert result_p1['pagination']['has_previous_page'] is False
 46 |         
 47 |         # Test page 2 - should have exactly 10 items
 48 |         result_p2 = processor.process_metrics_results(metrics, page=2)
 49 |         
 50 |         assert len(result_p2['items']) == 10, f"Page 2 should have exactly 10 items, got {len(result_p2['items'])}"
 51 |         assert result_p2['pagination']['current_page'] == 2
 52 |         assert result_p2['pagination']['has_next_page'] is True
 53 |         assert result_p2['pagination']['has_previous_page'] is True
 54 |         
 55 |         # Test page 3 - should have exactly 5 items (remainder)
 56 |         result_p3 = processor.process_metrics_results(metrics, page=3)
 57 |         
 58 |         assert len(result_p3['items']) == 5, f"Page 3 should have exactly 5 items, got {len(result_p3['items'])}"
 59 |         assert result_p3['pagination']['current_page'] == 3
 60 |         assert result_p3['pagination']['has_next_page'] is False
 61 |         assert result_p3['pagination']['has_previous_page'] is True
 62 |         
 63 |         # Verify dimensions are preserved (not truncated)
 64 |         for item in result_p1['items']:
 65 |             assert 'Dimensions' in item, "Dimensions should be preserved"
 66 |             assert len(item['Dimensions']) == 2, "All dimensions should be preserved"
 67 |     
 68 |     def test_metrics_cost_sorting(self):
 69 |         """Test that metrics are sorted by cost (custom metrics first)."""
 70 |         processor = CloudWatchResultProcessor()
 71 |         
 72 |         # Create mix of AWS and custom metrics
 73 |         metrics = [
 74 |             {'MetricName': 'CPUUtilization', 'Namespace': 'AWS/EC2'},  # Free
 75 |             {'MetricName': 'CustomMetric1', 'Namespace': 'MyApp/Performance'},  # $0.30
 76 |             {'MetricName': 'NetworkIn', 'Namespace': 'AWS/EC2'},  # Free
 77 |             {'MetricName': 'CustomMetric2', 'Namespace': 'MyApp/Business'},  # $0.30
 78 |         ]
 79 |         
 80 |         result = processor.process_metrics_results(metrics, page=1)
 81 |         items = result['items']
 82 |         
 83 |         # Custom metrics should be first (higher cost)
 84 |         assert items[0]['Namespace'] in ['MyApp/Performance', 'MyApp/Business']
 85 |         assert items[0]['estimated_monthly_cost'] == 0.30
 86 |         assert items[1]['Namespace'] in ['MyApp/Performance', 'MyApp/Business']
 87 |         assert items[1]['estimated_monthly_cost'] == 0.30
 88 |         
 89 |         # AWS metrics should be last (free)
 90 |         assert items[2]['Namespace'] == 'AWS/EC2'
 91 |         assert items[2]['estimated_monthly_cost'] == 0.0
 92 |         assert items[3]['Namespace'] == 'AWS/EC2'
 93 |         assert items[3]['estimated_monthly_cost'] == 0.0
 94 |     
 95 |     def test_metrics_dimensions_preservation(self):
 96 |         """Test that metric dimensions are fully preserved, not truncated."""
 97 |         processor = CloudWatchResultProcessor()
 98 |         
 99 |         # Create metric with many dimensions
100 |         metrics = [{
101 |             'MetricName': 'ComplexMetric',
102 |             'Namespace': 'MyApp/Complex',
103 |             'Dimensions': [
104 |                 {'Name': 'InstanceId', 'Value': 'i-1234567890abcdef0'},
105 |                 {'Name': 'Environment', 'Value': 'production'},
106 |                 {'Name': 'Service', 'Value': 'web-server'},
107 |                 {'Name': 'Region', 'Value': 'us-east-1'},
108 |                 {'Name': 'AZ', 'Value': 'us-east-1a'},
109 |                 {'Name': 'Version', 'Value': 'v2.1.3'},
110 |                 {'Name': 'Team', 'Value': 'platform-engineering'},
111 |             ]
112 |         }]
113 |         
114 |         result = processor.process_metrics_results(metrics, page=1)
115 |         item = result['items'][0]
116 |         
117 |         # All dimensions should be preserved
118 |         assert len(item['Dimensions']) == 7, f"Expected 7 dimensions, got {len(item['Dimensions'])}"
119 |         
120 |         # Verify specific dimensions are present
121 |         dimension_names = [d['Name'] for d in item['Dimensions']]
122 |         expected_names = ['InstanceId', 'Environment', 'Service', 'Region', 'AZ', 'Version', 'Team']
123 |         
124 |         for expected_name in expected_names:
125 |             assert expected_name in dimension_names, f"Dimension {expected_name} should be preserved"
126 |     
127 |     def test_empty_metrics_pagination(self):
128 |         """Test pagination with empty metrics list."""
129 |         processor = CloudWatchResultProcessor()
130 |         
131 |         result = processor.process_metrics_results([], page=1)
132 |         
133 |         assert len(result['items']) == 0
134 |         assert result['pagination']['current_page'] == 1
135 |         assert result['pagination']['total_items'] == 0
136 |         assert result['pagination']['total_pages'] == 0
137 |         assert result['pagination']['has_next_page'] is False
138 |         assert result['pagination']['has_previous_page'] is False
139 |     
140 |     def test_single_page_metrics(self):
141 |         """Test pagination with metrics that fit in a single page."""
142 |         processor = CloudWatchResultProcessor()
143 |         
144 |         # Create 5 metrics (less than page size of 10)
145 |         metrics = [
146 |             {'MetricName': f'Metric{i}', 'Namespace': 'MyApp/Test'}
147 |             for i in range(5)
148 |         ]
149 |         
150 |         result = processor.process_metrics_results(metrics, page=1)
151 |         
152 |         assert len(result['items']) == 5
153 |         assert result['pagination']['current_page'] == 1
154 |         assert result['pagination']['total_items'] == 5
155 |         assert result['pagination']['total_pages'] == 1
156 |         assert result['pagination']['has_next_page'] is False
157 |         assert result['pagination']['has_previous_page'] is False
158 | 
159 | 
# Allow running this file directly: hand it to pytest in verbose mode.
if __name__ == '__main__':
    pytest.main([__file__, '-v'])
```

--------------------------------------------------------------------------------
/tests/legacy/test_runbooks.py:
--------------------------------------------------------------------------------

```python
  1 | #!/usr/bin/env python3
  2 | """
  3 | Test script for CFM Tips AWS Cost Optimization MCP Server
  4 | """
  5 | 
  6 | import sys
  7 | import os
  8 | 
  9 | # Add current directory to path
 10 | sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
 11 | 
def test_imports():
    """Test that all imports work correctly.

    Imports, in order, the MCP server packages, the AWS SDK packages and the
    playbook entry points. Progress is reported with ``print``.

    Returns:
        True when every import succeeds; False as soon as one raises (the
        error is printed, not re-raised).
    """
    print("Testing imports...")
    
    try:
        # Test MCP server imports
        from mcp.server import Server
        from mcp.server.stdio import stdio_server
        from mcp.types import Tool, TextContent
        print("✅ MCP imports successful")
        
        # Test AWS imports
        import boto3
        from botocore.exceptions import ClientError, NoCredentialsError
        print("✅ AWS imports successful")
        
        # Test runbook functions import
        # Import functions directly from playbooks
        # (the imported names are not used; only importability is checked)
        from playbooks.ec2.ec2_optimization import (
            run_ec2_right_sizing_analysis,
            generate_ec2_right_sizing_report
        )
        from playbooks.ebs.ebs_optimization import (
            run_ebs_optimization_analysis,
            identify_unused_ebs_volumes,
            generate_ebs_optimization_report_mcp as generate_ebs_optimization_report
        )
        from playbooks.rds.rds_optimization import (
            run_rds_optimization_analysis,
            identify_idle_rds_instances_wrapper as identify_idle_rds_instances,
            generate_rds_optimization_report
        )
        from playbooks.aws_lambda.lambda_optimization import (
            run_lambda_optimization_analysis,
            identify_unused_lambda_functions_mcp as identify_unused_lambda_functions,
            generate_lambda_optimization_report
        )
        from playbooks.comprehensive_optimization import run_comprehensive_cost_analysis
        from playbooks.cloudtrail.cloudtrail_optimization import (
            get_management_trails_mcp as get_management_trails,
            run_cloudtrail_trails_analysis_mcp as run_cloudtrail_trails_analysis,
            generate_cloudtrail_report_mcp as generate_cloudtrail_report
        )
        print("✅ Runbook functions import successful")
        
        return True
        
    except ImportError as e:
        # A missing module or missing name inside a module ends up here.
        print(f"❌ Import error: {e}")
        return False
    except Exception as e:
        # Anything else raised while a module executes its top-level code.
        print(f"❌ Unexpected error: {e}")
        return False
 65 | 
 66 | def test_server_creation():
 67 |     """Test that the MCP server can be created."""
 68 |     print("\nTesting server creation...")
 69 |     
 70 |     try:
 71 |         # Import the server module
 72 |         import mcp_server_with_runbooks
 73 |         print("✅ Server module imported successfully")
 74 |         
 75 |         # Check if server is created
 76 |         if hasattr(mcp_server_with_runbooks, 'server'):
 77 |             print("✅ Server object created successfully")
 78 |             
 79 |             # Check server name
 80 |             if mcp_server_with_runbooks.server.name == "cfm_tips":
 81 |                 print("✅ Server name is correct: cfm_tips")
 82 |             else:
 83 |                 print(f"⚠️  Server name: {mcp_server_with_runbooks.server.name}")
 84 |             
 85 |             return True
 86 |         else:
 87 |             print("❌ Server object not found")
 88 |             return False
 89 |             
 90 |     except Exception as e:
 91 |         print(f"❌ Server creation error: {str(e)}")
 92 |         return False
 93 | 
 94 | def test_cloudtrail_functions():
 95 |     """Test CloudTrail optimization functions."""
 96 |     print("\nTesting CloudTrail functions...")
 97 |     
 98 |     try:
 99 |         from playbooks.cloudtrail.cloudtrail_optimization import (
100 |             get_management_trails_mcp as get_management_trails,
101 |             run_cloudtrail_trails_analysis_mcp as run_cloudtrail_trails_analysis,
102 |             generate_cloudtrail_report_mcp as generate_cloudtrail_report
103 |         )
104 |         print("✅ CloudTrail functions imported successfully")
105 |         
106 |         # Test function signatures
107 |         import inspect
108 |         
109 |         # Check get_management_trails
110 |         sig = inspect.signature(get_management_trails)
111 |         if 'arguments' in sig.parameters:
112 |             print("✅ get_management_trails has correct signature")
113 |         else:
114 |             print("❌ get_management_trails signature incorrect")
115 |             return False
116 |             
117 |         # Check run_cloudtrail_trails_analysis
118 |         sig = inspect.signature(run_cloudtrail_trails_analysis)
119 |         if 'arguments' in sig.parameters:
120 |             print("✅ run_cloudtrail_trails_analysis has correct signature")
121 |         else:
122 |             print("❌ run_cloudtrail_trails_analysis signature incorrect")
123 |             return False
124 |             
125 |         # Check generate_cloudtrail_report
126 |         sig = inspect.signature(generate_cloudtrail_report)
127 |         if 'arguments' in sig.parameters:
128 |             print("✅ generate_cloudtrail_report has correct signature")
129 |         else:
130 |             print("❌ generate_cloudtrail_report signature incorrect")
131 |             return False
132 |             
133 |         return True
134 |         
135 |     except ImportError as e:
136 |         print(f"❌ CloudTrail import error: {e}")
137 |         return False
138 |     except Exception as e:
139 |         print(f"❌ CloudTrail test error: {e}")
140 |         return False
141 | 
def test_tool_names():
    """Check that ``<server>___<tool>`` names stay within 64 characters.

    Returns:
        True when every sample tool name fits; False at the first name that
        is longer than 64 characters (that name is printed).
    """
    print("\nTesting tool name lengths...")

    server_name = "cfm_tips"
    sample_tools = [
        "ec2_rightsizing",
        "ebs_optimization",
        "rds_idle",
        "lambda_unused",
        "comprehensive_analysis",
        "get_coh_recommendations",
        "cloudtrail_optimization",
    ]

    # Fully qualified names: server name and tool joined by three underscores.
    qualified_names = [f"{server_name}___{tool}" for tool in sample_tools]

    # Only the first offender is reported, in list order.
    offender = next((name for name in qualified_names if len(name) > 64), None)
    if offender is not None:
        print(f"❌ Tool name too long: {offender} ({len(offender)} chars)")
        return False

    longest = max((len(name) for name in qualified_names), default=0)
    print(f"✅ All tool names within limit (max: {longest} chars)")
    return True
169 | 
def main():
    """Run the four checks in order and print a pass/fail summary.

    Returns:
        True when every check returned a truthy value, otherwise False.
    """
    print("CFM Tips AWS Cost Optimization MCP Server - Integration Test")
    print("=" * 65)

    # Every check runs even if an earlier one failed.
    checks = (
        test_imports,
        test_server_creation,
        test_cloudtrail_functions,
        test_tool_names,
    )
    total_tests = len(checks)

    tests_passed = 0
    for check in checks:
        if check():
            tests_passed += 1

    print("\n" + "=" * 65)
    print(f"Tests passed: {tests_passed}/{total_tests}")

    if tests_passed != total_tests:
        print("❌ Some tests failed. Check the errors above.")
        return False

    print("✅ All integration tests passed!")
    print("\nNext steps:")
    print("1. Configure AWS credentials: aws configure")
    print("2. Apply the correct IAM permissions (see CORRECTED_PERMISSIONS.md)")
    print("3. Start the server: q chat --mcp-config \"$(pwd)/mcp_runbooks.json\"")
    print("4. Test with: \"Run comprehensive cost analysis for us-east-1\"")
    print("\n🎉 CFM Tips is ready to help optimize your AWS costs!")
    return True
209 | 
# Script entry point: the process exit code is 0 when main() reports success, 1 otherwise.
if __name__ == "__main__":
    success = main()
    sys.exit(0 if success else 1)
```

--------------------------------------------------------------------------------
/playbooks/comprehensive_optimization.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Comprehensive Cost Optimization Playbook
  3 | 
  4 | This module provides multi-service cost optimization analysis functions.
  5 | Includes both core optimization functions and MCP runbook functions.
  6 | """
  7 | 
  8 | import asyncio
  9 | import json
 10 | import logging
 11 | import time
 12 | from datetime import datetime
 13 | from typing import Dict, List, Any
 14 | from mcp.types import TextContent
 15 | 
 16 | from utils.error_handler import ResponseFormatter, handle_aws_error
 17 | from utils.service_orchestrator import ServiceOrchestrator
 18 | from utils.parallel_executor import create_task
 19 | from utils.documentation_links import add_documentation_links
 20 | 
 21 | # Import playbook modules
 22 | from playbooks.ec2.ec2_optimization import get_underutilized_instances
 23 | from playbooks.ebs.ebs_optimization import get_underutilized_volumes
 24 | from playbooks.rds.rds_optimization import get_underutilized_rds_instances, identify_idle_rds_instances
 25 | from playbooks.aws_lambda.lambda_optimization import get_underutilized_lambda_functions, identify_unused_lambda_functions
 26 | from playbooks.cloudtrail.cloudtrail_optimization import run_cloudtrail_optimization
 27 | from playbooks.cloudwatch.cloudwatch_optimization import run_cloudwatch_comprehensive_optimization_tool_mcp
 28 | 
 29 | logger = logging.getLogger(__name__)
 30 | 
 31 | 
 32 | @handle_aws_error
 33 | async def run_comprehensive_cost_analysis(arguments: Dict[str, Any]) -> List[TextContent]:
 34 |     """Run comprehensive cost analysis across multiple AWS services."""
 35 |     start_time = time.time()
 36 |     
 37 |     try:
 38 |         region = arguments.get("region")
 39 |         services = arguments.get("services", ["ec2", "ebs", "rds", "lambda", "cloudtrail", "s3", "cloudwatch"])
 40 |         lookback_period_days = arguments.get("lookback_period_days", 14)
 41 |         output_format = arguments.get("output_format", "json")
 42 |         
 43 |         # Initialize service orchestrator for parallel execution and session management
 44 |         orchestrator = ServiceOrchestrator()
 45 |         
 46 |         # Define parallel service calls based on requested services
 47 |         service_calls = []
 48 |         
 49 |         if "ec2" in services:
 50 |             service_calls.extend([
 51 |                 {
 52 |                     'service': 'ec2',
 53 |                     'operation': 'underutilized_instances',
 54 |                     'function': get_underutilized_instances,
 55 |                     'args': {
 56 |                         'region': region,
 57 |                         'lookback_period_days': lookback_period_days
 58 |                     }
 59 |                 },
 60 |                 {
 61 |                     'service': 'ec2',
 62 |                     'operation': 'stopped_instances',
 63 |                     'function': lambda **kwargs: {"stopped_instances": []},  # Placeholder
 64 |                     'args': {'region': region}
 65 |                 }
 66 |             ])
 67 |         
 68 |         if "ebs" in services:
 69 |             service_calls.extend([
 70 |                 {
 71 |                     'service': 'ebs',
 72 |                     'operation': 'underutilized_volumes',
 73 |                     'function': get_underutilized_volumes,
 74 |                     'args': {
 75 |                         'region': region,
 76 |                         'lookback_period_days': 30
 77 |                     }
 78 |                 },
 79 |                 {
 80 |                     'service': 'ebs',
 81 |                     'operation': 'unused_volumes',
 82 |                     'function': lambda **kwargs: {"unused_volumes": []},  # Placeholder
 83 |                     'args': {'region': region}
 84 |                 }
 85 |             ])
 86 |         
 87 |         if "rds" in services:
 88 |             service_calls.extend([
 89 |                 {
 90 |                     'service': 'rds',
 91 |                     'operation': 'underutilized_instances',
 92 |                     'function': get_underutilized_rds_instances,
 93 |                     'args': {
 94 |                         'region': region,
 95 |                         'lookback_period_days': lookback_period_days
 96 |                     }
 97 |                 },
 98 |                 {
 99 |                     'service': 'rds',
100 |                     'operation': 'idle_instances',
101 |                     'function': identify_idle_rds_instances,
102 |                     'args': {
103 |                         'region': region,
104 |                         'lookback_period_days': 7
105 |                     }
106 |                 }
107 |             ])
108 |         
109 |         if "lambda" in services:
110 |             service_calls.extend([
111 |                 {
112 |                     'service': 'lambda',
113 |                     'operation': 'underutilized_functions',
114 |                     'function': get_underutilized_lambda_functions,
115 |                     'args': {
116 |                         'region': region,
117 |                         'lookback_period_days': lookback_period_days
118 |                     }
119 |                 },
120 |                 {
121 |                     'service': 'lambda',
122 |                     'operation': 'unused_functions',
123 |                     'function': identify_unused_lambda_functions,
124 |                     'args': {
125 |                         'region': region,
126 |                         'lookback_period_days': 30
127 |                     }
128 |                 }
129 |             ])
130 |         
131 |         if "cloudtrail" in services:
132 |             service_calls.append({
133 |                 'service': 'cloudtrail',
134 |                 'operation': 'optimization',
135 |                 'function': run_cloudtrail_optimization,
136 |                 'args': {'region': region}
137 |             })
138 |         
139 |         if "cloudwatch" in services:
140 |             # CloudWatch uses its own comprehensive optimization tool
141 |             def cloudwatch_wrapper(region=None, lookback_days=30, **kwargs):
142 |                 return {
143 |                     'status': 'success',
144 |                     'service': 'cloudwatch',
145 |                     'message': 'CloudWatch analysis requires separate execution via cloudwatch_comprehensive_optimization_tool',
146 |                     'recommendation': 'Use the dedicated CloudWatch comprehensive optimization tool for detailed analysis',
147 |                     'region': region,
148 |                     'lookback_days': lookback_days,
149 |                     'note': 'CloudWatch has its own advanced parallel execution and memory management system'
150 |                 }
151 |             
152 |             service_calls.append({
153 |                 'service': 'cloudwatch',
154 |                 'operation': 'comprehensive_optimization',
155 |                 'function': cloudwatch_wrapper,
156 |                 'args': {
157 |                     'region': region,
158 |                     'lookback_days': lookback_period_days
159 |                 }
160 |             })
161 |         
162 |         # Execute parallel analysis
163 |         results = orchestrator.execute_parallel_analysis(
164 |             service_calls=service_calls,
165 |             store_results=True,
166 |             timeout=120.0
167 |         )
168 |         
169 |         # Add documentation links
170 |         results = add_documentation_links(results)
171 |         
172 |         execution_time = time.time() - start_time
173 |         
174 |         # Format response with metadata
175 |         results["comprehensive_analysis"] = {
176 |             "analysis_type": "multi_service_comprehensive",
177 |             "services_analyzed": services,
178 |             "region": region,
179 |             "lookback_period_days": lookback_period_days,
180 |             "session_id": results.get("report_metadata", {}).get("session_id"),
181 |             "parallel_execution": True,
182 |             "sql_storage": True
183 |         }
184 |         
185 |         return ResponseFormatter.to_text_content(
186 |             ResponseFormatter.success_response(
187 |                 data=results,
188 |                 message=f"Comprehensive analysis completed for {len(services)} services",
189 |                 analysis_type="comprehensive_analysis",
190 |                 execution_time=execution_time
191 |             )
192 |         )
193 |         
194 |     except Exception as e:
195 |         logger.error(f"Error in comprehensive cost analysis: {str(e)}")
196 |         raise
```

--------------------------------------------------------------------------------
/utils/error_handler.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Centralized Error Handler for AWS Cost Optimization MCP Server
  3 | 
  4 | Provides consistent error handling and formatting across all modules.
  5 | """
  6 | 
  7 | import logging
  8 | from typing import Dict, Any, List, Optional
  9 | from botocore.exceptions import ClientError, NoCredentialsError
 10 | from mcp.types import TextContent
 11 | import json
 12 | 
 13 | logger = logging.getLogger(__name__)
 14 | 
 15 | 
 16 | class AWSErrorHandler:
 17 |     """Centralized AWS error handling and formatting."""
 18 |     
 19 |     # Common AWS error codes and their required permissions
 20 |     PERMISSION_MAP = {
 21 |         'AccessDenied': 'Check IAM permissions for the requested service',
 22 |         'UnauthorizedOperation': 'Verify IAM policy allows the requested operation',
 23 |         'InvalidUserID.NotFound': 'Check AWS credentials configuration',
 24 |         'TokenRefreshRequired': 'AWS credentials may have expired',
 25 |         'OptInRequired': 'Service may need to be enabled in AWS Console',
 26 |         'ServiceUnavailable': 'AWS service temporarily unavailable',
 27 |         'ThrottlingException': 'Request rate exceeded, implement retry logic',
 28 |         'ValidationException': 'Check request parameters and format'
 29 |     }
 30 |     
 31 |     @staticmethod
 32 |     def format_client_error(e: ClientError, context: str, 
 33 |                           required_permissions: Optional[List[str]] = None) -> Dict[str, Any]:
 34 |         """
 35 |         Format AWS ClientError into standardized error response.
 36 |         
 37 |         Args:
 38 |             e: The ClientError exception
 39 |             context: Context where the error occurred
 40 |             required_permissions: List of required IAM permissions
 41 |             
 42 |         Returns:
 43 |             Standardized error response dictionary
 44 |         """
 45 |         error_code = e.response.get('Error', {}).get('Code', 'Unknown')
 46 |         error_message = e.response.get('Error', {}).get('Message', str(e))
 47 |         
 48 |         logger.error(f"AWS API Error in {context}: {error_code} - {error_message}")
 49 |         
 50 |         response = {
 51 |             "status": "error",
 52 |             "error_code": error_code,
 53 |             "message": f"AWS API Error: {error_code} - {error_message}",
 54 |             "context": context,
 55 |             "timestamp": logger.handlers[0].formatter.formatTime(logger.makeRecord(
 56 |                 logger.name, logging.ERROR, __file__, 0, "", (), None
 57 |             )) if logger.handlers else None
 58 |         }
 59 |         
 60 |         # Add permission guidance
 61 |         if required_permissions:
 62 |             response["required_permissions"] = required_permissions
 63 |         elif error_code in AWSErrorHandler.PERMISSION_MAP:
 64 |             response["permission_guidance"] = AWSErrorHandler.PERMISSION_MAP[error_code]
 65 |         
 66 |         # Add retry guidance for throttling
 67 |         if error_code in ['ThrottlingException', 'RequestLimitExceeded']:
 68 |             response["retry_guidance"] = {
 69 |                 "retryable": True,
 70 |                 "suggested_delay": "exponential backoff starting at 1 second"
 71 |             }
 72 |         
 73 |         return response
 74 |     
 75 |     @staticmethod
 76 |     def format_no_credentials_error(context: str) -> Dict[str, Any]:
 77 |         """Format NoCredentialsError into standardized response."""
 78 |         logger.error(f"AWS credentials not found in {context}")
 79 |         
 80 |         return {
 81 |             "status": "error",
 82 |             "error_code": "NoCredentialsError",
 83 |             "message": "AWS credentials not configured",
 84 |             "context": context,
 85 |             "setup_guidance": {
 86 |                 "aws_cli": "Run 'aws configure' to set up credentials",
 87 |                 "environment": "Set AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY",
 88 |                 "iam_role": "Ensure EC2 instance has appropriate IAM role attached"
 89 |             }
 90 |         }
 91 |     
 92 |     @staticmethod
 93 |     def format_general_error(e: Exception, context: str) -> Dict[str, Any]:
 94 |         """Format general exceptions into standardized response."""
 95 |         logger.error(f"General error in {context}: {str(e)}")
 96 |         
 97 |         return {
 98 |             "status": "error",
 99 |             "error_code": type(e).__name__,
100 |             "message": str(e),
101 |             "context": context
102 |         }
103 |     
104 |     @staticmethod
105 |     def to_text_content(error_dict: Dict[str, Any]) -> List[TextContent]:
106 |         """Convert error dictionary to MCP TextContent format."""
107 |         return [TextContent(type="text", text=json.dumps(error_dict, indent=2, default=str))]
108 | 
109 | 
class ResponseFormatter:
    """Builds the uniform success/error payloads returned by MCP tools."""

    @staticmethod
    def success_response(data: Any, message: str, analysis_type: str = None,
                         execution_time: float = None, metadata: Dict = None) -> Dict[str, Any]:
        """
        Assemble a success payload with a consistent key layout.

        Args:
            data: The response data
            message: Success message
            analysis_type: Type of analysis performed (omitted when falsy)
            execution_time: Execution time in seconds (omitted only when None,
                so ``0.0`` is kept)
            metadata: Additional metadata (omitted when falsy)

        Returns:
            Dictionary with ``status``, ``data``, ``message``, any supplied
            optional fields, and a ``wellarchitected_hint`` entry.
        """
        payload = {"status": "success", "data": data, "message": message}

        # (key, value, include?) - evaluated in the order the keys must appear.
        optional_fields = (
            ("analysis_type", analysis_type, bool(analysis_type)),
            ("execution_time", execution_time, execution_time is not None),
            ("metadata", metadata, bool(metadata)),
        )
        for key, value, include in optional_fields:
            if include:
                payload[key] = value

        # Add Well-Architected Framework hint for LLMs
        payload["wellarchitected_hint"] = "Analyze these findings to provide AWS Well-Architected Framework Cost Optimization pillar recommendations focusing on right-sizing, eliminating waste, leveraging pricing models, and optimizing over time."

        return payload

    @staticmethod
    def error_response(error: Exception, context: str,
                       required_permissions: Optional[List[str]] = None) -> Dict[str, Any]:
        """
        Pick the AWSErrorHandler formatter that matches the exception type.

        Args:
            error: The exception that occurred
            context: Context where error occurred
            required_permissions: Required IAM permissions; only forwarded for
                ClientError

        Returns:
            Standardized error response
        """
        if isinstance(error, ClientError):
            return AWSErrorHandler.format_client_error(error, context, required_permissions)
        if isinstance(error, NoCredentialsError):
            return AWSErrorHandler.format_no_credentials_error(context)
        return AWSErrorHandler.format_general_error(error, context)

    @staticmethod
    def to_text_content(response_dict: Dict[str, Any]) -> List[TextContent]:
        """Serialize the dictionary as indented JSON inside a single MCP TextContent item."""
        serialized = json.dumps(response_dict, indent=2, default=str)
        return [TextContent(type="text", text=serialized)]
174 | 
175 | 
176 | # Convenience functions for common use cases
def handle_aws_error(func):
    """Decorator for consistent AWS error handling in MCP tools.

    Any exception raised by ``func`` is converted into the standardized error
    payload (ClientError, NoCredentialsError or general) and returned as MCP
    TextContent instead of propagating.

    The wrapper keeps the decorated function's ``__name__``/``__doc__`` via
    ``functools.wraps``. Coroutine functions get an ``async`` wrapper that
    awaits inside the ``try`` block; a plain wrapper would hand back the
    un-awaited coroutine and never see its exceptions.

    Args:
        func: The sync or async callable to protect

    Returns:
        A wrapper with the same call signature as ``func``
    """
    import functools
    import inspect

    def _to_error_content(exc):
        """Format ``exc`` with the handler matching its type."""
        context = f"{func.__name__}"
        if isinstance(exc, ClientError):
            error_response = AWSErrorHandler.format_client_error(exc, context)
        elif isinstance(exc, NoCredentialsError):
            error_response = AWSErrorHandler.format_no_credentials_error(context)
        else:
            error_response = AWSErrorHandler.format_general_error(exc, context)
        return AWSErrorHandler.to_text_content(error_response)

    if inspect.iscoroutinefunction(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            try:
                return await func(*args, **kwargs)
            except Exception as e:
                return _to_error_content(e)
    else:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                return _to_error_content(e)

    return wrapper
```

--------------------------------------------------------------------------------
/utils/aws_client_factory.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | AWS Client Factory for Centralized Client Management
  3 | 
  4 | Provides consistent AWS client creation with proper error handling and configuration.
  5 | """
  6 | 
  7 | import boto3
  8 | import logging
  9 | from typing import Dict, Any, Optional
 10 | from botocore.exceptions import ClientError, NoCredentialsError
 11 | from botocore.config import Config
 12 | 
 13 | logger = logging.getLogger(__name__)
 14 | 
 15 | 
 16 | class AWSClientFactory:
 17 |     """Centralized AWS client creation and management."""
 18 |     
 19 |     # Default configurations for different services
 20 |     DEFAULT_CONFIGS = {
 21 |         'cost-optimization-hub': {
 22 |             'region_name': 'us-east-1',  # COH is only available in us-east-1
 23 |             'config': Config(
 24 |                 retries={'max_attempts': 3, 'mode': 'adaptive'},
 25 |                 read_timeout=60
 26 |             )
 27 |         },
 28 |         'ce': {  # Cost Explorer
 29 |             'region_name': 'us-east-1',  # CE is only available in us-east-1
 30 |             'config': Config(
 31 |                 retries={'max_attempts': 3, 'mode': 'adaptive'},
 32 |                 read_timeout=60
 33 |             )
 34 |         },
 35 |         'support': {  # Trusted Advisor
 36 |             'region_name': 'us-east-1',  # Support API only in us-east-1
 37 |             'config': Config(
 38 |                 retries={'max_attempts': 3, 'mode': 'adaptive'},
 39 |                 read_timeout=60
 40 |             )
 41 |         },
 42 |         'compute-optimizer': {
 43 |             'config': Config(
 44 |                 retries={'max_attempts': 3, 'mode': 'adaptive'},
 45 |                 read_timeout=60
 46 |             )
 47 |         },
 48 |         'pi': {  # Performance Insights
 49 |             'config': Config(
 50 |                 retries={'max_attempts': 3, 'mode': 'adaptive'},
 51 |                 read_timeout=30
 52 |             )
 53 |         },
 54 |         'default': {
 55 |             'config': Config(
 56 |                 retries={'max_attempts': 3, 'mode': 'adaptive'},
 57 |                 read_timeout=30
 58 |             )
 59 |         }
 60 |     }
 61 |     
 62 |     _clients: Dict[str, Any] = {}  # Client cache
 63 |     
 64 |     @classmethod
 65 |     def get_client(cls, service_name: str, region: Optional[str] = None, 
 66 |                    force_new: bool = False) -> boto3.client:
 67 |         """
 68 |         Get AWS client with proper configuration and caching.
 69 |         
 70 |         Args:
 71 |             service_name: AWS service name (e.g., 'ec2', 's3', 'cost-optimization-hub')
 72 |             region: AWS region (optional, uses service defaults or session default)
 73 |             force_new: Force creation of new client instead of using cache
 74 |             
 75 |         Returns:
 76 |             Configured boto3 client
 77 |             
 78 |         Raises:
 79 |             NoCredentialsError: If AWS credentials are not configured
 80 |             ClientError: If client creation fails
 81 |         """
 82 |         # Create cache key
 83 |         cache_key = f"{service_name}:{region or 'default'}"
 84 |         
 85 |         # Return cached client if available and not forcing new
 86 |         if not force_new and cache_key in cls._clients:
 87 |             logger.debug(f"Using cached client for {service_name}")
 88 |             return cls._clients[cache_key]
 89 |         
 90 |         try:
 91 |             # Get service configuration
 92 |             service_config = cls.DEFAULT_CONFIGS.get(service_name, cls.DEFAULT_CONFIGS['default'])
 93 |             
 94 |             # Prepare client arguments
 95 |             client_args = {
 96 |                 'service_name': service_name,
 97 |                 'config': service_config.get('config')
 98 |             }
 99 |             
100 |             # Set region - priority: parameter > service default > session default
101 |             if region:
102 |                 client_args['region_name'] = region
103 |             elif 'region_name' in service_config:
104 |                 client_args['region_name'] = service_config['region_name']
105 |             
106 |             # Create client
107 |             client = boto3.client(**client_args)
108 |             
109 |             # Cache the client
110 |             cls._clients[cache_key] = client
111 |             
112 |             logger.debug(f"Created new {service_name} client for region {client_args.get('region_name', 'default')}")
113 |             return client
114 |             
115 |         except NoCredentialsError:
116 |             logger.error(f"AWS credentials not configured for {service_name} client")
117 |             raise
118 |         except Exception as e:
119 |             logger.error(f"Failed to create {service_name} client: {str(e)}")
120 |             raise
121 |     
122 |     @classmethod
123 |     def get_session(cls, region: Optional[str] = None) -> boto3.Session:
124 |         """
125 |         Get AWS session with proper configuration.
126 |         
127 |         Args:
128 |             region: AWS region (optional)
129 |             
130 |         Returns:
131 |             Configured boto3 session
132 |         """
133 |         try:
134 |             session_args = {}
135 |             if region:
136 |                 session_args['region_name'] = region
137 |             
138 |             session = boto3.Session(**session_args)
139 |             
140 |             # Verify credentials by making a simple call
141 |             sts_client = session.client('sts')
142 |             sts_client.get_caller_identity()
143 |             
144 |             logger.debug(f"Created AWS session for region {region or 'default'}")
145 |             return session
146 |             
147 |         except NoCredentialsError:
148 |             logger.error("AWS credentials not configured")
149 |             raise
150 |         except Exception as e:
151 |             logger.error(f"Failed to create AWS session: {str(e)}")
152 |             raise
153 |     
154 |     @classmethod
155 |     def clear_cache(cls):
156 |         """Clear the client cache."""
157 |         cls._clients.clear()
158 |         logger.debug("Cleared AWS client cache")
159 |     
160 |     @classmethod
161 |     def get_available_regions(cls, service_name: str) -> list:
162 |         """
163 |         Get available regions for a service.
164 |         
165 |         Args:
166 |             service_name: AWS service name
167 |             
168 |         Returns:
169 |             List of available regions
170 |         """
171 |         try:
172 |             session = boto3.Session()
173 |             return session.get_available_regions(service_name)
174 |         except Exception as e:
175 |             logger.warning(f"Could not get regions for {service_name}: {str(e)}")
176 |             return []
177 |     
178 |     @classmethod
179 |     def validate_region(cls, service_name: str, region: str) -> bool:
180 |         """
181 |         Validate if a region is available for a service.
182 |         
183 |         Args:
184 |             service_name: AWS service name
185 |             region: AWS region to validate
186 |             
187 |         Returns:
188 |             True if region is valid for service
189 |         """
190 |         available_regions = cls.get_available_regions(service_name)
191 |         return region in available_regions if available_regions else True
192 |     
193 |     @classmethod
194 |     def get_caller_identity(cls) -> Dict[str, Any]:
195 |         """
196 |         Get AWS caller identity information.
197 |         
198 |         Returns:
199 |             Dictionary with account ID, user ARN, etc.
200 |         """
201 |         try:
202 |             sts_client = cls.get_client('sts')
203 |             return sts_client.get_caller_identity()
204 |         except Exception as e:
205 |             logger.error(f"Failed to get caller identity: {str(e)}")
206 |             raise
207 | 
208 | 
209 | # Convenience functions
def get_cost_explorer_client() -> boto3.client:
    """Return the factory's Cost Explorer ('ce') client.

    No region is passed, so the factory's own default for 'ce' applies
    (us-east-1 in DEFAULT_CONFIGS).
    """
    client = AWSClientFactory.get_client('ce')
    return client
213 | 
214 | 
def get_cost_optimization_hub_client() -> boto3.client:
    """Return the factory's Cost Optimization Hub client.

    No region is passed, so the factory's own default for
    'cost-optimization-hub' applies (us-east-1 in DEFAULT_CONFIGS).
    """
    client = AWSClientFactory.get_client('cost-optimization-hub')
    return client
218 | 
219 | 
def get_compute_optimizer_client(region: Optional[str] = None) -> boto3.client:
    """Return the factory's Compute Optimizer client for ``region``.

    ``region`` is forwarded unchanged; ``None`` leaves region selection to the factory.
    """
    client = AWSClientFactory.get_client('compute-optimizer', region)
    return client
223 | 
224 | 
def get_trusted_advisor_client() -> boto3.client:
    """Return the factory's 'support' client, used for Trusted Advisor.

    No region is passed, so the factory's own default for 'support' applies
    (us-east-1 in DEFAULT_CONFIGS).
    """
    client = AWSClientFactory.get_client('support')
    return client
228 | 
229 | 
def get_performance_insights_client(region: Optional[str] = None) -> boto3.client:
    """Return the factory's Performance Insights ('pi') client for ``region``.

    ``region`` is forwarded unchanged; ``None`` leaves region selection to the factory.
    """
    client = AWSClientFactory.get_client('pi', region)
    return client
233 | 
234 | 
def get_regional_client(service_name: str, region: str) -> boto3.client:
    """Return the factory's client for a regional service (EC2, EBS, RDS, Lambda, S3, etc.).

    Both arguments are forwarded unchanged to ``AWSClientFactory.get_client``.
    """
    client = AWSClientFactory.get_client(service_name, region)
    return client
```

--------------------------------------------------------------------------------
/tests/run_tests.py:
--------------------------------------------------------------------------------

```python
  1 | #!/usr/bin/env python3
  2 | """
  3 | Test runner for S3 optimization system.
  4 | 
  5 | This script provides a convenient way to run different test suites
  6 | with appropriate configurations and reporting.
  7 | """
  8 | 
  9 | import sys
 10 | import os
 11 | import subprocess
 12 | import argparse
 13 | from pathlib import Path
 14 | from typing import List, Optional
 15 | 
 16 | 
 17 | def run_command(cmd: List[str], description: str) -> int:
 18 |     """Run a command and return the exit code."""
 19 |     print(f"\n{'='*60}")
 20 |     print(f"Running: {description}")
 21 |     print(f"Command: {' '.join(cmd)}")
 22 |     print(f"{'='*60}")
 23 |     
 24 |     result = subprocess.run(cmd, cwd=Path(__file__).parent.parent)
 25 |     return result.returncode
 26 | 
 27 | 
 28 | def run_unit_tests(verbose: bool = False, coverage: bool = True) -> int:
 29 |     """Run unit tests."""
 30 |     cmd = ["python", "-m", "pytest", "tests/unit/"]
 31 |     
 32 |     if verbose:
 33 |         cmd.append("-v")
 34 |     
 35 |     if coverage:
 36 |         cmd.extend([
 37 |             "--cov=core",
 38 |             "--cov=services", 
 39 |             "--cov-report=term-missing",
 40 |             "--cov-report=html:htmlcov/unit"
 41 |         ])
 42 |     
 43 |     cmd.extend([
 44 |         "-m", "unit",
 45 |         "--tb=short"
 46 |     ])
 47 |     
 48 |     return run_command(cmd, "Unit Tests")
 49 | 
 50 | 
 51 | def run_integration_tests(verbose: bool = False) -> int:
 52 |     """Run integration tests."""
 53 |     cmd = ["python", "-m", "pytest", "tests/integration/"]
 54 |     
 55 |     if verbose:
 56 |         cmd.append("-v")
 57 |     
 58 |     cmd.extend([
 59 |         "-m", "integration",
 60 |         "--tb=short"
 61 |     ])
 62 |     
 63 |     return run_command(cmd, "Integration Tests")
 64 | 
 65 | 
 66 | def run_performance_tests(verbose: bool = False) -> int:
 67 |     """Run performance tests."""
 68 |     cmd = ["python", "-m", "pytest", "tests/performance/"]
 69 |     
 70 |     if verbose:
 71 |         cmd.append("-v")
 72 |     
 73 |     cmd.extend([
 74 |         "-m", "performance",
 75 |         "--tb=short",
 76 |         "--benchmark-only",
 77 |         "--benchmark-sort=mean"
 78 |     ])
 79 |     
 80 |     return run_command(cmd, "Performance Tests")
 81 | 
 82 | 
 83 | def run_cost_validation_tests(verbose: bool = False) -> int:
 84 |     """Run critical no-cost constraint validation tests."""
 85 |     cmd = ["python", "-m", "pytest", "tests/no_cost_validation/"]
 86 |     
 87 |     if verbose:
 88 |         cmd.append("-v")
 89 |     
 90 |     cmd.extend([
 91 |         "-m", "no_cost_validation",
 92 |         "--tb=long",  # More detailed output for critical tests
 93 |         "--strict-markers"
 94 |     ])
 95 |     
 96 |     return run_command(cmd, "No-Cost Constraint Validation Tests (CRITICAL)")
 97 | 
 98 | 
 99 | def run_all_tests(verbose: bool = False, coverage: bool = True) -> int:
100 |     """Run all test suites."""
101 |     cmd = ["python", "-m", "pytest", "tests/"]
102 |     
103 |     if verbose:
104 |         cmd.append("-v")
105 |     
106 |     if coverage:
107 |         cmd.extend([
108 |             "--cov=core",
109 |             "--cov=services",
110 |             "--cov-report=term-missing", 
111 |             "--cov-report=html:htmlcov/all",
112 |             "--cov-fail-under=80"
113 |         ])
114 |     
115 |     cmd.extend([
116 |         "--tb=short",
117 |         "--durations=10"
118 |     ])
119 |     
120 |     return run_command(cmd, "All Tests")
121 | 
122 | 
def run_specific_test(test_path: str, verbose: bool = False) -> int:
    """Run a specific test file or directory.

    Args:
        test_path: Path handed to pytest as its target
        verbose: Pass ``-v`` to pytest

    Returns:
        The pytest process exit code
    """
    # sys.executable runs pytest with the interpreter executing this script;
    # a bare "python" may be absent from PATH or point at another environment.
    cmd = [sys.executable, "-m", "pytest", test_path]

    if verbose:
        cmd.append("-v")

    cmd.extend(["--tb=short"])

    return run_command(cmd, f"Specific Test: {test_path}")
133 | 
134 | 
def check_test_environment() -> bool:
    """Report whether the packages and project modules the tests need can be imported.

    Prints one status line per check and stops at the first failure.

    Returns:
        True when pytest, moto and the project modules all import; False otherwise.
    """
    print("Checking test environment...")

    # pytest is the runner; moto provides the AWS mocking.
    for package in ("pytest", "moto"):
        try:
            module = __import__(package)
        except ImportError:
            print(f"✗ {package} is not installed")
            return False
        print(f"✓ {package} {module.__version__} is available")

    # The project root must be importable for the core-module check.
    project_root = Path(__file__).parent.parent
    try:
        sys.path.insert(0, str(project_root))
        from playbooks.s3.base_analyzer import BaseAnalyzer
        from services.s3_service import S3Service
    except ImportError as e:
        print(f"✗ Cannot import core modules: {e}")
        return False
    print("✓ Core modules can be imported")

    print("✓ Test environment is ready")
    return True
167 | 
168 | 
def generate_test_report() -> int:
    """Generate comprehensive test report.

    Runs everything under ``tests/`` and asks pytest for a self-contained HTML
    report (``test_report.html``), a JSON report (``test_report.json``) and an
    HTML coverage report for ``core`` and ``services`` in ``htmlcov/report``.

    Returns:
        The pytest process exit code
    """
    cmd = [
        # sys.executable runs pytest with the interpreter executing this
        # script; a bare "python" may be absent from PATH or point elsewhere.
        sys.executable, "-m", "pytest", "tests/",
        "--html=test_report.html",
        "--self-contained-html",
        "--json-report",
        "--json-report-file=test_report.json",
        "--cov=core",
        "--cov=services",
        "--cov-report=html:htmlcov/report",
        "--tb=short"
    ]

    return run_command(cmd, "Comprehensive Test Report Generation")
184 | 
185 | 
def main():
    """Parse the command line, run the requested suite and return its exit code.

    ``--check`` and ``--report`` are handled first and return immediately.
    Otherwise the first matching suite flag wins (unit, integration,
    performance, cost-validation, all, specific). With no flag the unit tests
    run, followed by the integration tests if the unit tests passed.
    """
    parser = argparse.ArgumentParser(
        description="Test runner for S3 optimization system",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python run_tests.py --unit                    # Run unit tests only
  python run_tests.py --integration             # Run integration tests only
  python run_tests.py --performance             # Run performance tests only
  python run_tests.py --cost-validation         # Run cost validation tests only
  python run_tests.py --all                     # Run all tests
  python run_tests.py --specific tests/unit/    # Run specific test directory
  python run_tests.py --report                  # Generate comprehensive report
  python run_tests.py --check                   # Check test environment
        """
    )

    # Test suite selection (registered in the order shown by --help)
    for flag, help_text in (
        ("--unit", "Run unit tests"),
        ("--integration", "Run integration tests"),
        ("--performance", "Run performance tests"),
        ("--cost-validation", "Run cost validation tests"),
        ("--all", "Run all tests"),
    ):
        parser.add_argument(flag, action="store_true", help=help_text)
    parser.add_argument("--specific", type=str, help="Run specific test file or directory")

    # Utility options
    for flag, help_text in (
        ("--report", "Generate comprehensive test report"),
        ("--check", "Check test environment"),
    ):
        parser.add_argument(flag, action="store_true", help=help_text)

    # Test options
    parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
    parser.add_argument("--no-coverage", action="store_true", help="Disable coverage reporting")

    args = parser.parse_args()

    # Utility modes short-circuit before any suite selection.
    if args.check:
        return 0 if check_test_environment() else 1

    if args.report:
        return generate_test_report()

    verbose = args.verbose
    coverage = not args.no_coverage

    if args.unit:
        exit_code = run_unit_tests(verbose, coverage)
    elif args.integration:
        exit_code = run_integration_tests(verbose)
    elif args.performance:
        exit_code = run_performance_tests(verbose)
    elif args.cost_validation:
        exit_code = run_cost_validation_tests(verbose)
    elif args.all:
        exit_code = run_all_tests(verbose, coverage)
    elif args.specific:
        exit_code = run_specific_test(args.specific, verbose)
    else:
        # Default: run unit and integration tests
        print("No specific test suite selected. Running unit and integration tests...")
        exit_code = run_unit_tests(verbose, coverage)
        if exit_code == 0:
            exit_code = run_integration_tests(verbose)

    # Summary
    rule = '=' * 60
    if exit_code == 0:
        verdict = "✓ All tests passed successfully!"
    else:
        verdict = "✗ Some tests failed. Check the output above for details."
    print(f"\n{rule}")
    print(verdict)
    print(f"{rule}")

    return exit_code
267 | 
268 | 
if __name__ == "__main__":
    # Hand main()'s return value to the interpreter as the process exit status.
    raise SystemExit(main())
```

--------------------------------------------------------------------------------
/tests/legacy/test_setup_verification.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Setup verification test to ensure the testing framework is working correctly.
  3 | 
  4 | This test validates that the testing infrastructure is properly configured
  5 | and can run basic tests with mocked AWS services.
  6 | """
  7 | 
  8 | import pytest
  9 | import asyncio
 10 | from unittest.mock import Mock, AsyncMock, patch
 11 | from datetime import datetime
 12 | 
 13 | 
 14 | @pytest.mark.unit
 15 | class TestSetupVerification:
 16 |     """Verify that the testing setup is working correctly."""
 17 |     
 18 |     def test_pytest_is_working(self):
 19 |         """Test that pytest is working correctly."""
 20 |         assert True
 21 |     
 22 |     def test_fixtures_are_available(self, mock_s3_service, mock_storage_lens_service, 
 23 |                                   mock_pricing_service):
 24 |         """Test that common fixtures are available."""
 25 |         assert mock_s3_service is not None
 26 |         assert mock_storage_lens_service is not None
 27 |         assert mock_pricing_service is not None
 28 |     
 29 |     @pytest.mark.asyncio
 30 |     async def test_async_testing_works(self):
 31 |         """Test that async testing is working."""
 32 |         async def async_function():
 33 |             await asyncio.sleep(0.001)
 34 |             return "async_result"
 35 |         
 36 |         result = await async_function()
 37 |         assert result == "async_result"
 38 |     
 39 |     def test_mocking_works(self):
 40 |         """Test that mocking is working correctly."""
 41 |         mock_service = Mock()
 42 |         mock_service.test_method.return_value = "mocked_result"
 43 |         
 44 |         result = mock_service.test_method()
 45 |         assert result == "mocked_result"
 46 |         mock_service.test_method.assert_called_once()
 47 |     
 48 |     def test_aws_mocking_works(self, mock_aws_credentials):
 49 |         """Test that AWS service mocking is working."""
 50 |         with patch('boto3.client') as mock_boto_client:
 51 |             mock_client = Mock()
 52 |             mock_client.list_buckets.return_value = {"Buckets": []}
 53 |             mock_boto_client.return_value = mock_client
 54 |             
 55 |             import boto3
 56 |             s3_client = boto3.client('s3')
 57 |             result = s3_client.list_buckets()
 58 |             
 59 |             assert result == {"Buckets": []}
 60 |     
 61 |     def test_cost_constraint_validator_works(self, cost_constraint_validator):
 62 |         """Test that cost constraint validator is working."""
 63 |         # Should allow valid operations
 64 |         assert cost_constraint_validator.validate_operation('list_buckets') is True
 65 |         
 66 |         # Should reject forbidden operations
 67 |         with pytest.raises(ValueError):
 68 |             cost_constraint_validator.validate_operation('list_objects_v2')
 69 |         
 70 |         summary = cost_constraint_validator.get_operation_summary()
 71 |         assert summary["total_operations"] == 2
 72 |         assert "list_buckets" in summary["allowed_called"]
 73 |         assert "list_objects_v2" in summary["forbidden_called"]
 74 |     
 75 |     def test_performance_tracker_works(self, performance_tracker):
 76 |         """Test that performance tracker is working."""
 77 |         performance_tracker.start_timer("test_operation")
 78 |         # Simulate some work
 79 |         import time
 80 |         time.sleep(0.01)
 81 |         duration = performance_tracker.end_timer("test_operation")
 82 |         
 83 |         assert duration > 0
 84 |         assert duration < 1.0  # Should be very quick
 85 |         
 86 |         metrics = performance_tracker.get_metrics()
 87 |         assert "test_operation" in metrics
 88 |         assert metrics["test_operation"] > 0
 89 |     
 90 |     def test_test_data_factory_works(self, test_data_factory):
 91 |         """Test that test data factory is working."""
 92 |         buckets = test_data_factory.create_bucket_data(count=3)
 93 |         assert len(buckets) == 3
 94 |         assert all("Name" in bucket for bucket in buckets)
 95 |         assert all("CreationDate" in bucket for bucket in buckets)
 96 |         
 97 |         cost_data = test_data_factory.create_cost_data(days=5)
 98 |         assert "ResultsByTime" in cost_data
 99 |         assert len(cost_data["ResultsByTime"]) == 5
100 |         
101 |         analysis_result = test_data_factory.create_analysis_result("test_analysis")
102 |         assert analysis_result["status"] == "success"
103 |         assert analysis_result["analysis_type"] == "test_analysis"
104 | 
105 | 
106 | @pytest.mark.integration
107 | class TestIntegrationSetupVerification:
108 |     """Verify that integration testing setup is working."""
109 |     
110 |     @pytest.mark.asyncio
111 |     async def test_orchestrator_can_be_mocked(self, mock_service_orchestrator):
112 |         """Test that orchestrator can be properly mocked for integration tests."""
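113 |         # Import the real S3OptimizationOrchestrator with its collaborators patched out. NOTE(review): patches target core.*, but the import below uses playbooks.s3.* — confirm the patch paths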
114 |         with patch('core.s3_optimization_orchestrator.ServiceOrchestrator', return_value=mock_service_orchestrator), \
115 |              patch('core.s3_optimization_orchestrator.get_performance_monitor'), \
116 |              patch('core.s3_optimization_orchestrator.get_memory_manager'), \
117 |              patch('core.s3_optimization_orchestrator.get_timeout_handler'), \
118 |              patch('core.s3_optimization_orchestrator.get_pricing_cache'), \
119 |              patch('core.s3_optimization_orchestrator.get_bucket_metadata_cache'), \
120 |              patch('core.s3_optimization_orchestrator.get_analysis_results_cache'):
121 |             
122 |             from playbooks.s3.s3_optimization_orchestrator import S3OptimizationOrchestrator
123 |             
124 |             orchestrator = S3OptimizationOrchestrator(region="us-east-1")
125 |             assert orchestrator.region == "us-east-1"
126 |             assert orchestrator.service_orchestrator == mock_service_orchestrator
127 | 
128 | 
129 | @pytest.mark.performance
130 | class TestPerformanceSetupVerification:
131 |     """Verify that performance testing setup is working."""
132 |     
133 |     @pytest.mark.asyncio
134 |     async def test_performance_measurement_works(self, performance_tracker):
135 |         """Test that performance measurement is working."""
136 |         performance_tracker.start_timer("performance_test")
137 |         
138 |         # Simulate some async work
139 |         await asyncio.sleep(0.01)
140 |         
141 |         duration = performance_tracker.end_timer("performance_test")
142 |         
143 |         assert duration > 0.005  # Should be at least 5ms
144 |         assert duration < 0.1    # Should be less than 100ms
145 |         
146 |         # Test performance assertion
147 |         performance_tracker.assert_performance("performance_test", 0.1)
148 | 
149 | 
150 | @pytest.mark.no_cost_validation
151 | class TestCostValidationSetupVerification:
152 |     """Verify that cost validation testing setup is working."""
153 |     
154 |     def test_cost_constraint_system_is_active(self):
155 |         """Test that cost constraint validation system is active."""
156 |         from services.s3_service import ALLOWED_S3_OPERATIONS, FORBIDDEN_S3_OPERATIONS
157 |         
158 |         # Verify that the constraint lists are populated
159 |         assert len(ALLOWED_S3_OPERATIONS) > 0
160 |         assert len(FORBIDDEN_S3_OPERATIONS) > 0
161 |         
162 |         # Verify critical operations are in the right lists
163 |         assert 'list_buckets' in ALLOWED_S3_OPERATIONS
164 |         assert 'list_objects_v2' in FORBIDDEN_S3_OPERATIONS
165 |     
166 |     def test_cost_constraint_violation_error_works(self, mock_aws_credentials):
167 |         """Test that cost constraint violation errors work correctly."""
168 |         from services.s3_service import S3Service, S3CostConstraintViolationError
169 |         
170 |         service = S3Service(region="us-east-1")
171 |         
172 |         with pytest.raises(S3CostConstraintViolationError):
173 |             service._validate_s3_operation('list_objects_v2')
174 |     
175 |     def test_cost_validator_fixture_works(self, cost_constraint_validator):
176 |         """Test that cost validator fixture is working correctly."""
177 |         # Should track operations
178 |         cost_constraint_validator.validate_operation('list_buckets')
179 |         
180 |         # Should reject forbidden operations
181 |         with pytest.raises(ValueError):
182 |             cost_constraint_validator.validate_operation('get_object')
183 |         
184 |         summary = cost_constraint_validator.get_operation_summary()
185 |         assert summary["total_operations"] == 2
186 |         assert len(summary["forbidden_called"]) == 1
187 |         assert len(summary["allowed_called"]) == 1
188 | 
189 | 
190 | class TestMarkerSystem:
191 |     """Test that the pytest marker system is working."""
192 |     
193 |     def test_markers_are_configured(self):
194 |         """Test that pytest markers are properly configured."""
195 |         # This class carries no marker of its own; the test only confirms that this module is collected and run
196 |         assert True
197 |     
198 |     def test_can_run_specific_marker_tests(self):
199 |         """Test that we can run tests with specific markers."""
200 |         # Marker selection is exercised from the command line, e.g.: pytest -m unit
201 |         # This class is unmarked, so `-m unit` deselects this test; it is only a placeholder
202 |         assert True
```

--------------------------------------------------------------------------------
/tests/test_setup_verification.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Setup verification test to ensure the testing framework is working correctly.
  3 | 
  4 | This test validates that the testing infrastructure is properly configured
  5 | and can run basic tests with mocked AWS services.
  6 | """
  7 | 
  8 | import pytest
  9 | import asyncio
 10 | from unittest.mock import Mock, AsyncMock, patch
 11 | from datetime import datetime
 12 | 
 13 | 
 14 | @pytest.mark.unit
 15 | class TestSetupVerification:
 16 |     """Verify that the testing setup is working correctly."""
 17 |     
 18 |     def test_pytest_is_working(self):
 19 |         """Test that pytest is working correctly."""
 20 |         assert True
 21 |     
 22 |     def test_fixtures_are_available(self, mock_s3_service, mock_storage_lens_service, 
 23 |                                   mock_pricing_service):
 24 |         """Test that common fixtures are available."""
 25 |         assert mock_s3_service is not None
 26 |         assert mock_storage_lens_service is not None
 27 |         assert mock_pricing_service is not None
 28 |     
 29 |     @pytest.mark.asyncio
 30 |     async def test_async_testing_works(self):
 31 |         """Test that async testing is working."""
 32 |         async def async_function():
 33 |             await asyncio.sleep(0.001)
 34 |             return "async_result"
 35 |         
 36 |         result = await async_function()
 37 |         assert result == "async_result"
 38 |     
 39 |     def test_mocking_works(self):
 40 |         """Test that mocking is working correctly."""
 41 |         mock_service = Mock()
 42 |         mock_service.test_method.return_value = "mocked_result"
 43 |         
 44 |         result = mock_service.test_method()
 45 |         assert result == "mocked_result"
 46 |         mock_service.test_method.assert_called_once()
 47 |     
 48 |     def test_aws_mocking_works(self, mock_aws_credentials):
 49 |         """Test that AWS service mocking is working."""
 50 |         with patch('boto3.client') as mock_boto_client:
 51 |             mock_client = Mock()
 52 |             mock_client.list_buckets.return_value = {"Buckets": []}
 53 |             mock_boto_client.return_value = mock_client
 54 |             
 55 |             import boto3
 56 |             s3_client = boto3.client('s3')
 57 |             result = s3_client.list_buckets()
 58 |             
 59 |             assert result == {"Buckets": []}
 60 |     
 61 |     def test_cost_constraint_validator_works(self, cost_constraint_validator):
 62 |         """Test that cost constraint validator is working."""
 63 |         # Should allow valid operations
 64 |         assert cost_constraint_validator.validate_operation('list_buckets') is True
 65 |         
 66 |         # Should reject forbidden operations
 67 |         with pytest.raises(ValueError):
 68 |             cost_constraint_validator.validate_operation('list_objects_v2')
 69 |         
 70 |         summary = cost_constraint_validator.get_operation_summary()
 71 |         assert summary["total_operations"] == 2
 72 |         assert "list_buckets" in summary["allowed_called"]
 73 |         assert "list_objects_v2" in summary["forbidden_called"]
 74 |     
 75 |     def test_performance_tracker_works(self, performance_tracker):
 76 |         """Test that performance tracker is working."""
 77 |         performance_tracker.start_timer("test_operation")
 78 |         # Simulate some work
 79 |         import time
 80 |         time.sleep(0.01)
 81 |         duration = performance_tracker.end_timer("test_operation")
 82 |         
 83 |         assert duration > 0
 84 |         assert duration < 1.0  # Should be very quick
 85 |         
 86 |         metrics = performance_tracker.get_metrics()
 87 |         assert "test_operation" in metrics
 88 |         assert metrics["test_operation"] > 0
 89 |     
 90 |     def test_test_data_factory_works(self, test_data_factory):
 91 |         """Test that test data factory is working."""
 92 |         buckets = test_data_factory.create_bucket_data(count=3)
 93 |         assert len(buckets) == 3
 94 |         assert all("Name" in bucket for bucket in buckets)
 95 |         assert all("CreationDate" in bucket for bucket in buckets)
 96 |         
 97 |         cost_data = test_data_factory.create_cost_data(days=5)
 98 |         assert "ResultsByTime" in cost_data
 99 |         assert len(cost_data["ResultsByTime"]) == 5
100 |         
101 |         analysis_result = test_data_factory.create_analysis_result("test_analysis")
102 |         assert analysis_result["status"] == "success"
103 |         assert analysis_result["analysis_type"] == "test_analysis"
104 | 
105 | 
106 | @pytest.mark.integration
107 | class TestIntegrationSetupVerification:
108 |     """Verify that integration testing setup is working."""
109 |     
110 |     @pytest.mark.asyncio
111 |     async def test_orchestrator_can_be_mocked(self, mock_service_orchestrator):
112 |         """Test that orchestrator can be properly mocked for integration tests."""
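113 |         # Import the real S3OptimizationOrchestrator with its collaborators patched out. NOTE(review): patches target core.*, but the import below uses playbooks.s3.* — confirm the patch paths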
114 |         with patch('core.s3_optimization_orchestrator.ServiceOrchestrator', return_value=mock_service_orchestrator), \
115 |              patch('core.s3_optimization_orchestrator.get_performance_monitor'), \
116 |              patch('core.s3_optimization_orchestrator.get_memory_manager'), \
117 |              patch('core.s3_optimization_orchestrator.get_timeout_handler'), \
118 |              patch('core.s3_optimization_orchestrator.get_pricing_cache'), \
119 |              patch('core.s3_optimization_orchestrator.get_bucket_metadata_cache'), \
120 |              patch('core.s3_optimization_orchestrator.get_analysis_results_cache'):
121 |             
122 |             from playbooks.s3.s3_optimization_orchestrator import S3OptimizationOrchestrator
123 |             
124 |             orchestrator = S3OptimizationOrchestrator(region="us-east-1")
125 |             assert orchestrator.region == "us-east-1"
126 |             assert orchestrator.service_orchestrator == mock_service_orchestrator
127 | 
128 | 
129 | @pytest.mark.performance
130 | class TestPerformanceSetupVerification:
131 |     """Verify that performance testing setup is working."""
132 |     
133 |     @pytest.mark.asyncio
134 |     async def test_performance_measurement_works(self, performance_tracker):
135 |         """Test that performance measurement is working."""
136 |         performance_tracker.start_timer("performance_test")
137 |         
138 |         # Simulate some async work
139 |         await asyncio.sleep(0.01)
140 |         
141 |         duration = performance_tracker.end_timer("performance_test")
142 |         
143 |         assert duration > 0.005  # Should be at least 5ms
144 |         assert duration < 0.1    # Should be less than 100ms
145 |         
146 |         # Test performance assertion
147 |         performance_tracker.assert_performance("performance_test", 0.1)
148 | 
149 | 
150 | @pytest.mark.no_cost_validation
151 | class TestCostValidationSetupVerification:
152 |     """Verify that cost validation testing setup is working."""
153 |     
154 |     def test_cost_constraint_system_is_active(self):
155 |         """Test that cost constraint validation system is active."""
156 |         from services.s3_service import ALLOWED_S3_OPERATIONS, FORBIDDEN_S3_OPERATIONS
157 |         
158 |         # Verify that the constraint lists are populated
159 |         assert len(ALLOWED_S3_OPERATIONS) > 0
160 |         assert len(FORBIDDEN_S3_OPERATIONS) > 0
161 |         
162 |         # Verify critical operations are in the right lists
163 |         assert 'list_buckets' in ALLOWED_S3_OPERATIONS
164 |         assert 'list_objects_v2' in FORBIDDEN_S3_OPERATIONS
165 |     
166 |     def test_cost_constraint_violation_error_works(self, mock_aws_credentials):
167 |         """Test that cost constraint violation errors work correctly."""
168 |         from services.s3_service import S3Service, S3CostConstraintViolationError
169 |         
170 |         service = S3Service(region="us-east-1")
171 |         
172 |         with pytest.raises(S3CostConstraintViolationError):
173 |             service._validate_s3_operation('list_objects_v2')
174 |     
175 |     def test_cost_validator_fixture_works(self, cost_constraint_validator):
176 |         """Test that cost validator fixture is working correctly."""
177 |         # Should track operations
178 |         cost_constraint_validator.validate_operation('list_buckets')
179 |         
180 |         # Should reject forbidden operations
181 |         with pytest.raises(ValueError):
182 |             cost_constraint_validator.validate_operation('get_object')
183 |         
184 |         summary = cost_constraint_validator.get_operation_summary()
185 |         assert summary["total_operations"] == 2
186 |         assert len(summary["forbidden_called"]) == 1
187 |         assert len(summary["allowed_called"]) == 1
188 | 
189 | 
190 | class TestMarkerSystem:
191 |     """Test that the pytest marker system is working."""
192 |     
193 |     def test_markers_are_configured(self):
194 |         """Test that pytest markers are properly configured."""
195 |         # This class carries no marker of its own; the test only confirms that this module is collected and run
196 |         assert True
197 |     
198 |     def test_can_run_specific_marker_tests(self):
199 |         """Test that we can run tests with specific markers."""
200 |         # Marker selection is exercised from the command line, e.g.: pytest -m unit
201 |         # This class is unmarked, so `-m unit` deselects this test; it is only a placeholder
202 |         assert True
```

--------------------------------------------------------------------------------
/tests/legacy/example_wellarchitected_output.py:
--------------------------------------------------------------------------------

```python
  1 | #!/usr/bin/env python3
  2 | """
  3 | Example showing enhanced tool outputs with Well-Architected Framework recommendations
  4 | """
  5 | 
  6 | import json
  7 | from utils.documentation_links import add_documentation_links
  8 | 
  9 | def show_enhanced_examples():
 10 |     """Show examples of enhanced tool outputs with Well-Architected recommendations"""
 11 |     
 12 |     print("CFM Tips - Enhanced Output with Well-Architected Framework")
 13 |     print("=" * 70)
 14 |     print()
 15 |     
 16 |     # Example 1: EC2 Right-sizing with Well-Architected guidance
 17 |     print("1. EC2 Right-sizing Analysis - Enhanced Output:")
 18 |     print("-" * 50)
 19 |     ec2_result = {
 20 |         "status": "success",
 21 |         "data": {
 22 |             "underutilized_instances": [
 23 |                 {
 24 |                     "instance_id": "i-1234567890abcdef0",
 25 |                     "instance_type": "m5.2xlarge",
 26 |                     "finding": "Overprovisioned",
 27 |                     "avg_cpu_utilization": 8.5,
 28 |                     "avg_memory_utilization": 12.3,
 29 |                     "recommendation": {
 30 |                         "recommended_instance_type": "m5.large",
 31 |                         "estimated_monthly_savings": 180.50,
 32 |                         "confidence": "High"
 33 |                     }
 34 |                 },
 35 |                 {
 36 |                     "instance_id": "i-0987654321fedcba0",
 37 |                     "instance_type": "c5.xlarge",
 38 |                     "finding": "Underprovisioned",
 39 |                     "avg_cpu_utilization": 85.2,
 40 |                     "recommendation": {
 41 |                         "recommended_instance_type": "c5.2xlarge",
 42 |                         "estimated_monthly_cost_increase": 120.00,
 43 |                         "performance_improvement": "40%"
 44 |                     }
 45 |                 }
 46 |             ],
 47 |             "count": 2,
 48 |             "total_monthly_savings": 180.50,
 49 |             "analysis_period": "14 days",
 50 |             "data_source": "AWS Compute Optimizer"
 51 |         },
 52 |         "message": "Found 2 EC2 instances with optimization opportunities"
 53 |     }
 54 |     
 55 |     enhanced_ec2 = add_documentation_links(ec2_result, "ec2", "underutilized")
 56 |     
 57 |     # Show key sections
 58 |     print("Key Findings:")
 59 |     for instance in enhanced_ec2["data"]["underutilized_instances"]:
 60 |         print(f"  • {instance['instance_id']}: {instance['finding']} - Save ${instance['recommendation'].get('estimated_monthly_savings', 0)}/month")
 61 |     
 62 |     print(f"\nTotal Monthly Savings: ${enhanced_ec2['data']['total_monthly_savings']}")
 63 |     
 64 |     print("\nWell-Architected Framework Guidance:")
 65 |     wa_framework = enhanced_ec2["wellarchitected_framework"]
 66 |     print(f"  Cost Optimization Pillar: {wa_framework['cost_optimization_pillar']}")
 67 |     
 68 |     print("\n  Applicable Principles:")
 69 |     for principle in wa_framework["applicable_principles"]:
 70 |         print(f"    • {principle['title']}: {principle['description']}")
 71 |     
 72 |     print("\n  High Priority Recommendations:")
 73 |     for rec in wa_framework["implementation_priority"]["high"]:
 74 |         print(f"    • {rec}")
 75 |     
 76 |     print("\n  Service-Specific Best Practices:")
 77 |     for rec in wa_framework["service_specific_recommendations"][:2]:  # Show first 2
 78 |         print(f"    • {rec['practice']} ({rec['impact']})")
 79 |         print(f"      Implementation: {rec['implementation']}")
 80 |     
 81 |     print("\n" + "=" * 70)
 82 |     
 83 |     # Example 2: S3 Storage Optimization
 84 |     print("\n2. S3 Storage Optimization - Enhanced Output:")
 85 |     print("-" * 50)
 86 |     s3_result = {
 87 |         "status": "success",
 88 |         "comprehensive_s3_optimization": {
 89 |             "overview": {
 90 |                 "total_potential_savings": "$2,450.75",
 91 |                 "analyses_completed": "6/6",
 92 |                 "buckets_analyzed": 25,
 93 |                 "execution_time": "42.3s"
 94 |             },
 95 |             "key_findings": [
 96 |                 "15 buckets using suboptimal storage classes",
 97 |                 "Found 45 incomplete multipart uploads",
 98 |                 "Identified $1,200 in lifecycle policy savings",
 99 |                 "3 buckets with high request costs suitable for CloudFront"
100 |             ],
101 |             "top_recommendations": [
102 |                 {
103 |                     "type": "storage_class_optimization",
104 |                     "bucket": "analytics-data-lake",
105 |                     "finding": "Standard storage for infrequently accessed data",
106 |                     "recommendation": "Transition to Standard-IA after 30 days",
107 |                     "potential_savings": "$850.25/month",
108 |                     "priority": "High"
109 |                 },
110 |                 {
111 |                     "type": "lifecycle_policy",
112 |                     "bucket": "backup-archives",
113 |                     "finding": "No lifecycle policy for old backups",
114 |                     "recommendation": "Archive to Glacier Deep Archive after 90 days",
115 |                     "potential_savings": "$650.50/month",
116 |                     "priority": "High"
117 |                 }
118 |             ]
119 |         }
120 |     }
121 |     
122 |     enhanced_s3 = add_documentation_links(s3_result, "s3", "storage_optimization")
123 |     
124 |     print("Key Findings:")
125 |     for finding in enhanced_s3["comprehensive_s3_optimization"]["key_findings"]:
126 |         print(f"  • {finding}")
127 |     
128 |     print(f"\nTotal Potential Savings: {enhanced_s3['comprehensive_s3_optimization']['overview']['total_potential_savings']}")
129 |     
130 |     print("\nTop Recommendations:")
131 |     for rec in enhanced_s3["comprehensive_s3_optimization"]["top_recommendations"]:
132 |         print(f"  • {rec['bucket']}: {rec['recommendation']} - {rec['potential_savings']}")
133 |     
134 |     print("\nWell-Architected Framework Guidance:")
135 |     wa_s3 = enhanced_s3["wellarchitected_framework"]
136 |     
137 |     print("  High Priority Actions:")
138 |     for action in wa_s3["implementation_priority"]["high"]:
139 |         print(f"    • {action}")
140 |     
141 |     print("  Medium Priority Actions:")
142 |     for action in wa_s3["implementation_priority"]["medium"][:2]:  # Show first 2
143 |         print(f"    • {action}")
144 |     
145 |     print("\n" + "=" * 70)
146 |     
147 |     # Example 3: Multi-Service Comprehensive Analysis
148 |     print("\n3. Multi-Service Comprehensive Analysis - Enhanced Output:")
149 |     print("-" * 50)
150 |     comprehensive_result = {
151 |         "status": "success",
152 |         "comprehensive_analysis": {
153 |             "overview": {
154 |                 "total_monthly_cost": "$8,450.25",
155 |                 "total_potential_savings": "$2,180.75",
156 |                 "savings_percentage": "25.8%",
157 |                 "services_analyzed": ["EC2", "EBS", "RDS", "Lambda", "S3"]
158 |             },
159 |             "service_breakdown": {
160 |                 "ec2": {"current_cost": 3200, "potential_savings": 640, "optimization_opportunities": 12},
161 |                 "ebs": {"current_cost": 850, "potential_savings": 180, "optimization_opportunities": 8},
162 |                 "rds": {"current_cost": 2100, "potential_savings": 420, "optimization_opportunities": 3},
163 |                 "lambda": {"current_cost": 150, "potential_savings": 45, "optimization_opportunities": 15},
164 |                 "s3": {"current_cost": 2150, "potential_savings": 895, "optimization_opportunities": 22}
165 |             },
166 |             "top_opportunities": [
167 |                 {"service": "S3", "type": "Storage Class Optimization", "savings": 895, "effort": "Low"},
168 |                 {"service": "EC2", "type": "Right-sizing", "savings": 640, "effort": "Medium"},
169 |                 {"service": "RDS", "type": "Reserved Instances", "savings": 420, "effort": "Low"}
170 |             ]
171 |         }
172 |     }
173 |     
174 |     enhanced_comprehensive = add_documentation_links(comprehensive_result, None, "comprehensive")
175 |     
176 |     print("Cost Overview:")
177 |     overview = enhanced_comprehensive["comprehensive_analysis"]["overview"]
178 |     print(f"  • Current Monthly Cost: {overview['total_monthly_cost']}")
179 |     print(f"  • Potential Savings: {overview['total_potential_savings']} ({overview['savings_percentage']})")
180 |     
181 |     print("\nTop Optimization Opportunities:")
182 |     for opp in enhanced_comprehensive["comprehensive_analysis"]["top_opportunities"]:
183 |         print(f"  • {opp['service']} - {opp['type']}: ${opp['savings']}/month ({opp['effort']} effort)")
184 |     
185 |     print("\nWell-Architected Framework Principles:")
186 |     wa_comp = enhanced_comprehensive["wellarchitected_framework"]
187 |     for principle in wa_comp["principles"][:3]:  # Show first 3
188 |         print(f"  • {principle['title']}")
189 |         print(f"    {principle['description']}")
190 |         print(f"    Key practices: {', '.join(principle['best_practices'][:2])}")
191 |         print()
192 |     
193 |     print("=" * 70)
194 |     print("\nEnhanced Features Summary:")
195 |     print("✓ Documentation links to AWS best practices")
196 |     print("✓ Well-Architected Framework Cost Optimization pillar mapping")
197 |     print("✓ Service-specific implementation guidance")
198 |     print("✓ Impact assessment and priority ranking")
199 |     print("✓ Principle-based recommendations")
200 |     print("✓ Actionable next steps with implementation details")
201 | 
202 | if __name__ == "__main__":
203 |     show_enhanced_examples()
```

--------------------------------------------------------------------------------
/RUNBOOKS_GUIDE.md:
--------------------------------------------------------------------------------

```markdown
  1 | # AWS Cost Optimization Runbooks with MCP v3
  2 | 
  3 | This guide shows how to use the AWS Cost Optimization Runbooks with the MCP server that includes proper Cost Optimization Hub permissions.
  4 | 
  5 | ## What's Included
  6 | 
  7 | ### Core AWS Services
  8 | - ✅ **Cost Explorer** - Retrieve cost data and usage metrics
  9 | - ✅ **Cost Optimization Hub** - With correct permissions and API calls
 10 | - ✅ **Compute Optimizer** - Get right-sizing recommendations
 11 | - ✅ **Trusted Advisor** - Cost optimization checks
 12 | - ✅ **Performance Insights** - RDS performance metrics
 13 | 
 14 | ### Cost Optimization Runbooks
 15 | - 🔧 **EC2 Right Sizing** - Identify underutilized EC2 instances
 16 | - 💾 **EBS Optimization** - Find unused and underutilized volumes
 17 | - 🗄️ **RDS Optimization** - Identify idle and underutilized databases
 18 | - ⚡ **Lambda Optimization** - Find overprovisioned and unused functions
 19 | - 📋 **CloudTrail Optimization** - Identify duplicate management event trails
 20 | - 📊 **Comprehensive Analysis** - Multi-service cost analysis
 21 | 
 22 | ## Quick Start
 23 | 
 24 | ### 1. Setup
 25 | ```bash
 26 | cd <replace-with-project-folder>/
 27 | 
 28 | # Make sure the server script is executable
 29 | chmod +x mcp_server_with_runbooks.py
 30 | 
 31 | # Test the server
 32 | python3 -m py_compile mcp_server_with_runbooks.py
 33 | python3 -c "from playbooks.ec2.ec2_optimization import run_ec2_right_sizing_analysis; print('Playbooks OK')"
 34 | ```
 35 | 
 36 | ### 2. Configure AWS Permissions
 37 | 
 38 | Apply the correct IAM policy for Cost Optimization Hub:
 39 | 
 40 | ```json
 41 | {
 42 |   "Version": "2012-10-17",
 43 |   "Statement": [
 44 |     {
 45 |       "Effect": "Allow",
 46 |       "Action": [
 47 |         "cost-optimization-hub:ListEnrollmentStatuses",
 48 |         "cost-optimization-hub:ListRecommendations",
 49 |         "cost-optimization-hub:GetRecommendation", 
 50 |         "cost-optimization-hub:ListRecommendationSummaries",
 51 |         "ce:GetCostAndUsage",
 52 |         "ce:GetCostForecast",
 53 |         "compute-optimizer:GetEC2InstanceRecommendations",
 54 |         "compute-optimizer:GetEBSVolumeRecommendations",
 55 |         "compute-optimizer:GetLambdaFunctionRecommendations",
 56 |         "ec2:DescribeInstances",
 57 |         "ec2:DescribeVolumes",
 58 |         "rds:DescribeDBInstances",
 59 |         "lambda:ListFunctions",
 60 |         "cloudwatch:GetMetricStatistics",
 61 |         "s3:ListBucket",
 62 |         "s3:ListAllMyBuckets",
 63 |         "support:DescribeTrustedAdvisorChecks",
 64 |         "support:DescribeTrustedAdvisorCheckResult",
 65 |         "pi:GetResourceMetrics",
 66 |         "cloudtrail:DescribeTrails",
 67 |         "cloudtrail:GetTrailStatus",
 68 |         "cloudtrail:GetEventSelectors"
 69 |       ],
 70 |       "Resource": "*"
 71 |     }
 72 |   ]
 73 | }
 74 | ```
 75 | 
 76 | ### 3. Install dependencies
 77 | `pip install -r requirements_fixed.txt`
 78 | 
 79 | ### 4. Configure AWS credentials
 80 | `aws configure`
 81 | 
 82 | ### 5. Add the MCP server config to Amazon Q, using `mcp_runbooks.json` as a template
 83 | `vi ~/.aws/amazonq/mcp.json`
 84 | 
 85 | ## Available Runbook Tools
 86 | 
 87 | ### EC2 Right Sizing Runbooks
 88 | 
 89 | #### 1. `ec2_rightsizing`
 90 | Analyze EC2 instances for right-sizing opportunities.
 91 | 
 92 | **Example Usage:**
 93 | ```
 94 | "Run EC2 right-sizing analysis for us-east-1 region with 14-day lookback period"
 95 | ```
 96 | 
 97 | **Parameters:**
 98 | - `region`: AWS region to analyze
 99 | - `lookback_period_days`: Days to analyze (default: 14)
100 | - `cpu_threshold`: CPU utilization threshold % (default: 40.0)
101 | 
102 | #### 2. `ec2_report`
103 | Generate comprehensive EC2 right-sizing report.
104 | 
105 | **Example Usage:**
106 | ```
107 | "Generate an EC2 right-sizing report for us-east-1 in markdown format"
108 | ```
109 | 
110 | ### EBS Optimization Runbooks
111 | 
112 | #### 1. `ebs_optimization`
113 | Analyze EBS volumes for optimization opportunities.
114 | 
115 | **Example Usage:**
116 | ```
117 | "Analyze EBS volumes in us-east-1 for optimization opportunities"
118 | ```
119 | 
120 | #### 2. `ebs_unused`
121 | Find unused EBS volumes that can be deleted.
122 | 
123 | **Example Usage:**
124 | ```
125 | "Find unused EBS volumes older than 30 days in us-east-1"
126 | ```
127 | 
128 | #### 3. `ebs_report`
129 | Generate comprehensive EBS optimization report.
130 | 
131 | **Example Usage:**
132 | ```
133 | "Generate a comprehensive EBS optimization report for us-east-1"
134 | ```
135 | 
136 | ### RDS Optimization Runbooks
137 | 
138 | #### 1. `rds_optimization`
139 | Analyze RDS instances for optimization opportunities.
140 | 
141 | **Example Usage:**
142 | ```
143 | "Analyze RDS instances in us-east-1 for underutilization"
144 | ```
145 | 
146 | #### 2. `rds_idle`
147 | Find idle RDS instances with minimal activity.
148 | 
149 | **Example Usage:**
150 | ```
151 | "Find idle RDS instances with less than 1 connection in the last 7 days"
152 | ```
153 | 
154 | #### 3. `rds_report`
155 | Generate comprehensive RDS optimization report.
156 | 
157 | **Example Usage:**
158 | ```
159 | "Generate an RDS optimization report for us-east-1"
160 | ```
161 | 
162 | ### Lambda Optimization Runbooks
163 | 
164 | #### 1. `lambda_optimization`
165 | Analyze Lambda functions for optimization opportunities.
166 | 
167 | **Example Usage:**
168 | ```
169 | "Analyze Lambda functions in us-east-1 for memory optimization"
170 | ```
171 | 
172 | #### 2. `lambda_unused`
173 | Find unused Lambda functions.
174 | 
175 | **Example Usage:**
176 | ```
177 | "Find Lambda functions with less than 5 invocations in the last 30 days"
178 | ```
179 | 
180 | #### 3. `lambda_report`
181 | Generate comprehensive Lambda optimization report.
182 | 
183 | **Example Usage:**
184 | ```
185 | "Generate a Lambda optimization report for us-east-1"
186 | ```
187 | 
188 | ### CloudTrail Optimization Runbooks
189 | 
190 | #### 1. `get_management_trails`
191 | Get CloudTrail trails that have management events enabled.
192 | 
193 | **Example Usage:**
194 | ```
195 | "Show me all CloudTrail trails with management events enabled in us-east-1"
196 | ```
197 | 
198 | #### 2. `run_cloudtrail_trails_analysis`
199 | Analyze CloudTrail trails to identify duplicate management event trails.
200 | 
201 | **Example Usage:**
202 | ```
203 | "Analyze CloudTrail trails in us-east-1 for cost optimization opportunities"
204 | ```
205 | 
206 | **Parameters:**
207 | - `region`: AWS region to analyze
208 | 
209 | #### 3. `generate_cloudtrail_report`
210 | Generate comprehensive CloudTrail optimization report.
211 | 
212 | **Example Usage:**
213 | ```
214 | "Generate a CloudTrail optimization report for us-east-1 in markdown format"
215 | ```
216 | 
217 | **Parameters:**
218 | - `region`: AWS region to analyze
219 | - `format`: "json" or "markdown" (default: "json")
220 | 
221 | ### Comprehensive Analysis
222 | 
223 | #### `comprehensive_analysis`
224 | Run analysis across all services (EC2, EBS, RDS, Lambda).
225 | 
226 | **Example Usage:**
227 | ```
228 | "Run comprehensive cost analysis for us-east-1 covering all services"
229 | ```
230 | 
231 | **Parameters:**
232 | - `region`: AWS region to analyze
233 | - `services`: Array of services ["ec2", "ebs", "rds", "lambda"]
234 | - `lookback_period_days`: Days to analyze (default: 14)
235 | - `output_format`: "json" or "markdown"
236 | 
237 | ### Cost Optimization Hub Tools (Shortened)
238 | 
239 | #### 1. `list_coh_enrollment`
240 | Check Cost Optimization Hub enrollment status.
241 | 
242 | #### 2. `get_coh_recommendations`
243 | Get cost optimization recommendations.
244 | 
245 | #### 3. `get_coh_summaries`
246 | Get recommendation summaries.
247 | 
248 | #### 4. `get_coh_recommendation`
249 | Get specific recommendation by ID.
250 | 
251 | ## Sample Conversation Flow
252 | **Configure AWS credentials**
253 | ```aws configure```
254 | 
255 | **Add the MCP server config to Amazon Q using the mcp_runbooks.json as a template**
256 | ```vi ~/.aws/amazonq/mcp.json```
257 | 
258 | ```bash
259 | # Start Q with runbooks
260 | q chat
261 | ```
262 | 
263 | **User:** "What cost optimization tools are available?"
264 | 
265 | **Q:** "I can see several AWS cost optimization tools including Cost Optimization Hub, runbooks for EC2, EBS, RDS, and Lambda optimization..."
266 | 
267 | **User:** "Run a comprehensive cost analysis for us-east-1"
268 | 
269 | **Q:** "I'll run a comprehensive cost analysis across all services for the us-east-1 region..."
270 | *[Uses comprehensive_analysis tool]*
271 | 
272 | **User:** "Show me unused EBS volumes that are costing money"
273 | 
274 | **Q:** "Let me identify unused EBS volumes in your account..."
275 | *[Uses ebs_unused tool]*
276 | 
277 | **User:** "Generate an EC2 right-sizing report in markdown format"
278 | 
279 | **Q:** "I'll generate a detailed EC2 right-sizing report in markdown format..."
280 | *[Uses ec2_report tool]*
281 | 
282 | ## Tool Names (For Reference)
283 | 
284 | The tool names have been shortened to fit MCP's 64-character limit:
285 | 
286 | | Purpose | Tool Name |
287 | |----------|----------|
288 | | `Run EC2 right sizing analysis` | `ec2_rightsizing` |
289 | | `Generate EC2 right sizing report` | `ec2_report` |
290 | | `Run EBS optimization analysis` | `ebs_optimization` |
291 | | `Identify unused EBS volumes` | `ebs_unused` |
292 | | `Generate EBS optimization report` | `ebs_report` |
293 | | `Run RDS optimization analysis` | `rds_optimization` |
294 | | `Identify idle RDS instances` | `rds_idle` |
295 | | `Generate RDS optimization report` | `rds_report` |
296 | | `Run Lambda optimization analysis` | `lambda_optimization` |
297 | | `Identify unused Lambda functions` | `lambda_unused` |
298 | | `Generate Lambda optimization report` | `lambda_report` |
299 | | `Run comprehensive cost analysis` | `comprehensive_analysis` |
300 | | `Get CloudTrail management trails` | `get_management_trails` |
301 | | `Run CloudTrail trails analysis` | `run_cloudtrail_trails_analysis` |
302 | | `Generate CloudTrail optimization report` | `generate_cloudtrail_report` |
303 | | `List Cost Optimization Hub enrollment statuses` | `list_coh_enrollment` |
304 | | `Get Cost Optimization Hub recommendations` | `get_coh_recommendations` |
305 | | `Get Cost Optimization Hub recommendation summaries` | `get_coh_summaries` |
306 | | `Get a particular Cost Optimization Hub recommendation` | `get_coh_recommendation` |
307 | 
308 | ## Troubleshooting
309 | 
310 | ### Common Issues
311 | 
312 | 1. **Import Error for playbook functions**
313 |    ```bash
314 |    # Make sure PYTHONPATH is set in mcp_runbooks.json
315 |    export PYTHONPATH="<replace-with-project-folder>"
316 |    ```
317 | 
318 | 2. **Cost Optimization Hub Errors**
319 |    ```bash
320 |    # Run the diagnostic first
321 |    python3 diagnose_cost_optimization_hub_v2.py
322 |    ```
```

--------------------------------------------------------------------------------
/utils/cache_decorator.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Caching Decorator with TTL Support
  3 | 
  4 | Provides DAO-level caching for CloudWatch service methods with:
  5 | - Time-to-live (TTL) support
  6 | - Page-aware cache keys
  7 | - Thread-safe implementation
  8 | - Memory-efficient LRU eviction
  9 | - Optional caching (can be disabled via environment variable or parameter)
 10 | """
 11 | 
 12 | import functools
 13 | import hashlib
 14 | import json
 15 | import logging
 16 | import os
 17 | import time
 18 | from typing import Any, Callable, Dict, Optional
 19 | from threading import Lock
 20 | 
 21 | logger = logging.getLogger(__name__)
 22 | 
 23 | # Global flag to enable/disable caching
 24 | # Can be controlled via environment variable: CFM_ENABLE_CACHE=false
 25 | _CACHE_ENABLED = os.getenv('CFM_ENABLE_CACHE', 'true').lower() in ('true', '1', 'yes')
 26 | 
 27 | 
 28 | class TTLCache:
 29 |     """Thread-safe cache with TTL support."""
 30 |     
 31 |     def __init__(self, ttl_seconds: int = 600, max_size: int = 1000):
 32 |         """
 33 |         Initialize TTL cache.
 34 |         
 35 |         Args:
 36 |             ttl_seconds: Time-to-live in seconds (default: 600 = 10 minutes)
 37 |             max_size: Maximum cache entries (default: 1000)
 38 |         """
 39 |         self.ttl_seconds = ttl_seconds
 40 |         self.max_size = max_size
 41 |         self.cache: Dict[str, tuple[Any, float]] = {}
 42 |         self.lock = Lock()
 43 |         self.hits = 0
 44 |         self.misses = 0
 45 |     
 46 |     def _is_expired(self, timestamp: float) -> bool:
 47 |         """Check if cache entry is expired."""
 48 |         return (time.time() - timestamp) > self.ttl_seconds
 49 |     
 50 |     def _evict_expired(self):
 51 |         """Remove expired entries."""
 52 |         current_time = time.time()
 53 |         expired_keys = [
 54 |             key for key, (_, timestamp) in self.cache.items()
 55 |             if (current_time - timestamp) > self.ttl_seconds
 56 |         ]
 57 |         for key in expired_keys:
 58 |             del self.cache[key]
 59 |     
 60 |     def _evict_lru(self):
 61 |         """Evict least recently used entries if cache is full."""
 62 |         if len(self.cache) >= self.max_size:
 63 |             # Remove oldest 10% of entries
 64 |             sorted_items = sorted(self.cache.items(), key=lambda x: x[1][1])
 65 |             num_to_remove = max(1, len(sorted_items) // 10)
 66 |             for key, _ in sorted_items[:num_to_remove]:
 67 |                 del self.cache[key]
 68 |     
 69 |     def get(self, key: str) -> Optional[Any]:
 70 |         """Get value from cache if not expired."""
 71 |         with self.lock:
 72 |             if key in self.cache:
 73 |                 value, timestamp = self.cache[key]
 74 |                 if not self._is_expired(timestamp):
 75 |                     self.hits += 1
 76 |                     logger.debug(f"Cache HIT for key: {key[:50]}...")
 77 |                     return value
 78 |                 else:
 79 |                     # Remove expired entry
 80 |                     del self.cache[key]
 81 |                     logger.debug(f"Cache EXPIRED for key: {key[:50]}...")
 82 |             
 83 |             self.misses += 1
 84 |             logger.debug(f"Cache MISS for key: {key[:50]}...")
 85 |             return None
 86 |     
 87 |     def set(self, key: str, value: Any):
 88 |         """Set value in cache with current timestamp."""
 89 |         with self.lock:
 90 |             # Evict expired entries periodically
 91 |             if len(self.cache) % 100 == 0:
 92 |                 self._evict_expired()
 93 |             
 94 |             # Evict LRU if cache is full
 95 |             self._evict_lru()
 96 |             
 97 |             self.cache[key] = (value, time.time())
 98 |             logger.debug(f"Cache SET for key: {key[:50]}...")
 99 |     
100 |     def clear(self):
101 |         """Clear all cache entries."""
102 |         with self.lock:
103 |             self.cache.clear()
104 |             self.hits = 0
105 |             self.misses = 0
106 |             logger.info("Cache cleared")
107 |     
108 |     def get_stats(self) -> Dict[str, Any]:
109 |         """Get cache statistics."""
110 |         with self.lock:
111 |             total_requests = self.hits + self.misses
112 |             hit_rate = (self.hits / total_requests * 100) if total_requests > 0 else 0
113 |             return {
114 |                 'size': len(self.cache),
115 |                 'max_size': self.max_size,
116 |                 'hits': self.hits,
117 |                 'misses': self.misses,
118 |                 'hit_rate': round(hit_rate, 2),
119 |                 'ttl_seconds': self.ttl_seconds
120 |             }
121 | 
122 | 
123 | # Global cache instance for CloudWatch DAO methods
124 | _cloudwatch_cache = TTLCache(ttl_seconds=600, max_size=1000)
125 | 
126 | 
def _generate_cache_key(func_name: str, args: tuple, kwargs: dict) -> str:
    """
    Build the cache key for one call of a cached function.

    The key covers the function name, every positional argument after the
    first (the first is treated as ``self`` and skipped), and those keyword
    arguments that appear in the whitelist below, which includes ``page`` so
    each page of a paginated call is cached separately. Keyword arguments
    outside the whitelist do not influence the key.

    Returns:
        ``"<func_name>:<md5 hex digest of the joined key parts>"``
    """
    # Keyword arguments that distinguish one cached call from another.
    keyed_kwargs = (
        'region', 'page', 'lookback_days', 'namespace_filter',
        'log_group_name_prefix', 'alarm_name_prefix', 'dashboard_name_prefix',
        'can_spend_for_estimate', 'can_spend_for_exact_usage_estimate',
    )

    # args[0] is assumed to be the bound instance and is left out.
    positional_parts = [str(value) for value in args[1:]]
    keyword_parts = [
        f"{name}={kwargs[name]}" for name in keyed_kwargs if name in kwargs
    ]

    raw_key = "|".join([str(func_name), *positional_parts, *keyword_parts])
    digest = hashlib.md5(raw_key.encode()).hexdigest()

    return f"{func_name}:{digest}"
156 | 
157 | 
def dao_cache(ttl_seconds: int = 600, enabled: Optional[bool] = None):
    """
    Decorator for caching DAO method results with TTL.

    Works on both ``async def`` and regular functions. Results are stored in
    the shared module-level cache under a key built by ``_generate_cache_key``
    from ``func.__name__`` and the call arguments.

    Caching can be disabled globally via CFM_ENABLE_CACHE environment variable
    or per-decorator via the enabled parameter. The global flag is read on
    every call, so runtime toggles take effect immediately.

    Args:
        ttl_seconds: Maximum age in seconds of a cached result that the
            decorated function will serve (default: 600 = 10 minutes).
            Entries live in the shared cache, so the effective lifetime is
            the smaller of this value and the shared cache's own TTL.
        enabled: Override global cache setting (None = use global, True/False = force)

    Notes:
        - A result of ``None`` is stored but never served from the cache: the
          wrapped function is called again on the next request.
        - Keys use ``func.__name__`` only, so identically named functions
          called with the same key arguments share cache entries.

    Usage:
        # Use global cache setting
        @dao_cache(ttl_seconds=600)
        async def get_log_groups(self, page: int = 1, **kwargs):
            pass

        # Force caching disabled for this method
        @dao_cache(ttl_seconds=600, enabled=False)
        async def get_real_time_data(self, **kwargs):
            pass

        # Force caching enabled for this method
        @dao_cache(ttl_seconds=600, enabled=True)
        async def get_expensive_data(self, **kwargs):
            pass

    Environment Variables:
        CFM_ENABLE_CACHE: Set to 'false', '0', or 'no' to disable caching globally
    """
    def _is_enabled() -> bool:
        """Resolve the per-decorator override against the global flag."""
        return enabled if enabled is not None else _CACHE_ENABLED

    def _lookup(cache_key: str) -> Optional[Any]:
        """Return the cached value for ``cache_key``, honoring ``ttl_seconds``."""
        # The shared cache only knows its own TTL, so first drop the entry if
        # it is older than this decorator allows. The lock is released before
        # delegating because the cache's lock is not re-entrant.
        with _cloudwatch_cache.lock:
            entry = _cloudwatch_cache.cache.get(cache_key)
            if entry is not None and (time.time() - entry[1]) > ttl_seconds:
                del _cloudwatch_cache.cache[cache_key]
        return _cloudwatch_cache.get(cache_key)

    def decorator(func: Callable) -> Callable:
        # Local import keeps this block self-contained; inspect's check
        # replaces the deprecated asyncio.iscoroutinefunction.
        import inspect

        @functools.wraps(func)
        async def async_wrapper(*args, **kwargs):
            if not _is_enabled():
                logger.debug("Cache disabled for %s, calling function directly", func.__name__)
                return await func(*args, **kwargs)

            cache_key = _generate_cache_key(func.__name__, args, kwargs)

            cached_value = _lookup(cache_key)
            if cached_value is not None:
                logger.debug("Returning cached result for %s", func.__name__)
                return cached_value

            # Cache miss: call the original coroutine and remember its result.
            result = await func(*args, **kwargs)
            _cloudwatch_cache.set(cache_key, result)
            return result

        @functools.wraps(func)
        def sync_wrapper(*args, **kwargs):
            if not _is_enabled():
                logger.debug("Cache disabled for %s, calling function directly", func.__name__)
                return func(*args, **kwargs)

            cache_key = _generate_cache_key(func.__name__, args, kwargs)

            cached_value = _lookup(cache_key)
            if cached_value is not None:
                logger.debug("Returning cached result for %s", func.__name__)
                return cached_value

            # Cache miss: call the original function and remember its result.
            result = func(*args, **kwargs)
            _cloudwatch_cache.set(cache_key, result)
            return result

        # Return appropriate wrapper based on function type
        if inspect.iscoroutinefunction(func):
            return async_wrapper
        return sync_wrapper

    return decorator
248 | 
249 | 
def get_cache_stats() -> Dict[str, Any]:
    """Return the shared cache's statistics plus the global ``enabled`` flag."""
    return {**_cloudwatch_cache.get_stats(), 'enabled': _CACHE_ENABLED}
255 | 
256 | 
def clear_cache():
    """Empty the shared cache by delegating to its ``clear()`` method."""
    _cloudwatch_cache.clear()
260 | 
261 | 
def is_cache_enabled() -> bool:
    """Return the current value of the module-wide cache switch."""
    return _CACHE_ENABLED
265 | 
266 | 
def enable_cache():
    """Turn the module-wide cache switch on for the rest of the process."""
    global _CACHE_ENABLED
    _CACHE_ENABLED = True
    logger.info("Cache enabled globally")
272 | 
273 | 
def disable_cache():
    """Turn the module-wide cache switch off for the rest of the process."""
    global _CACHE_ENABLED
    _CACHE_ENABLED = False
    logger.info("Cache disabled globally")
279 | 
280 | 
def set_cache_enabled(enabled: bool):
    """
    Assign the module-wide cache switch and log the new state.

    Args:
        enabled: True to enable caching, False to disable
    """
    global _CACHE_ENABLED
    _CACHE_ENABLED = enabled

    state = 'enabled' if enabled else 'disabled'
    logger.info(f"Cache {state} globally")
291 | 
292 | 
293 | # Import asyncio at the end to avoid circular imports
294 | import asyncio
295 | 
```

--------------------------------------------------------------------------------
/tests/unit/cloudwatch/test_mcp_surface_pagination.py:
--------------------------------------------------------------------------------

```python
  1 | #!/usr/bin/env python3
  2 | """
  3 | Test MCP surface pagination with real parameters to analyze response structure.
  4 | """
  5 | 
  6 | import pytest
  7 | import sys
  8 | import os
  9 | import json
 10 | from datetime import datetime
 11 | 
 12 | # Add the project root to the path
 13 | sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../../..'))
 14 | 
 15 | 
 16 | @pytest.mark.skip(reason="Tests need refactoring to match actual API structure")
 17 | class TestMCPSurfacePagination:
 18 |     """Test MCP surface pagination with real parameters."""
 19 |     
 20 |     @pytest.mark.asyncio
 21 |     async def test_mcp_cloudwatch_general_spend_analysis_surface(self):
 22 |         """Test MCP surface call with real parameters to analyze response structure."""
 23 |         
 24 |         # Import the MCP function directly
 25 |         from runbook_functions import run_cloudwatch_general_spend_analysis
 26 |         
 27 |         # Test parameters from user request
 28 |         test_params = {
 29 |             "region": "us-east-1",
 30 |             "lookback_days": 30,
 31 |             "allow_minimal_cost_metrics": True,
 32 |             "page": 1
 33 |         }
 34 |         
 35 |         print(f"\n=== Testing MCP Surface Call ===")
 36 |         print(f"Parameters: {json.dumps(test_params, indent=2)}")
 37 |         
 38 |         # Call the actual MCP function
 39 |         result = await run_cloudwatch_general_spend_analysis(test_params)
 40 |         
 41 |         # Parse the response
 42 |         response_text = result[0].text
 43 |         response_data = json.loads(response_text)
 44 |         
 45 |         print(f"\n=== Response Structure Analysis ===")
 46 |         print(f"Response keys: {list(response_data.keys())}")
 47 |         
 48 |         # Check if pagination exists in the response
 49 |         if 'data' in response_data:
 50 |             print(f"Data keys: {list(response_data['data'].keys())}")
 51 |             
 52 |             # Look for pagination in different places
 53 |             pagination_locations = []
 54 |             
 55 |             # Check top-level data pagination
 56 |             if 'pagination' in response_data['data']:
 57 |                 pagination_locations.append('data.pagination')
 58 |                 print(f"Found pagination at data.pagination: {response_data['data']['pagination']}")
 59 |             
 60 |             # Check configuration_analysis sections
 61 |             if 'configuration_analysis' in response_data['data']:
 62 |                 config_analysis = response_data['data']['configuration_analysis']
 63 |                 print(f"Configuration analysis keys: {list(config_analysis.keys())}")
 64 |                 
 65 |                 for section_name, section_data in config_analysis.items():
 66 |                     if isinstance(section_data, dict) and 'pagination' in section_data:
 67 |                         pagination_locations.append(f'data.configuration_analysis.{section_name}.pagination')
 68 |                         print(f"Found pagination at data.configuration_analysis.{section_name}.pagination: {section_data['pagination']}")
 69 |             
 70 |             print(f"\nPagination found at locations: {pagination_locations}")
 71 |             
 72 |             # Check for items/data arrays
 73 |             items_locations = []
 74 |             if 'configuration_analysis' in response_data['data']:
 75 |                 config_analysis = response_data['data']['configuration_analysis']
 76 |                 for section_name, section_data in config_analysis.items():
 77 |                     if isinstance(section_data, dict):
 78 |                         if 'items' in section_data:
 79 |                             items_count = len(section_data['items']) if isinstance(section_data['items'], list) else 'not_list'
 80 |                             items_locations.append(f'data.configuration_analysis.{section_name}.items ({items_count} items)')
 81 |                         
 82 |                         # Check for specific data arrays
 83 |                         for data_key in ['log_groups', 'metrics', 'alarms', 'dashboards']:
 84 |                             if data_key in section_data and isinstance(section_data[data_key], list):
 85 |                                 items_count = len(section_data[data_key])
 86 |                                 items_locations.append(f'data.configuration_analysis.{section_name}.{data_key} ({items_count} items)')
 87 |             
 88 |             print(f"Items/data arrays found at: {items_locations}")
 89 |         
 90 |         # Check response metadata
 91 |         if 'runbook_metadata' in response_data:
 92 |             print(f"Runbook metadata keys: {list(response_data['runbook_metadata'].keys())}")
 93 |         
 94 |         if 'orchestrator_metadata' in response_data:
 95 |             print(f"Orchestrator metadata keys: {list(response_data['orchestrator_metadata'].keys())}")
 96 |         
 97 |         # Check for page-related fields at top level
 98 |         page_fields = []
 99 |         for key in response_data.keys():
100 |             if 'page' in key.lower() or 'pagination' in key.lower():
101 |                 page_fields.append(f"{key}: {response_data[key]}")
102 |         
103 |         if page_fields:
104 |             print(f"Top-level page-related fields: {page_fields}")
105 |         
106 |         # Print full response structure (truncated for readability)
107 |         print(f"\n=== Full Response Structure (first 2000 chars) ===")
108 |         response_str = json.dumps(response_data, indent=2, default=str)
109 |         print(response_str[:2000] + "..." if len(response_str) > 2000 else response_str)
110 |         
111 |         # Assertions to verify the response structure
112 |         assert isinstance(response_data, dict), "Response should be a dictionary"
113 |         assert 'status' in response_data, "Response should have status field"
114 |         assert 'data' in response_data, "Response should have data field"
115 |         
116 |         # Test passes if we get a valid response structure
117 |         print(f"\n=== Test Result ===")
118 |         print(f"✅ MCP surface call successful")
119 |         print(f"✅ Response structure analyzed")
120 |         print(f"✅ Pagination locations identified: {len(pagination_locations) if 'pagination_locations' in locals() else 0}")
121 |     
122 |     @pytest.mark.asyncio
123 |     async def test_mcp_cloudwatch_metrics_optimization_surface(self):
124 |         """Test MCP metrics optimization surface call."""
125 |         
126 |         from runbook_functions import run_cloudwatch_metrics_optimization
127 |         
128 |         test_params = {
129 |             "region": "us-east-1",
130 |             "lookback_days": 30,
131 |             "allow_minimal_cost_metrics": True,
132 |             "page": 1
133 |         }
134 |         
135 |         print(f"\n=== Testing Metrics Optimization MCP Surface Call ===")
136 |         
137 |         result = await run_cloudwatch_metrics_optimization(test_params)
138 |         response_data = json.loads(result[0].text)
139 |         
140 |         # Check for pagination in metrics response
141 |         pagination_found = False
142 |         if 'data' in response_data and 'configuration_analysis' in response_data['data']:
143 |             config_analysis = response_data['data']['configuration_analysis']
144 |             if 'metrics' in config_analysis and 'pagination' in config_analysis['metrics']:
145 |                 pagination_found = True
146 |                 pagination_info = config_analysis['metrics']['pagination']
147 |                 print(f"Metrics pagination: {pagination_info}")
148 |         
149 |         print(f"Metrics optimization pagination found: {pagination_found}")
150 |         assert isinstance(response_data, dict), "Metrics response should be a dictionary"
151 |     
152 |     @pytest.mark.asyncio
153 |     async def test_pagination_consistency_across_apis(self):
154 |         """Test pagination consistency across different CloudWatch APIs."""
155 |         
156 |         from runbook_functions import (
157 |             run_cloudwatch_general_spend_analysis,
158 |             run_cloudwatch_metrics_optimization,
159 |             run_cloudwatch_logs_optimization,
160 |             run_cloudwatch_alarms_and_dashboards_optimization
161 |         )
162 |         
163 |         test_params = {
164 |             "region": "us-east-1",
165 |             "lookback_days": 30,
166 |             "allow_minimal_cost_metrics": True,
167 |             "page": 1
168 |         }
169 |         
170 |         apis_to_test = [
171 |             ("general_spend", run_cloudwatch_general_spend_analysis),
172 |             ("metrics", run_cloudwatch_metrics_optimization),
173 |             ("logs", run_cloudwatch_logs_optimization),
174 |             ("alarms", run_cloudwatch_alarms_and_dashboards_optimization),
175 |         ]
176 |         
177 |         pagination_structures = {}
178 |         
179 |         for api_name, api_func in apis_to_test:
180 |             print(f"\n=== Testing {api_name} API ===")
181 |             
182 |             try:
183 |                 result = await api_func(test_params)
184 |                 response_data = json.loads(result[0].text)
185 |                 
186 |                 # Find pagination structures
187 |                 pagination_paths = []
188 |                 if 'data' in response_data and 'configuration_analysis' in response_data['data']:
189 |                     config_analysis = response_data['data']['configuration_analysis']
190 |                     for section_name, section_data in config_analysis.items():
191 |                         if isinstance(section_data, dict) and 'pagination' in section_data:
192 |                             pagination_paths.append(f"data.configuration_analysis.{section_name}.pagination")
193 |                 
194 |                 pagination_structures[api_name] = pagination_paths
195 |                 print(f"{api_name} pagination paths: {pagination_paths}")
196 |                 
197 |             except Exception as e:
198 |                 print(f"Error testing {api_name}: {str(e)}")
199 |                 pagination_structures[api_name] = f"ERROR: {str(e)}"
200 |         
201 |         print(f"\n=== Pagination Structure Summary ===")
202 |         for api_name, paths in pagination_structures.items():
203 |             print(f"{api_name}: {paths}")
204 |         
205 |         # Test passes if we collected pagination info from all APIs
206 |         assert len(pagination_structures) == len(apis_to_test), "Should test all APIs"
207 | 
208 | 
209 | if __name__ == '__main__':
210 |     pytest.main([__file__, '-v', '-s'])  # -s to show print statements
```

--------------------------------------------------------------------------------
/utils/cleanup_manager.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Cleanup Manager for CFM Tips MCP Server
  3 | 
  4 | Handles automatic cleanup of sessions, results, and temporary data.
  5 | """
  6 | 
  7 | import logging
  8 | import threading
  9 | import time
 10 | import os
 11 | import glob
 12 | from datetime import datetime, timedelta
 13 | from typing import Dict, Any, Optional
 14 | 
 15 | logger = logging.getLogger(__name__)
 16 | 
 17 | class CleanupManager:
 18 |     """Manages automatic cleanup of sessions and temporary data."""
 19 |     
 20 |     def __init__(self, 
 21 |                  session_timeout_minutes: int = 60,
 22 |                  result_retention_minutes: int = 120,
 23 |                  cleanup_interval_minutes: int = 15):
 24 |         self.session_timeout_minutes = session_timeout_minutes
 25 |         self.result_retention_minutes = result_retention_minutes
 26 |         self.cleanup_interval_minutes = cleanup_interval_minutes
 27 |         self._shutdown = False
 28 |         self._cleanup_thread = None
 29 |         
 30 |         # Start cleanup thread
 31 |         self._start_cleanup_thread()
 32 |     
 33 |     def _start_cleanup_thread(self):
 34 |         """Start the background cleanup thread."""
 35 |         if self._cleanup_thread is None or not self._cleanup_thread.is_alive():
 36 |             self._cleanup_thread = threading.Thread(
 37 |                 target=self._cleanup_worker,
 38 |                 daemon=True,
 39 |                 name="CleanupManager"
 40 |             )
 41 |             self._cleanup_thread.start()
 42 |             logger.info(f"Cleanup manager started (session timeout: {self.session_timeout_minutes}min)")
 43 |     
 44 |     def _cleanup_worker(self):
 45 |         """Background worker for periodic cleanup."""
 46 |         while not self._shutdown:
 47 |             try:
 48 |                 self._perform_cleanup()
 49 |                 time.sleep(self.cleanup_interval_minutes * 60)
 50 |             except Exception as e:
 51 |                 logger.error(f"Error in cleanup worker: {e}")
 52 |                 time.sleep(60)  # Wait 1 minute on error
 53 |     
 54 |     def _perform_cleanup(self):
 55 |         """Perform all cleanup operations."""
 56 |         logger.debug("Starting periodic cleanup")
 57 |         
 58 |         # Clean up session files
 59 |         self._cleanup_session_files()
 60 |         
 61 |         # Clean up temporary files
 62 |         self._cleanup_temp_files()
 63 |         
 64 |         # Clean up old log files
 65 |         self._cleanup_log_files()
 66 |         
 67 |         logger.debug("Periodic cleanup completed")
 68 |     
 69 |     def _cleanup_session_files(self):
 70 |         """Clean up old session database files."""
 71 |         try:
 72 |             sessions_dir = "sessions"
 73 |             if not os.path.exists(sessions_dir):
 74 |                 return
 75 |             
 76 |             cutoff_time = datetime.now() - timedelta(minutes=self.session_timeout_minutes)
 77 |             cleaned_count = 0
 78 |             
 79 |             # Find all session database files
 80 |             session_files = glob.glob(os.path.join(sessions_dir, "session_*.db"))
 81 |             
 82 |             for session_file in session_files:
 83 |                 try:
 84 |                     # Check file modification time
 85 |                     file_mtime = datetime.fromtimestamp(os.path.getmtime(session_file))
 86 |                     
 87 |                     if file_mtime < cutoff_time:
 88 |                         # Remove old session file
 89 |                         os.remove(session_file)
 90 |                         cleaned_count += 1
 91 |                         logger.debug(f"Cleaned up old session file: {session_file}")
 92 |                         
 93 |                         # Also remove any associated WAL and SHM files
 94 |                         for ext in ['-wal', '-shm']:
 95 |                             wal_file = session_file + ext
 96 |                             if os.path.exists(wal_file):
 97 |                                 os.remove(wal_file)
 98 |                 
 99 |                 except Exception as e:
100 |                     logger.warning(f"Error cleaning session file {session_file}: {e}")
101 |             
102 |             if cleaned_count > 0:
103 |                 logger.info(f"Cleaned up {cleaned_count} old session files")
104 |                 
105 |         except Exception as e:
106 |             logger.error(f"Error in session file cleanup: {e}")
107 |     
108 |     def _cleanup_temp_files(self):
109 |         """Clean up temporary files and directories."""
110 |         try:
111 |             temp_patterns = [
112 |                 "*.tmp",
113 |                 "*.temp",
114 |                 "__pycache__/*.pyc",
115 |                 "*.log.old"
116 |             ]
117 |             
118 |             cleaned_count = 0
119 |             
120 |             for pattern in temp_patterns:
121 |                 temp_files = glob.glob(pattern, recursive=True)
122 |                 
123 |                 for temp_file in temp_files:
124 |                     try:
125 |                         # Check if file is older than retention period
126 |                         file_mtime = datetime.fromtimestamp(os.path.getmtime(temp_file))
127 |                         cutoff_time = datetime.now() - timedelta(minutes=self.result_retention_minutes)
128 |                         
129 |                         if file_mtime < cutoff_time:
130 |                             if os.path.isfile(temp_file):
131 |                                 os.remove(temp_file)
132 |                                 cleaned_count += 1
133 |                             elif os.path.isdir(temp_file):
134 |                                 import shutil
135 |                                 shutil.rmtree(temp_file)
136 |                                 cleaned_count += 1
137 |                     
138 |                     except Exception as e:
139 |                         logger.warning(f"Error cleaning temp file {temp_file}: {e}")
140 |             
141 |             if cleaned_count > 0:
142 |                 logger.info(f"Cleaned up {cleaned_count} temporary files")
143 |                 
144 |         except Exception as e:
145 |             logger.error(f"Error in temp file cleanup: {e}")
146 |     
147 |     def _cleanup_log_files(self):
148 |         """Clean up old log files."""
149 |         try:
150 |             logs_dir = "logs"
151 |             if not os.path.exists(logs_dir):
152 |                 return
153 |             
154 |             # Keep logs for 7 days
155 |             cutoff_time = datetime.now() - timedelta(days=7)
156 |             cleaned_count = 0
157 |             
158 |             log_files = glob.glob(os.path.join(logs_dir, "*.log.*"))
159 |             
160 |             for log_file in log_files:
161 |                 try:
162 |                     file_mtime = datetime.fromtimestamp(os.path.getmtime(log_file))
163 |                     
164 |                     if file_mtime < cutoff_time:
165 |                         os.remove(log_file)
166 |                         cleaned_count += 1
167 |                         logger.debug(f"Cleaned up old log file: {log_file}")
168 |                 
169 |                 except Exception as e:
170 |                     logger.warning(f"Error cleaning log file {log_file}: {e}")
171 |             
172 |             if cleaned_count > 0:
173 |                 logger.info(f"Cleaned up {cleaned_count} old log files")
174 |                 
175 |         except Exception as e:
176 |             logger.error(f"Error in log file cleanup: {e}")
177 |     
178 |     def force_cleanup(self):
179 |         """Force immediate cleanup of all resources."""
180 |         logger.info("Forcing immediate cleanup")
181 |         self._perform_cleanup()
182 |         
183 |         # Also clean up from session manager and parallel executor
184 |         try:
185 |             from . import get_session_manager, get_parallel_executor
186 |             
187 |             # Clean up session manager
188 |             session_manager = get_session_manager()
189 |             session_manager._cleanup_expired_sessions()
190 |             
191 |             # Clean up parallel executor results
192 |             executor = get_parallel_executor()
193 |             executor.clear_results(older_than_minutes=self.result_retention_minutes)
194 |             
195 |             logger.info("Force cleanup completed")
196 |             
197 |         except Exception as e:
198 |             logger.error(f"Error in force cleanup: {e}")
199 |     
200 |     def get_cleanup_stats(self) -> Dict[str, Any]:
201 |         """Get statistics about cleanup operations."""
202 |         try:
203 |             stats = {
204 |                 'session_timeout_minutes': self.session_timeout_minutes,
205 |                 'result_retention_minutes': self.result_retention_minutes,
206 |                 'cleanup_interval_minutes': self.cleanup_interval_minutes,
207 |                 'cleanup_thread_alive': self._cleanup_thread.is_alive() if self._cleanup_thread else False,
208 |                 'sessions_directory_exists': os.path.exists('sessions'),
209 |                 'logs_directory_exists': os.path.exists('logs')
210 |             }
211 |             
212 |             # Count current files
213 |             if os.path.exists('sessions'):
214 |                 session_files = glob.glob('sessions/session_*.db')
215 |                 stats['active_session_files'] = len(session_files)
216 |             else:
217 |                 stats['active_session_files'] = 0
218 |             
219 |             if os.path.exists('logs'):
220 |                 log_files = glob.glob('logs/*.log*')
221 |                 stats['log_files'] = len(log_files)
222 |             else:
223 |                 stats['log_files'] = 0
224 |             
225 |             return stats
226 |             
227 |         except Exception as e:
228 |             logger.error(f"Error getting cleanup stats: {e}")
229 |             return {'error': str(e)}
230 |     
231 |     def update_settings(self, 
232 |                        session_timeout_minutes: Optional[int] = None,
233 |                        result_retention_minutes: Optional[int] = None,
234 |                        cleanup_interval_minutes: Optional[int] = None):
235 |         """Update cleanup settings."""
236 |         if session_timeout_minutes is not None:
237 |             self.session_timeout_minutes = session_timeout_minutes
238 |             logger.info(f"Updated session timeout to {session_timeout_minutes} minutes")
239 |         
240 |         if result_retention_minutes is not None:
241 |             self.result_retention_minutes = result_retention_minutes
242 |             logger.info(f"Updated result retention to {result_retention_minutes} minutes")
243 |         
244 |         if cleanup_interval_minutes is not None:
245 |             self.cleanup_interval_minutes = cleanup_interval_minutes
246 |             logger.info(f"Updated cleanup interval to {cleanup_interval_minutes} minutes")
247 |     
248 |     def shutdown(self):
249 |         """Shutdown the cleanup manager."""
250 |         logger.info("Shutting down cleanup manager")
251 |         self._shutdown = True
252 |         
253 |         # Perform final cleanup
254 |         try:
255 |             self._perform_cleanup()
256 |         except Exception as e:
257 |             logger.error(f"Error in final cleanup: {e}")
258 |         
259 |         # Wait for cleanup thread to finish
260 |         if self._cleanup_thread and self._cleanup_thread.is_alive():
261 |             self._cleanup_thread.join(timeout=10)
262 | 
# Module-wide CleanupManager singleton, created on first request
_cleanup_manager = None

def get_cleanup_manager() -> CleanupManager:
    """Return the shared CleanupManager, creating it on the first call."""
    global _cleanup_manager
    if _cleanup_manager is not None:
        return _cleanup_manager

    _cleanup_manager = CleanupManager()
    return _cleanup_manager
```

--------------------------------------------------------------------------------
/tests/unit/cloudwatch/test_cloudwatch_pagination_architecture.py:
--------------------------------------------------------------------------------

```python
  1 | #!/usr/bin/env python3
  2 | """
  3 | Test CloudWatch Pagination Architecture
  4 | 
  5 | This test validates that the CloudWatch pagination system works correctly:
  6 | 1. Fetches ALL data from AWS using proper NextToken pagination
  7 | 2. Sorts client-side by estimated cost (descending)
  8 | 3. Applies client-side pagination for MCP responses
  9 | 
 10 | The key insight: "Pagination breaking" isn't an API error - it's the correct
 11 | architecture handling large datasets that may cause performance issues.
 12 | """
 13 | 
 14 | import pytest
 15 | import asyncio
 16 | from unittest.mock import Mock, patch, AsyncMock
 17 | from playbooks.cloudwatch.result_processor import CloudWatchResultProcessor
 18 | from services.cloudwatch_service import CloudWatchService, CloudWatchOperationResult
 19 | 
 20 | 
 21 | class TestCloudWatchPaginationArchitecture:
 22 |     """Test the CloudWatch pagination architecture end-to-end."""
 23 |     
 24 |     def setup_method(self):
 25 |         """Set up test fixtures."""
 26 |         self.result_processor = CloudWatchResultProcessor()
 27 |     
 28 |     def test_pagination_metadata_calculation(self):
 29 |         """Test that pagination metadata is calculated correctly."""
 30 |         # Test with 25 items, page size 10
 31 |         total_items = 25
 32 |         
 33 |         # Page 1: items 0-9
 34 |         metadata_p1 = self.result_processor.create_pagination_metadata(total_items, 1)
 35 |         assert metadata_p1.current_page == 1
 36 |         assert metadata_p1.page_size == 10
 37 |         assert metadata_p1.total_items == 25
 38 |         assert metadata_p1.total_pages == 3
 39 |         assert metadata_p1.has_next_page == True
 40 |         assert metadata_p1.has_previous_page == False
 41 |         
 42 |         # Page 2: items 10-19
 43 |         metadata_p2 = self.result_processor.create_pagination_metadata(total_items, 2)
 44 |         assert metadata_p2.current_page == 2
 45 |         assert metadata_p2.has_next_page == True
 46 |         assert metadata_p2.has_previous_page == True
 47 |         
 48 |         # Page 3: items 20-24 (partial page)
 49 |         metadata_p3 = self.result_processor.create_pagination_metadata(total_items, 3)
 50 |         assert metadata_p3.current_page == 3
 51 |         assert metadata_p3.has_next_page == False
 52 |         assert metadata_p3.has_previous_page == True
 53 |         
 54 |         # Page 4: beyond data (empty)
 55 |         metadata_p4 = self.result_processor.create_pagination_metadata(total_items, 4)
 56 |         assert metadata_p4.current_page == 4
 57 |         assert metadata_p4.has_next_page == False
 58 |         assert metadata_p4.has_previous_page == True
 59 |     
 60 |     def test_client_side_pagination_slicing(self):
 61 |         """Test that client-side pagination slices data correctly."""
 62 |         # Create test data
 63 |         items = [{'id': i, 'name': f'item_{i}'} for i in range(25)]
 64 |         
 65 |         # Test page 1 (items 0-9)
 66 |         result_p1 = self.result_processor.paginate_results(items, 1)
 67 |         assert len(result_p1['items']) == 10
 68 |         assert result_p1['items'][0]['id'] == 0
 69 |         assert result_p1['items'][9]['id'] == 9
 70 |         assert result_p1['pagination']['current_page'] == 1
 71 |         assert result_p1['pagination']['total_pages'] == 3
 72 |         
 73 |         # Test page 2 (items 10-19)
 74 |         result_p2 = self.result_processor.paginate_results(items, 2)
 75 |         assert len(result_p2['items']) == 10
 76 |         assert result_p2['items'][0]['id'] == 10
 77 |         assert result_p2['items'][9]['id'] == 19
 78 |         
 79 |         # Test page 3 (items 20-24, partial page)
 80 |         result_p3 = self.result_processor.paginate_results(items, 3)
 81 |         assert len(result_p3['items']) == 5
 82 |         assert result_p3['items'][0]['id'] == 20
 83 |         assert result_p3['items'][4]['id'] == 24
 84 |         
 85 |         # Test page 4 (beyond data, empty)
 86 |         result_p4 = self.result_processor.paginate_results(items, 4)
 87 |         assert len(result_p4['items']) == 0
 88 |         assert result_p4['pagination']['current_page'] == 4
 89 |     
 90 |     def test_cost_based_sorting_before_pagination(self):
 91 |         """Test that items are sorted by cost before pagination."""
 92 |         # Create test metrics with different estimated costs
 93 |         metrics = [
 94 |             {'MetricName': 'LowCost', 'Namespace': 'AWS/EC2', 'Dimensions': []},
 95 |             {'MetricName': 'HighCost', 'Namespace': 'Custom/App', 'Dimensions': [{'Name': 'Instance', 'Value': 'i-123'}]},
 96 |             {'MetricName': 'MediumCost', 'Namespace': 'AWS/Lambda', 'Dimensions': [{'Name': 'Function', 'Value': 'test'}]},
 97 |         ]
 98 |         
 99 |         # Process with cost enrichment and sorting
100 |         enriched = self.result_processor.enrich_items_with_cost_estimates(metrics, 'metrics')
101 |         sorted_metrics = self.result_processor.sort_by_cost_descending(enriched)
102 |         
103 |         # Verify that enrichment adds cost estimates
104 |         assert all('estimated_monthly_cost' in metric for metric in sorted_metrics)
105 |         
106 |         # Verify sorting works (items are in descending cost order)
107 |         costs = [metric['estimated_monthly_cost'] for metric in sorted_metrics]
108 |         assert costs == sorted(costs, reverse=True)  # Should be in descending order
109 |         
110 |         # Verify custom namespace gets higher cost estimate than AWS namespaces
111 |         custom_metrics = [m for m in sorted_metrics if not m['Namespace'].startswith('AWS/')]
112 |         aws_metrics = [m for m in sorted_metrics if m['Namespace'].startswith('AWS/')]
113 |         
114 |         if custom_metrics and aws_metrics:
115 |             # Custom metrics should generally have higher costs than AWS metrics
116 |             max_custom_cost = max(m['estimated_monthly_cost'] for m in custom_metrics)
117 |             max_aws_cost = max(m['estimated_monthly_cost'] for m in aws_metrics)
118 |             # Note: This might be 0.0 for both in test environment, which is fine
119 |     
120 |     @pytest.mark.skip(reason="Test needs refactoring - mock setup is incorrect")
121 |     @pytest.mark.asyncio
122 |     async def test_aws_pagination_architecture(self):
123 |         """Test that AWS API pagination works correctly (NextToken only)."""
124 |         
125 |         # Mock CloudWatch service
126 |         mock_cloudwatch_service = Mock(spec=CloudWatchService)
127 |         
128 |         # Mock paginated response from AWS
129 |         mock_response_page1 = CloudWatchOperationResult(
130 |             success=True,
131 |             data={
132 |                 'metrics': [{'MetricName': f'Metric_{i}', 'Namespace': 'AWS/EC2'} for i in range(500)],
133 |                 'total_count': 500,
134 |                 'filtered': False
135 |             },
136 |             operation_name='list_metrics'
137 |         )
138 |         
139 |         mock_cloudwatch_service.list_metrics.return_value = mock_response_page1
140 |         
141 |         # Test that service is called correctly (no MaxRecords parameter)
142 |         result = await mock_cloudwatch_service.list_metrics(namespace='AWS/EC2')
143 |         
144 |         # Verify the call was made without MaxRecords
145 |         mock_cloudwatch_service.list_metrics.assert_called_once_with(namespace='AWS/EC2')
146 |         
147 |         # Verify we got the expected data structure
148 |         assert result.success == True
149 |         assert len(result.data['metrics']) == 500
150 |         assert result.data['total_count'] == 500
151 |     
152 |     def test_pagination_architecture_documentation(self):
153 |         """Document the pagination architecture for future reference."""
154 |         
155 |         architecture_doc = {
156 |             "cloudwatch_pagination_architecture": {
157 |                 "step_1_aws_fetch": {
158 |                     "description": "Fetch ALL data from AWS using proper NextToken pagination",
159 |                     "method": "AWS paginator with NextToken (no MaxRecords)",
160 |                     "apis_used": ["list_metrics", "describe_alarms", "describe_log_groups"],
161 |                     "result": "Complete dataset in arbitrary AWS order"
162 |                 },
163 |                 "step_2_client_sort": {
164 |                     "description": "Sort client-side by estimated cost (descending)",
165 |                     "method": "Cost estimation using free metadata + sorting",
166 |                     "cost": "Zero additional API calls",
167 |                     "result": "Dataset ordered by cost (highest first)"
168 |                 },
169 |                 "step_3_client_paginate": {
170 |                     "description": "Apply client-side pagination for MCP response",
171 |                     "method": "Array slicing with 10 items per page",
172 |                     "page_size": 10,
173 |                     "result": "Paginated response with metadata"
174 |                 }
175 |             },
176 |             "why_this_architecture": {
177 |                 "aws_limitation": "AWS APIs return data in arbitrary order, not by cost",
178 |                 "sorting_requirement": "Users want to see highest-cost items first",
179 |                 "solution": "Fetch all, sort by cost, then paginate for display"
180 |             },
181 |             "performance_considerations": {
182 |                 "large_datasets": "4000+ metrics may cause timeouts",
183 |                 "memory_usage": "All data loaded into memory for sorting",
184 |                 "optimization": "Caching and progressive loading implemented"
185 |             }
186 |         }
187 |         
188 |         # This test passes if the architecture is documented
189 |         assert architecture_doc["cloudwatch_pagination_architecture"]["step_1_aws_fetch"]["method"] == "AWS paginator with NextToken (no MaxRecords)"
190 |         assert architecture_doc["cloudwatch_pagination_architecture"]["step_3_client_paginate"]["page_size"] == 10
191 |     
192 |     def test_edge_cases_pagination(self):
193 |         """Test edge cases in pagination."""
194 |         
195 |         # Empty dataset
196 |         empty_result = self.result_processor.paginate_results([], 1)
197 |         assert len(empty_result['items']) == 0
198 |         assert empty_result['pagination']['total_pages'] == 0
199 |         assert empty_result['pagination']['has_next_page'] == False
200 |         
201 |         # Single item
202 |         single_item = [{'id': 1}]
203 |         single_result = self.result_processor.paginate_results(single_item, 1)
204 |         assert len(single_result['items']) == 1
205 |         assert single_result['pagination']['total_pages'] == 1
206 |         
207 |         # Exactly page size (10 items)
208 |         exact_page = [{'id': i} for i in range(10)]
209 |         exact_result = self.result_processor.paginate_results(exact_page, 1)
210 |         assert len(exact_result['items']) == 10
211 |         assert exact_result['pagination']['total_pages'] == 1
212 |         assert exact_result['pagination']['has_next_page'] == False
213 |         
214 |         # Invalid page numbers
215 |         items = [{'id': i} for i in range(5)]
216 |         
217 |         # Page 0 should default to page 1
218 |         page_0_result = self.result_processor.paginate_results(items, 0)
219 |         assert page_0_result['pagination']['current_page'] == 1
220 |         
221 |         # Negative page should default to page 1
222 |         negative_result = self.result_processor.paginate_results(items, -1)
223 |         assert negative_result['pagination']['current_page'] == 1
224 | 
225 | 
# Allow running this test module directly; hands off to pytest in verbose mode.
if __name__ == "__main__":
    pytest.main([__file__, "-v"])
```

--------------------------------------------------------------------------------
/tests/unit/cloudwatch/test_mcp_pagination_bug.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Unit tests to identify MCP pagination bug in CloudWatch metrics optimization.
  3 | 
  4 | Tests the complete flow from MCP tool call through orchestrator to result processor
  5 | to identify where pagination is being bypassed.
  6 | """
  7 | 
  8 | import pytest
  9 | import asyncio
 10 | import json
 11 | from unittest.mock import patch, AsyncMock, MagicMock
 12 | from mcp.types import TextContent
 13 | 
 14 | # Import the functions under test
 15 | from runbook_functions import run_cloudwatch_metrics_optimization
 16 | 
 17 | 
 18 | @pytest.mark.skip(reason="Tests need refactoring to match actual API structure")
 19 | class TestMCPPaginationBug:
 20 |     """Test suite to identify where pagination is failing in the MCP flow."""
 21 |     
 22 |     @pytest.fixture
 23 |     def mock_large_metrics_dataset(self):
 24 |         """Create a large dataset of 25 metrics for pagination testing."""
 25 |         return [
 26 |             {
 27 |                 'MetricName': f'CustomMetric{i:02d}',
 28 |                 'Namespace': 'Custom/Application' if i % 2 == 0 else 'AWS/EC2',
 29 |                 'Dimensions': [{'Name': 'InstanceId', 'Value': f'i-{i:08x}'}],
 30 |                 'estimated_monthly_cost': 10.0 - (i * 0.2)  # Decreasing cost
 31 |             }
 32 |             for i in range(25)
 33 |         ]
 34 |     
 35 |     @pytest.fixture
 36 |     def mock_orchestrator_response(self, mock_large_metrics_dataset):
 37 |         """Mock orchestrator response with proper structure."""
 38 |         def create_response(page=1):
 39 |             # Simulate orchestrator pagination - should return only 10 items per page
 40 |             start_idx = (page - 1) * 10
 41 |             end_idx = start_idx + 10
 42 |             page_metrics = mock_large_metrics_dataset[start_idx:end_idx]
 43 |             
 44 |             return {
 45 |                 'status': 'success',
 46 |                 'data': {
 47 |                     'metrics_configuration_analysis': {
 48 |                         'metrics': {
 49 |                             'metrics': page_metrics,
 50 |                             'pagination': {
 51 |                                 'current_page': page,
 52 |                                 'page_size': 10,
 53 |                                 'total_items': 25,
 54 |                                 'total_pages': 3,
 55 |                                 'has_next_page': page < 3,
 56 |                                 'has_previous_page': page > 1
 57 |                             },
 58 |                             'total_count': 25,
 59 |                             'namespace': 'all',
 60 |                             'filtered': False
 61 |                         }
 62 |                     }
 63 |                 },
 64 |                 'orchestrator_metadata': {
 65 |                     'session_id': 'test-session',
 66 |                     'region': 'us-east-1'
 67 |                 }
 68 |             }
 69 |         return create_response
 70 |     
    @pytest.mark.asyncio
    async def test_orchestrator_pagination_works(self, mock_orchestrator_response):
        """Test that orchestrator correctly applies pagination.

        Patches ``CloudWatchOptimizationOrchestrator`` in ``runbook_functions``
        so that ``execute_analysis`` returns the fixture's paginated response,
        then calls ``run_cloudwatch_metrics_optimization`` for pages 1 and 2
        and checks both the returned JSON and the arguments passed to the
        mocked orchestrator.
        """
        
        with patch('runbook_functions.CloudWatchOptimizationOrchestrator') as mock_orchestrator_class:
            # Setup mock orchestrator instance
            mock_orchestrator = AsyncMock()
            mock_orchestrator_class.return_value = mock_orchestrator
            
            # Mock execute_analysis to return paginated responses
            def mock_execute_analysis(analysis_type, **kwargs):
                # Page defaults to 1 when the caller does not pass one
                page = kwargs.get('page', 1)
                return mock_orchestrator_response(page)
            
            mock_orchestrator.execute_analysis = AsyncMock(side_effect=mock_execute_analysis)
            
            # Test page 1
            result_p1 = await run_cloudwatch_metrics_optimization({
                'region': 'us-east-1',
                'page': 1,
                'lookback_days': 30
            })
            
            # Verify result structure: a single TextContent item
            assert len(result_p1) == 1
            assert isinstance(result_p1[0], TextContent)
            
            # Parse JSON response
            data_p1 = json.loads(result_p1[0].text)
            assert data_p1['status'] == 'success'
            
            # Check metrics data
            metrics_data_p1 = data_p1['data']['metrics_configuration_analysis']['metrics']
            assert len(metrics_data_p1['metrics']) == 10, "Page 1 should have 10 metrics"
            assert metrics_data_p1['pagination']['current_page'] == 1
            assert metrics_data_p1['pagination']['total_items'] == 25
            
            # Test page 2
            result_p2 = await run_cloudwatch_metrics_optimization({
                'region': 'us-east-1',
                'page': 2,
                'lookback_days': 30
            })
            
            data_p2 = json.loads(result_p2[0].text)
            metrics_data_p2 = data_p2['data']['metrics_configuration_analysis']['metrics']
            assert len(metrics_data_p2['metrics']) == 10, "Page 2 should have 10 metrics"
            assert metrics_data_p2['pagination']['current_page'] == 2
            
            # Verify different metrics on different pages
            p1_names = [m['MetricName'] for m in metrics_data_p1['metrics']]
            p2_names = [m['MetricName'] for m in metrics_data_p2['metrics']]
            assert p1_names != p2_names, "Page 1 and Page 2 should have different metrics"
            
            # Verify orchestrator was called with correct parameters
            # (exactly one execute_analysis call per requested page)
            assert mock_orchestrator.execute_analysis.call_count == 2
            
            # Check first call (page 1): analysis type is positional, page is a keyword
            first_call_args = mock_orchestrator.execute_analysis.call_args_list[0]
            assert first_call_args[0][0] == 'metrics_optimization'
            assert first_call_args[1]['page'] == 1
            
            # Check second call (page 2)
            second_call_args = mock_orchestrator.execute_analysis.call_args_list[1]
            assert second_call_args[1]['page'] == 2
136 |     
137 |     @pytest.mark.asyncio
138 |     async def test_mcp_tool_bypasses_pagination(self):
139 |         """Test to identify if MCP tool is bypassing orchestrator pagination."""
140 |         
141 |         # This test will help identify if there's a direct MCP call bypassing the orchestrator
142 |         with patch('runbook_functions.CloudWatchOptimizationOrchestrator') as mock_orchestrator_class:
143 |             mock_orchestrator = AsyncMock()
144 |             mock_orchestrator_class.return_value = mock_orchestrator
145 |             
146 |             # Mock orchestrator to return a response indicating it was called
147 |             mock_orchestrator.execute_analysis = AsyncMock(return_value={
148 |                 'status': 'success',
149 |                 'data': {
150 |                     'metrics_configuration_analysis': {
151 |                         'metrics': {
152 |                             'metrics': [],
153 |                             'pagination': {'current_page': 1, 'total_items': 0},
154 |                             'orchestrator_called': True  # Flag to verify orchestrator was used
155 |                         }
156 |                     }
157 |                 }
158 |             })
159 |             
160 |             # Also patch any potential direct MCP calls
161 |             with patch('runbook_functions.mcp_cfm_tips_cloudwatch_metrics_optimization') as mock_mcp:
162 |                 mock_mcp.return_value = {
163 |                     'status': 'success', 
164 |                     'data': {'direct_mcp_call': True}  # Flag to identify direct MCP call
165 |                 }
166 |                 
167 |                 result = await run_cloudwatch_metrics_optimization({
168 |                     'region': 'us-east-1',
169 |                     'page': 1
170 |                 })
171 |                 
172 |                 # Parse result
173 |                 data = json.loads(result[0].text)
174 |                 
175 |                 # Check if orchestrator was used (expected behavior)
176 |                 if 'orchestrator_called' in str(data):
177 |                     print("✅ Orchestrator was called - pagination should work")
178 |                     assert mock_orchestrator.execute_analysis.called
179 |                     assert not mock_mcp.called, "Direct MCP call should not be made"
180 |                 
181 |                 # Check if direct MCP call was made (bug scenario)
182 |                 elif 'direct_mcp_call' in str(data):
183 |                     pytest.fail("❌ BUG IDENTIFIED: Direct MCP call bypassing orchestrator pagination")
184 |                 
185 |                 else:
186 |                     pytest.fail("❌ Unable to determine call path - check test setup")
187 |     
188 |     @pytest.mark.asyncio 
189 |     async def test_result_processor_pagination(self):
190 |         """Test that result processor correctly paginates metrics."""
191 |         
192 |         from playbooks.cloudwatch.result_processor import CloudWatchResultProcessor
193 |         
194 |         # Create test metrics
195 |         test_metrics = [
196 |             {'MetricName': f'Metric{i}', 'estimated_monthly_cost': 10 - i}
197 |             for i in range(25)
198 |         ]
199 |         
200 |         processor = CloudWatchResultProcessor()
201 |         
202 |         # Test page 1
203 |         result_p1 = processor.process_metrics_results(test_metrics, page=1)
204 |         assert len(result_p1['items']) == 10
205 |         assert result_p1['pagination']['current_page'] == 1
206 |         assert result_p1['pagination']['total_items'] == 25
207 |         
208 |         # Test page 2
209 |         result_p2 = processor.process_metrics_results(test_metrics, page=2)
210 |         assert len(result_p2['items']) == 10
211 |         assert result_p2['pagination']['current_page'] == 2
212 |         
213 |         # Verify different items
214 |         p1_names = [item['MetricName'] for item in result_p1['items']]
215 |         p2_names = [item['MetricName'] for item in result_p2['items']]
216 |         assert p1_names != p2_names
217 |     
218 |     @pytest.mark.asyncio
219 |     async def test_orchestrator_apply_result_processing(self):
220 |         """Test that orchestrator's _apply_result_processing works correctly."""
221 |         
222 |         from playbooks.cloudwatch.optimization_orchestrator import CloudWatchOptimizationOrchestrator
223 |         
224 |         # Create mock result with metrics
225 |         mock_result = {
226 |             'status': 'success',
227 |             'data': {
228 |                 'metrics_configuration_analysis': {
229 |                     'metrics': {
230 |                         'metrics': [
231 |                             {'MetricName': f'Metric{i}', 'estimated_monthly_cost': 10 - i}
232 |                             for i in range(25)
233 |                         ],
234 |                         'total_count': 25
235 |                     }
236 |                 }
237 |             }
238 |         }
239 |         
240 |         orchestrator = CloudWatchOptimizationOrchestrator(region='us-east-1')
241 |         
242 |         # Test pagination application
243 |         processed_result = orchestrator._apply_result_processing(mock_result, page=1)
244 |         
245 |         metrics_data = processed_result['data']['metrics_configuration_analysis']['metrics']
246 |         assert len(metrics_data['metrics']) == 10, "Should be paginated to 10 items"
247 |         assert 'pagination' in metrics_data, "Should have pagination metadata"
248 |         assert metrics_data['pagination']['current_page'] == 1
249 |         assert metrics_data['pagination']['total_items'] == 25
```

--------------------------------------------------------------------------------
/diagnose_cost_optimization_hub_v2.py:
--------------------------------------------------------------------------------

```python
  1 | #!/usr/bin/env python3
  2 | """
  3 | Diagnostic script for Cost Optimization Hub issues - Updated with correct permissions
  4 | """
  5 | 
  6 | import boto3
  7 | import json
  8 | from botocore.exceptions import ClientError
  9 | 
 10 | def check_cost_optimization_hub():
 11 |     """Check Cost Optimization Hub status and common issues."""
 12 |     print("🔍 Diagnosing Cost Optimization Hub Issues")
 13 |     print("=" * 50)
 14 |     
 15 |     # Test different regions where Cost Optimization Hub is available
 16 |     regions_to_test = ['us-east-1', 'us-west-2', 'eu-west-1', 'ap-southeast-1']
 17 |     
 18 |     for region in regions_to_test:
 19 |         print(f"\n📍 Testing region: {region}")
 20 |         try:
 21 |             client = boto3.client('cost-optimization-hub', region_name=region)
 22 |             
 23 |             # Test 1: Check enrollment statuses (correct API call)
 24 |             print("   ✅ Testing enrollment statuses...")
 25 |             try:
 26 |                 enrollment_response = client.list_enrollment_statuses()
 27 |                 print(f"   📊 Enrollment Response: {json.dumps(enrollment_response, indent=2, default=str)}")
 28 |                 
 29 |                 # Check if any accounts are enrolled
 30 |                 items = enrollment_response.get('items', [])
 31 |                 if items:
 32 |                     active_enrollments = [item for item in items if item.get('status') == 'Active']
 33 |                     if active_enrollments:
 34 |                         print(f"   ✅ Found {len(active_enrollments)} active enrollments")
 35 |                         
 36 |                         # Test 2: Try to list recommendations
 37 |                         print("   ✅ Testing list recommendations...")
 38 |                         try:
 39 |                             recommendations = client.list_recommendations(maxResults=5)
 40 |                             print(f"   📊 Found {len(recommendations.get('items', []))} recommendations")
 41 |                             print("   ✅ Cost Optimization Hub is working correctly!")
 42 |                             return True
 43 |                             
 44 |                         except ClientError as rec_error:
 45 |                             print(f"   ❌ Error listing recommendations: {rec_error.response['Error']['Code']} - {rec_error.response['Error']['Message']}")
 46 |                     else:
 47 |                         print("   ⚠️  No active enrollments found")
 48 |                         print("   💡 You need to enable Cost Optimization Hub in the AWS Console")
 49 |                 else:
 50 |                     print("   ⚠️  No enrollment information found")
 51 |                     print("   💡 Cost Optimization Hub may not be set up for this account")
 52 |                     
 53 |             except ClientError as enrollment_error:
 54 |                 error_code = enrollment_error.response['Error']['Code']
 55 |                 error_message = enrollment_error.response['Error']['Message']
 56 |                 
 57 |                 if error_code == 'AccessDeniedException':
 58 |                     print("   ❌ Access denied - check IAM permissions")
 59 |                     print("   💡 Required permissions: cost-optimization-hub:ListEnrollmentStatuses")
 60 |                 elif error_code == 'ValidationException':
 61 |                     print(f"   ❌ Validation error: {error_message}")
 62 |                 else:
 63 |                     print(f"   ❌ Error: {error_code} - {error_message}")
 64 |                     
 65 |         except Exception as e:
 66 |             print(f"   ❌ Failed to create client for region {region}: {str(e)}")
 67 |     
 68 |     return False
 69 | 
 70 | def check_iam_permissions():
 71 |     """Check IAM permissions for Cost Optimization Hub."""
 72 |     print("\n🔐 Checking IAM Permissions")
 73 |     print("=" * 30)
 74 |     
 75 |     try:
 76 |         # Get current user/role
 77 |         sts_client = boto3.client('sts')
 78 |         identity = sts_client.get_caller_identity()
 79 |         print(f"Current identity: {identity.get('Arn', 'Unknown')}")
 80 |         
 81 |         # Correct required actions for Cost Optimization Hub
 82 |         required_actions = [
 83 |             'cost-optimization-hub:ListEnrollmentStatuses',
 84 |             'cost-optimization-hub:UpdateEnrollmentStatus',
 85 |             'cost-optimization-hub:GetPreferences',
 86 |             'cost-optimization-hub:UpdatePreferences',
 87 |             'cost-optimization-hub:GetRecommendation',
 88 |             'cost-optimization-hub:ListRecommendations',
 89 |             'cost-optimization-hub:ListRecommendationSummaries'
 90 |         ]
 91 |         
 92 |         print("\nRequired permissions for Cost Optimization Hub:")
 93 |         for action in required_actions:
 94 |             print(f"  - {action}")
 95 |             
 96 |         print("\nMinimal permissions for read-only access:")
 97 |         minimal_actions = [
 98 |             'cost-optimization-hub:ListEnrollmentStatuses',
 99 |             'cost-optimization-hub:ListRecommendations',
100 |             'cost-optimization-hub:GetRecommendation',
101 |             'cost-optimization-hub:ListRecommendationSummaries'
102 |         ]
103 |         for action in minimal_actions:
104 |             print(f"  - {action}")
105 |             
106 |     except Exception as e:
107 |         print(f"Error checking IAM: {str(e)}")
108 | 
def test_individual_apis():
    """Call three Cost Optimization Hub list APIs in us-east-1 and report each.

    Every call is attempted independently: a ClientError from one probe is
    printed and the next probe still runs. Any other exception aborts the
    remaining probes and is printed once.
    """
    print("\n🧪 Testing Individual APIs")
    print("=" * 30)

    # (banner, client method, call kwargs, noun used in the success line)
    probes = (
        ("\n1. Testing list_enrollment_statuses...",
         'list_enrollment_statuses', {}, 'enrollment records'),
        ("\n2. Testing list_recommendations...",
         'list_recommendations', {'maxResults': 5}, 'recommendations'),
        ("\n3. Testing list_recommendation_summaries...",
         'list_recommendation_summaries', {'maxResults': 5}, 'summaries'),
    )

    try:
        client = boto3.client('cost-optimization-hub', region_name='us-east-1')

        for banner, method_name, call_kwargs, noun in probes:
            print(banner)
            try:
                response = getattr(client, method_name)(**call_kwargs)
                print(f"   ✅ Success: Found {len(response.get('items', []))} {noun}")
            except ClientError as e:
                print(f"   ❌ Failed: {e.response['Error']['Code']} - {e.response['Error']['Message']}")

    except Exception as e:
        print(f"Error testing APIs: {str(e)}")
143 | 
def provide_correct_iam_policy():
    """Print two ready-to-paste IAM policy documents as indented JSON.

    The first policy allows the full set of Cost Optimization Hub actions
    listed below, the second only the read-only subset. Both apply to
    resource "*".
    """
    def allow_on_everything(actions):
        # Wrap a list of actions in a single-statement Allow policy
        return {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Action": list(actions),
                    "Resource": "*"
                }
            ]
        }

    full_actions = (
        "cost-optimization-hub:ListEnrollmentStatuses",
        "cost-optimization-hub:UpdateEnrollmentStatus",
        "cost-optimization-hub:GetPreferences",
        "cost-optimization-hub:UpdatePreferences",
        "cost-optimization-hub:GetRecommendation",
        "cost-optimization-hub:ListRecommendations",
        "cost-optimization-hub:ListRecommendationSummaries",
    )
    read_only_actions = (
        "cost-optimization-hub:ListEnrollmentStatuses",
        "cost-optimization-hub:ListRecommendations",
        "cost-optimization-hub:GetRecommendation",
        "cost-optimization-hub:ListRecommendationSummaries",
    )

    print("\n📋 Correct IAM Policy")
    print("=" * 25)

    print("Full IAM Policy:")
    print(json.dumps(allow_on_everything(full_actions), indent=2))

    print("\nMinimal Read-Only Policy:")
    print(json.dumps(allow_on_everything(read_only_actions), indent=2))
189 | 
def provide_solutions():
    """Print a fixed troubleshooting guide: one issue heading, then its steps."""
    print("\n🛠️  Updated Solutions")
    print("=" * 20)

    # (issue, ordered remediation steps)
    playbook = (
        (
            "AccessDeniedException",
            (
                "1. Add the correct IAM permissions (see policy above)",
                "2. The service uses different permission names than other AWS services",
                "3. Use 'cost-optimization-hub:ListEnrollmentStatuses' not 'GetEnrollmentStatus'",
                "4. Attach the policy to your IAM user/role",
            ),
        ),
        (
            "No enrollment found",
            (
                "1. Go to AWS Console → Cost Optimization Hub",
                "2. Enable the service for your account",
                "3. Wait for enrollment to complete",
                "4. URL: https://console.aws.amazon.com/cost-optimization-hub/",
            ),
        ),
        (
            "Service not available",
            (
                "1. Cost Optimization Hub is only available in specific regions",
                "2. Use us-east-1, us-west-2, eu-west-1, or ap-southeast-1",
                "3. The service may not be available in your region yet",
            ),
        ),
        (
            "No recommendations found",
            (
                "1. Cost Optimization Hub needs time to analyze your resources",
                "2. Ensure you have resources running for at least 14 days",
                "3. The service needs sufficient usage data to generate recommendations",
                "4. Check if you have any EC2, RDS, or other supported resources",
            ),
        ),
    )

    for issue, steps in playbook:
        print(f"\n🔧 {issue}:")
        for step in steps:
            print(f"   {step}")
237 | 
def main():
    """Run every diagnostic step in order and print an overall verdict.

    The verdict depends only on the boolean returned by
    ``check_cost_optimization_hub``; the remaining steps are informational.
    Any exception escaping a step is caught here and reported on stdout so
    the script never ends with a traceback.
    """
    print("AWS Cost Optimization Hub Diagnostic Tool v2")
    print("=" * 50)

    try:
        # Run diagnostics
        hub_working = check_cost_optimization_hub()
        check_iam_permissions()
        test_individual_apis()
        provide_correct_iam_policy()
        provide_solutions()

        print("\n" + "=" * 60)
        if hub_working:
            print("✅ DIAGNOSIS: Cost Optimization Hub appears to be working!")
        else:
            print("❌ DIAGNOSIS: Cost Optimization Hub needs to be set up.")

        print("\n📝 Next Steps:")
        print("1. Apply the correct IAM policy shown above")
        print("2. Enable Cost Optimization Hub in the AWS Console if needed")
        # Point at the server script that actually ships in this repository;
        # the previously named mcp_server_fixed_v3.py is not part of the tree.
        print("3. Use the updated MCP server (mcp_server_with_runbooks.py)")
        print("4. Test with the enrollment status tool first")

    except Exception as e:
        # Top-level boundary of a CLI tool: report and exit quietly
        print(f"\n❌ Diagnostic failed: {str(e)}")
        print("Please check your AWS credentials and try again.")
266 | 
# Script entry point: run the diagnostics only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()
269 | 
```

--------------------------------------------------------------------------------
/utils/parallel_executor.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Parallel Execution Engine for CFM Tips MCP Server
  3 | 
  4 | Provides optimized parallel execution of AWS service calls with session integration.
  5 | """
  6 | 
  7 | import logging
  8 | import time
  9 | import threading
 10 | from concurrent.futures import ThreadPoolExecutor, as_completed, Future
 11 | from typing import Dict, List, Any, Optional, Callable, Union
 12 | from dataclasses import dataclass
 13 | from datetime import datetime
 14 | 
 15 | logger = logging.getLogger(__name__)
 16 | 
 17 | @dataclass
 18 | class TaskResult:
 19 |     """Result of a parallel task execution."""
 20 |     task_id: str
 21 |     service: str
 22 |     operation: str
 23 |     status: str  # 'success', 'error', 'timeout'
 24 |     data: Any = None
 25 |     error: Optional[str] = None
 26 |     execution_time: float = 0.0
 27 |     timestamp: datetime = None
 28 |     
 29 |     def __post_init__(self):
 30 |         if self.timestamp is None:
 31 |             self.timestamp = datetime.now()
 32 | 
 33 | @dataclass
 34 | class ParallelTask:
 35 |     """Definition of a task to be executed in parallel."""
 36 |     task_id: str
 37 |     service: str
 38 |     operation: str
 39 |     function: Callable
 40 |     args: tuple = ()
 41 |     kwargs: Dict[str, Any] = None
 42 |     timeout: float = 30.0
 43 |     priority: int = 1  # Higher number = higher priority
 44 |     
 45 |     def __post_init__(self):
 46 |         if self.kwargs is None:
 47 |             self.kwargs = {}
 48 | 
 49 | class ParallelExecutor:
 50 |     """Executes AWS service calls in parallel with optimized resource management."""
 51 |     
 52 |     def __init__(self, max_workers: int = 10, default_timeout: float = 30.0):
 53 |         self.max_workers = max_workers
 54 |         self.default_timeout = default_timeout
 55 |         self.executor = ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="CFM-Worker")
 56 |         self._active_tasks: Dict[str, Future] = {}
 57 |         self._results: Dict[str, TaskResult] = {}
 58 |         self._lock = threading.RLock()
 59 |         
 60 |         logger.info(f"Initialized ParallelExecutor with {max_workers} workers")
 61 |     
 62 |     def submit_task(self, task: ParallelTask) -> str:
 63 |         """Submit a task for parallel execution."""
 64 |         with self._lock:
 65 |             if task.task_id in self._active_tasks:
 66 |                 raise ValueError(f"Task {task.task_id} already exists")
 67 |             
 68 |             future = self.executor.submit(self._execute_task, task)
 69 |             self._active_tasks[task.task_id] = future
 70 |             
 71 |             logger.debug(f"Submitted task {task.task_id} ({task.service}.{task.operation})")
 72 |             return task.task_id
 73 |     
 74 |     def submit_batch(self, tasks: List[ParallelTask]) -> List[str]:
 75 |         """Submit multiple tasks for parallel execution."""
 76 |         # Sort by priority (higher first)
 77 |         sorted_tasks = sorted(tasks, key=lambda t: t.priority, reverse=True)
 78 |         
 79 |         task_ids = []
 80 |         for task in sorted_tasks:
 81 |             try:
 82 |                 task_id = self.submit_task(task)
 83 |                 task_ids.append(task_id)
 84 |             except Exception as e:
 85 |                 logger.error(f"Error submitting task {task.task_id}: {e}")
 86 |                 # Create error result
 87 |                 error_result = TaskResult(
 88 |                     task_id=task.task_id,
 89 |                     service=task.service,
 90 |                     operation=task.operation,
 91 |                     status='error',
 92 |                     error=str(e)
 93 |                 )
 94 |                 with self._lock:
 95 |                     self._results[task.task_id] = error_result
 96 |         
 97 |         logger.info(f"Submitted batch of {len(task_ids)} tasks")
 98 |         return task_ids
 99 |     
100 |     def _execute_task(self, task: ParallelTask) -> TaskResult:
101 |         """Execute a single task with timeout and error handling."""
102 |         start_time = time.time()
103 |         
104 |         try:
105 |             logger.debug(f"Executing task {task.task_id}")
106 |             
107 |             # Execute the function with timeout
108 |             result_data = task.function(*task.args, **task.kwargs)
109 |             
110 |             execution_time = time.time() - start_time
111 |             
112 |             result = TaskResult(
113 |                 task_id=task.task_id,
114 |                 service=task.service,
115 |                 operation=task.operation,
116 |                 status='success',
117 |                 data=result_data,
118 |                 execution_time=execution_time
119 |             )
120 |             
121 |             logger.debug(f"Task {task.task_id} completed in {execution_time:.2f}s")
122 |             
123 |         except Exception as e:
124 |             execution_time = time.time() - start_time
125 |             error_msg = str(e)
126 |             
127 |             result = TaskResult(
128 |                 task_id=task.task_id,
129 |                 service=task.service,
130 |                 operation=task.operation,
131 |                 status='error',
132 |                 error=error_msg,
133 |                 execution_time=execution_time
134 |             )
135 |             
136 |             logger.error(f"Task {task.task_id} failed after {execution_time:.2f}s: {error_msg}")
137 |         
138 |         # Store result
139 |         with self._lock:
140 |             self._results[task.task_id] = result
141 |             if task.task_id in self._active_tasks:
142 |                 del self._active_tasks[task.task_id]
143 |         
144 |         return result
145 |     
146 |     def wait_for_tasks(self, task_ids: List[str], timeout: Optional[float] = None) -> Dict[str, TaskResult]:
147 |         """Wait for specific tasks to complete."""
148 |         if timeout is None:
149 |             timeout = self.default_timeout
150 |         
151 |         results = {}
152 |         remaining_tasks = set(task_ids)
153 |         start_time = time.time()
154 |         
155 |         while remaining_tasks and (time.time() - start_time) < timeout:
156 |             completed_tasks = set()
157 |             
158 |             with self._lock:
159 |                 for task_id in remaining_tasks:
160 |                     if task_id in self._results:
161 |                         results[task_id] = self._results[task_id]
162 |                         completed_tasks.add(task_id)
163 |                     elif task_id not in self._active_tasks:
164 |                         # Task not found, create error result
165 |                         error_result = TaskResult(
166 |                             task_id=task_id,
167 |                             service='unknown',
168 |                             operation='unknown',
169 |                             status='error',
170 |                             error='Task not found'
171 |                         )
172 |                         results[task_id] = error_result
173 |                         completed_tasks.add(task_id)
174 |             
175 |             remaining_tasks -= completed_tasks
176 |             
177 |             if remaining_tasks:
178 |                 time.sleep(0.1)  # Small delay to avoid busy waiting
179 |         
180 |         # Handle timeout for remaining tasks
181 |         for task_id in remaining_tasks:
182 |             timeout_result = TaskResult(
183 |                 task_id=task_id,
184 |                 service='unknown',
185 |                 operation='unknown',
186 |                 status='timeout',
187 |                 error=f'Task timed out after {timeout}s'
188 |             )
189 |             results[task_id] = timeout_result
190 |         
191 |         logger.info(f"Completed waiting for {len(task_ids)} tasks, {len(results)} results")
192 |         return results
193 |     
194 |     def wait_for_all(self, timeout: Optional[float] = None) -> Dict[str, TaskResult]:
195 |         """Wait for all active tasks to complete."""
196 |         with self._lock:
197 |             active_task_ids = list(self._active_tasks.keys())
198 |         
199 |         if not active_task_ids:
200 |             return {}
201 |         
202 |         return self.wait_for_tasks(active_task_ids, timeout)
203 |     
204 |     def get_result(self, task_id: str) -> Optional[TaskResult]:
205 |         """Get result for a specific task."""
206 |         with self._lock:
207 |             return self._results.get(task_id)
208 |     
209 |     def get_all_results(self) -> Dict[str, TaskResult]:
210 |         """Get all available results."""
211 |         with self._lock:
212 |             return self._results.copy()
213 |     
214 |     def cancel_task(self, task_id: str) -> bool:
215 |         """Cancel a running task."""
216 |         with self._lock:
217 |             if task_id in self._active_tasks:
218 |                 future = self._active_tasks[task_id]
219 |                 cancelled = future.cancel()
220 |                 if cancelled:
221 |                     del self._active_tasks[task_id]
222 |                     # Create cancelled result
223 |                     cancel_result = TaskResult(
224 |                         task_id=task_id,
225 |                         service='unknown',
226 |                         operation='unknown',
227 |                         status='error',
228 |                         error='Task cancelled'
229 |                     )
230 |                     self._results[task_id] = cancel_result
231 |                 return cancelled
232 |         return False
233 |     
234 |     def get_status(self) -> Dict[str, Any]:
235 |         """Get executor status information."""
236 |         with self._lock:
237 |             active_count = len(self._active_tasks)
238 |             completed_count = len(self._results)
239 |             
240 |             # Count results by status
241 |             status_counts = {}
242 |             for result in self._results.values():
243 |                 status_counts[result.status] = status_counts.get(result.status, 0) + 1
244 |         
245 |         return {
246 |             'max_workers': self.max_workers,
247 |             'active_tasks': active_count,
248 |             'completed_tasks': completed_count,
249 |             'status_breakdown': status_counts,
250 |             'executor_alive': not self.executor._shutdown
251 |         }
252 |     
253 |     def clear_results(self, older_than_minutes: int = 60):
254 |         """Clear old results to free memory."""
255 |         cutoff_time = datetime.now().timestamp() - (older_than_minutes * 60)
256 |         
257 |         with self._lock:
258 |             old_task_ids = []
259 |             for task_id, result in self._results.items():
260 |                 if result.timestamp.timestamp() < cutoff_time:
261 |                     old_task_ids.append(task_id)
262 |             
263 |             for task_id in old_task_ids:
264 |                 del self._results[task_id]
265 |         
266 |         logger.info(f"Cleared {len(old_task_ids)} old results")
267 |     
268 |     def shutdown(self, wait: bool = True):
269 |         """Shutdown the executor and clean up resources."""
270 |         logger.info("Shutting down ParallelExecutor")
271 |         
272 |         with self._lock:
273 |             # Cancel all active tasks
274 |             for task_id, future in self._active_tasks.items():
275 |                 future.cancel()
276 |             self._active_tasks.clear()
277 |         
278 |         # Shutdown executor
279 |         self.executor.shutdown(wait=wait)
280 |         logger.info("ParallelExecutor shutdown complete")
281 | 
# Module-wide executor, created on first use by get_parallel_executor()
_parallel_executor = None

def get_parallel_executor() -> ParallelExecutor:
    """Return the shared ParallelExecutor, building it on the first call."""
    global _parallel_executor
    if _parallel_executor is not None:
        return _parallel_executor
    _parallel_executor = ParallelExecutor()
    return _parallel_executor
291 | 
def create_task(task_id: str, service: str, operation: str, function: Callable,
                args: tuple = (), kwargs: Dict[str, Any] = None,
                timeout: float = 30.0, priority: int = 1) -> ParallelTask:
    """Build a ParallelTask from plain arguments.

    A missing or empty ``kwargs`` is replaced by a new empty dict before the
    task is constructed; every other argument is passed through unchanged.
    """
    call_kwargs = kwargs if kwargs else {}
    task_fields = {
        'task_id': task_id,
        'service': service,
        'operation': operation,
        'function': function,
        'args': args,
        'kwargs': call_kwargs,
        'timeout': timeout,
        'priority': priority,
    }
    return ParallelTask(**task_fields)
```

--------------------------------------------------------------------------------
/utils/logging_config.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Centralized logging configuration for CFM Tips MCP Server
  3 | 
  4 | """
  5 | 
  6 | import logging
  7 | import sys
  8 | import os
  9 | import json
 10 | import tempfile
 11 | from datetime import datetime
 12 | from typing import Dict, Any, Optional, List
 13 | 
 14 | 
 15 | class StructuredFormatter(logging.Formatter):
 16 |     """Custom formatter for structured logging with JSON output."""
 17 |     
 18 |     def format(self, record):
 19 |         """Format log record as structured JSON."""
 20 |         log_entry = {
 21 |             'timestamp': datetime.fromtimestamp(record.created).isoformat(),
 22 |             'level': record.levelname,
 23 |             'logger': record.name,
 24 |             'module': record.module,
 25 |             'function': record.funcName,
 26 |             'line': record.lineno,
 27 |             'message': record.getMessage(),
 28 |             'thread': record.thread,
 29 |             'thread_name': record.threadName
 30 |         }
 31 |         
 32 |         # Add exception information if present
 33 |         if record.exc_info:
 34 |             log_entry['exception'] = self.formatException(record.exc_info)
 35 |         
 36 |         # Add extra fields from record
 37 |         for key, value in record.__dict__.items():
 38 |             if key not in ['name', 'msg', 'args', 'levelname', 'levelno', 'pathname', 
 39 |                           'filename', 'module', 'lineno', 'funcName', 'created', 
 40 |                           'msecs', 'relativeCreated', 'thread', 'threadName', 
 41 |                           'processName', 'process', 'getMessage', 'exc_info', 
 42 |                           'exc_text', 'stack_info']:
 43 |                 log_entry[key] = value
 44 |         
 45 |         return json.dumps(log_entry)
 46 | 
 47 | 
 48 | class StandardFormatter(logging.Formatter):
 49 |     """Enhanced standard formatter with more context."""
 50 |     
 51 |     def __init__(self):
 52 |         super().__init__(
 53 |             '%(asctime)s - %(name)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s'
 54 |         )
 55 | 
 56 | 
 57 | def setup_logging(structured: bool = False, log_level: str = "INFO"):
 58 |     """
 59 |     Configure comprehensive logging for the application.
 60 |     
 61 |     Args:
 62 |         structured: Whether to use structured JSON logging
 63 |         log_level: Logging level (DEBUG, INFO, WARNING, ERROR)
 64 |     """
 65 |     
 66 |     # Create appropriate formatter
 67 |     if structured:
 68 |         formatter = StructuredFormatter()
 69 |     else:
 70 |         formatter = StandardFormatter()
 71 |     
 72 |     # Configure root logger
 73 |     root_logger = logging.getLogger()
 74 |     root_logger.setLevel(getattr(logging, log_level.upper(), logging.INFO))
 75 |     
 76 |     # Remove existing handlers
 77 |     for handler in root_logger.handlers[:]:
 78 |         root_logger.removeHandler(handler)
 79 |     
 80 |     # Add file handlers
 81 |     try:
 82 |         # Try to create logs directory if it doesn't exist
 83 |         log_dir = 'logs'
 84 |         if not os.path.exists(log_dir):
 85 |             os.makedirs(log_dir, exist_ok=True)
 86 |         
 87 |         # Try main log file in logs directory first
 88 |         log_file = os.path.join(log_dir, 'cfm_tips_mcp.log')
 89 |         file_handler = logging.FileHandler(log_file)
 90 |         file_handler.setLevel(logging.INFO)
 91 |         file_handler.setFormatter(formatter)
 92 |         root_logger.addHandler(file_handler)
 93 |         
 94 |         # Try error log file
 95 |         error_file = os.path.join(log_dir, 'cfm_tips_mcp_errors.log')
 96 |         error_handler = logging.FileHandler(error_file)
 97 |         error_handler.setLevel(logging.ERROR)
 98 |         error_handler.setFormatter(formatter)
 99 |         root_logger.addHandler(error_handler)
100 |         
101 |     except (OSError, PermissionError) as e:
102 |         # If we can't write to logs directory, try current directory
103 |         try:
104 |             file_handler = logging.FileHandler('cfm_tips_mcp.log')
105 |             file_handler.setLevel(logging.INFO)
106 |             file_handler.setFormatter(formatter)
107 |             root_logger.addHandler(file_handler)
108 |             
109 |             error_handler = logging.FileHandler('cfm_tips_mcp_errors.log')
110 |             error_handler.setLevel(logging.ERROR)
111 |             error_handler.setFormatter(formatter)
112 |             root_logger.addHandler(error_handler)
113 |             
114 |         except (OSError, PermissionError):
115 |             # If we can't write anywhere, try temp directory
116 |             try:
117 |                 temp_dir = tempfile.gettempdir()
118 |                 temp_log = os.path.join(temp_dir, 'cfm_tips_mcp.log')
119 |                 file_handler = logging.FileHandler(temp_log)
120 |                 file_handler.setLevel(logging.INFO)
121 |                 file_handler.setFormatter(formatter)
122 |                 root_logger.addHandler(file_handler)
123 |                 
124 |                 temp_error = os.path.join(temp_dir, 'cfm_tips_mcp_errors.log')
125 |                 error_handler = logging.FileHandler(temp_error)
126 |                 error_handler.setLevel(logging.ERROR)
127 |                 error_handler.setFormatter(formatter)
128 |                 root_logger.addHandler(error_handler)
129 |                 
130 |                 # Log where we're writing files
131 |                 print(f"Warning: Using temp directory for logs: {temp_dir}")
132 |                 
133 |             except (OSError, PermissionError):
134 |                 # If all else fails, raise error since we need file logging
135 |                 raise RuntimeError("Could not create log files in any location")
136 |     
137 |     return logging.getLogger(__name__)
138 | 
def log_function_entry(logger, func_name, **kwargs):
    """Emit an INFO record marking entry into ``func_name`` together with its keyword parameters."""
    entry_message = f"Entering {func_name} with params: {kwargs}"
    logger.info(entry_message)
142 | 
def log_function_exit(logger, func_name, result_status=None, execution_time=None):
    """
    Log function exit with results.

    Args:
        logger: Logger instance
        func_name: Name of the function being exited
        result_status: Optional status to report; omitted from the message only when None
        execution_time: Optional execution time in seconds; omitted only when None
    """
    parts = [f"Exiting {func_name}"]
    # Compare against None rather than truthiness so that legitimate falsy
    # values (a 0.0 second duration, a status of 0) are still reported.
    if result_status is not None:
        parts.append(f"Status: {result_status}")
    if execution_time is not None:
        parts.append(f"Time: {execution_time:.2f}s")
    logger.info(" - ".join(parts))
151 | 
def log_aws_api_call(logger, service, operation, **params):
    """Emit an INFO record for an AWS API call, naming the service, operation and parameters."""
    call_message = f"AWS API Call: {service}.{operation} with params: {params}"
    logger.info(call_message)
155 | 
def log_aws_api_error(logger, service, operation, error):
    """Emit an ERROR record for a failed AWS API call, including the error's string form."""
    failure_message = f"AWS API Error: {service}.{operation} - {error!s}"
    logger.error(failure_message)
159 | 
160 | 
class _MergingLoggerAdapter(logging.LoggerAdapter):
    """LoggerAdapter that merges its fields with per-call ``extra`` instead of replacing it."""

    def process(self, msg, kwargs):
        # The stock LoggerAdapter.process overwrites kwargs['extra'] with the
        # adapter's own dict, dropping whatever the caller passed. Merge the
        # two instead, letting per-call values win on key collisions.
        merged = dict(self.extra or {})
        merged.update(kwargs.get('extra') or {})
        kwargs['extra'] = merged
        return msg, kwargs


def create_structured_logger(name: str, extra_fields: Optional[Dict[str, Any]] = None) -> logging.Logger:
    """
    Create a logger with structured logging capabilities.

    Args:
        name: Logger name
        extra_fields: Additional fields to include in all log messages

    Returns:
        The plain ``logging.getLogger(name)`` logger when no extra fields are
        given; otherwise a LoggerAdapter wrapping it that adds ``extra_fields``
        to every record while preserving any ``extra=`` passed on individual
        logging calls.
    """
    logger = logging.getLogger(name)

    if extra_fields:
        # Wrap in an adapter so the fields ride along on every record.
        logger = _MergingLoggerAdapter(logger, extra_fields)

    return logger
179 | 
180 | 
def log_s3_operation(logger, operation: str, bucket_name: Optional[str] = None,
                    object_key: Optional[str] = None, **kwargs):
    """
    Emit an INFO record describing an S3 operation, with structured fields.

    Args:
        logger: Logger instance
        operation: S3 operation name
        bucket_name: S3 bucket name
        object_key: S3 object key
        **kwargs: Additional operation parameters; these are added to the
            record's extra fields and override the defaults on key collision
    """
    # Defaults first, caller-supplied fields last so they take precedence.
    structured_fields = {
        'operation_type': 's3_operation',
        'operation': operation,
        'bucket_name': bucket_name,
        'object_key': object_key,
        **kwargs,
    }
    logger.info(f"S3 Operation: {operation}", extra=structured_fields)
202 | 
203 | 
def log_analysis_start(logger, analysis_type: str, session_id: Optional[str] = None, **kwargs):
    """
    Emit an INFO record marking the start of an analysis, with structured fields.

    Args:
        logger: Logger instance
        analysis_type: Type of analysis
        session_id: Session identifier
        **kwargs: Additional analysis parameters; these are added to the
            record's extra fields and override the defaults on key collision
    """
    # Defaults first, caller-supplied fields last so they take precedence.
    structured_fields = {
        'event_type': 'analysis_start',
        'analysis_type': analysis_type,
        'session_id': session_id,
        **kwargs,
    }
    logger.info(f"Starting analysis: {analysis_type}", extra=structured_fields)
222 | 
223 | 
def log_analysis_complete(logger, analysis_type: str, status: str, execution_time: float,
                         session_id: Optional[str] = None, **kwargs):
    """
    Emit an INFO record marking the completion of an analysis, with structured fields.

    Args:
        logger: Logger instance
        analysis_type: Type of analysis
        status: Analysis status
        execution_time: Execution time in seconds
        session_id: Session identifier
        **kwargs: Additional analysis results; these are added to the
            record's extra fields and override the defaults on key collision
    """
    # Defaults first, caller-supplied fields last so they take precedence.
    structured_fields = {
        'event_type': 'analysis_complete',
        'analysis_type': analysis_type,
        'status': status,
        'execution_time': execution_time,
        'session_id': session_id,
        **kwargs,
    }
    summary = f"Completed analysis: {analysis_type} - Status: {status}"
    logger.info(summary, extra=structured_fields)
247 | 
248 | 
def log_cost_optimization_finding(logger, finding_type: str, resource_id: str,
                                 potential_savings: Optional[float] = None, **kwargs):
    """
    Emit an INFO record describing a cost optimization finding, with structured fields.

    Args:
        logger: Logger instance
        finding_type: Type of optimization finding
        resource_id: Resource identifier
        potential_savings: Estimated cost savings
        **kwargs: Additional finding details; these are added to the
            record's extra fields and override the defaults on key collision
    """
    # Defaults first, caller-supplied fields last so they take precedence.
    structured_fields = {
        'event_type': 'cost_optimization_finding',
        'finding_type': finding_type,
        'resource_id': resource_id,
        'potential_savings': potential_savings,
        **kwargs,
    }
    headline = f"Cost optimization finding: {finding_type} for {resource_id}"
    logger.info(headline, extra=structured_fields)
270 | 
271 | 
def log_session_operation(logger, operation: str, session_id: str, **kwargs):
    """
    Emit an INFO record describing a session operation, with structured fields.

    Args:
        logger: Logger instance
        operation: Session operation
        session_id: Session identifier
        **kwargs: Additional operation details; these are added to the
            record's extra fields and override the defaults on key collision
    """
    # Defaults first, caller-supplied fields last so they take precedence.
    structured_fields = {
        'event_type': 'session_operation',
        'operation': operation,
        'session_id': session_id,
        **kwargs,
    }
    headline = f"Session operation: {operation} for session {session_id}"
    logger.info(headline, extra=structured_fields)
290 | 
291 | 
def log_cloudwatch_operation(logger, operation: str, component: Optional[str] = None,
                            cost_incurred: bool = False, **kwargs):
    """
    Emit a record describing a CloudWatch operation, with structured fields and cost tracking.

    Operations flagged as cost-incurring are logged at WARNING level with a
    "(COST INCURRED)" marker in the message; all others are logged at INFO.

    Args:
        logger: Logger instance
        operation: CloudWatch operation name
        component: CloudWatch component (logs, metrics, alarms, dashboards)
        cost_incurred: Whether the operation incurred costs
        **kwargs: Additional operation parameters; these are added to the
            record's extra fields and override the defaults on key collision
    """
    # Defaults first, caller-supplied fields last so they take precedence.
    structured_fields = {
        'operation_type': 'cloudwatch_operation',
        'operation': operation,
        'component': component,
        'cost_incurred': cost_incurred,
        **kwargs,
    }

    # Guard clause: the common, free path returns early at INFO level.
    if not cost_incurred:
        logger.info(f"CloudWatch Operation: {operation}", extra=structured_fields)
        return

    logger.warning(f"CloudWatch Operation (COST INCURRED): {operation}", extra=structured_fields)
316 | 
317 | 
318 | # CloudWatch-specific logging methods consolidated into log_cloudwatch_operation
319 | # These specialized methods have been removed in favor of the generic log_cloudwatch_operation method
320 | 
321 | 
322 | # Removed setup_cloudwatch_logging - use setup_logging instead with log_cloudwatch_operation for CloudWatch-specific events
```
Page 2/19FirstPrevNextLast