#
tokens: 49033/50000 58/161 files (page 1/19)
lines: on (toggle) GitHub
raw markdown copy reset
This is page 1 of 19. Use http://codebase.md/aws-samples/sample-cfm-tips-mcp?lines=true&page={x} to view the full context.

# Directory Structure

```
├── .gitignore
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── diagnose_cost_optimization_hub_v2.py
├── LICENSE
├── logging_config.py
├── mcp_runbooks.json
├── mcp_server_with_runbooks.py
├── playbooks
│   ├── __init__.py
│   ├── aws_lambda
│   │   ├── __init__.py
│   │   └── lambda_optimization.py
│   ├── cloudtrail
│   │   ├── __init__.py
│   │   └── cloudtrail_optimization.py
│   ├── cloudtrail_optimization.py
│   ├── cloudwatch
│   │   ├── __init__.py
│   │   ├── aggregation_queries.py
│   │   ├── alarms_and_dashboards_analyzer.py
│   │   ├── analysis_engine.py
│   │   ├── base_analyzer.py
│   │   ├── cloudwatch_optimization_analyzer.py
│   │   ├── cloudwatch_optimization_tool.py
│   │   ├── cloudwatch_optimization.py
│   │   ├── cost_controller.py
│   │   ├── general_spend_analyzer.py
│   │   ├── logs_optimization_analyzer.py
│   │   ├── metrics_optimization_analyzer.py
│   │   ├── optimization_orchestrator.py
│   │   └── result_processor.py
│   ├── comprehensive_optimization.py
│   ├── ebs
│   │   ├── __init__.py
│   │   └── ebs_optimization.py
│   ├── ebs_optimization.py
│   ├── ec2
│   │   ├── __init__.py
│   │   └── ec2_optimization.py
│   ├── ec2_optimization.py
│   ├── lambda_optimization.py
│   ├── rds
│   │   ├── __init__.py
│   │   └── rds_optimization.py
│   ├── rds_optimization.py
│   └── s3
│       ├── __init__.py
│       ├── analyzers
│       │   ├── __init__.py
│       │   ├── api_cost_analyzer.py
│       │   ├── archive_optimization_analyzer.py
│       │   ├── general_spend_analyzer.py
│       │   ├── governance_analyzer.py
│       │   ├── multipart_cleanup_analyzer.py
│       │   └── storage_class_analyzer.py
│       ├── base_analyzer.py
│       ├── s3_aggregation_queries.py
│       ├── s3_analysis_engine.py
│       ├── s3_comprehensive_optimization_tool.py
│       ├── s3_optimization_orchestrator.py
│       └── s3_optimization.py
├── README.md
├── requirements.txt
├── runbook_functions_extended.py
├── runbook_functions.py
├── RUNBOOKS_GUIDE.md
├── services
│   ├── __init__.py
│   ├── cloudwatch_pricing.py
│   ├── cloudwatch_service_vended_log.py
│   ├── cloudwatch_service.py
│   ├── compute_optimizer.py
│   ├── cost_explorer.py
│   ├── optimization_hub.py
│   ├── performance_insights.py
│   ├── pricing.py
│   ├── s3_pricing.py
│   ├── s3_service.py
│   ├── storage_lens_service.py
│   └── trusted_advisor.py
├── setup.py
├── test_runbooks.py
├── tests
│   ├── __init__.py
│   ├── conftest.py
│   ├── integration
│   │   ├── __init__.py
│   │   ├── cloudwatch
│   │   │   └── test_cloudwatch_integration.py
│   │   ├── test_cloudwatch_comprehensive_tool_integration.py
│   │   ├── test_cloudwatch_orchestrator_integration.py
│   │   ├── test_integration_suite.py
│   │   └── test_orchestrator_integration.py
│   ├── legacy
│   │   ├── example_output_with_docs.py
│   │   ├── example_wellarchitected_output.py
│   │   ├── test_aws_session_management.py
│   │   ├── test_cloudwatch_orchestrator_pagination.py
│   │   ├── test_cloudwatch_pagination_integration.py
│   │   ├── test_cloudwatch_performance_optimizations.py
│   │   ├── test_cloudwatch_result_processor.py
│   │   ├── test_cloudwatch_timeout_issue.py
│   │   ├── test_documentation_links.py
│   │   ├── test_metrics_pagination_count.py
│   │   ├── test_orchestrator_integration.py
│   │   ├── test_pricing_cache_fix_moved.py
│   │   ├── test_pricing_cache_fix.py
│   │   ├── test_runbook_integration.py
│   │   ├── test_runbooks.py
│   │   ├── test_setup_verification.py
│   │   └── test_stack_trace_fix.py
│   ├── performance
│   │   ├── __init__.py
│   │   ├── cloudwatch
│   │   │   └── test_cloudwatch_performance.py
│   │   ├── test_cloudwatch_parallel_execution.py
│   │   ├── test_parallel_execution.py
│   │   └── test_performance_suite.py
│   ├── pytest-cloudwatch.ini
│   ├── pytest.ini
│   ├── README.md
│   ├── requirements-test.txt
│   ├── run_cloudwatch_tests.py
│   ├── run_tests.py
│   ├── test_setup_verification.py
│   ├── test_suite_main.py
│   └── unit
│       ├── __init__.py
│       ├── analyzers
│       │   ├── __init__.py
│       │   ├── conftest_cloudwatch.py
│       │   ├── test_alarms_and_dashboards_analyzer.py
│       │   ├── test_base_analyzer.py
│       │   ├── test_cloudwatch_base_analyzer.py
│       │   ├── test_cloudwatch_cost_constraints.py
│       │   ├── test_cloudwatch_general_spend_analyzer.py
│       │   ├── test_general_spend_analyzer.py
│       │   ├── test_logs_optimization_analyzer.py
│       │   └── test_metrics_optimization_analyzer.py
│       ├── cloudwatch
│       │   ├── test_cache_control.py
│       │   ├── test_cloudwatch_api_mocking.py
│       │   ├── test_cloudwatch_metrics_pagination.py
│       │   ├── test_cloudwatch_pagination_architecture.py
│       │   ├── test_cloudwatch_pagination_comprehensive_fixed.py
│       │   ├── test_cloudwatch_pagination_comprehensive.py
│       │   ├── test_cloudwatch_pagination_fixed.py
│       │   ├── test_cloudwatch_pagination_real_format.py
│       │   ├── test_cloudwatch_pagination_simple.py
│       │   ├── test_cloudwatch_query_pagination.py
│       │   ├── test_cloudwatch_unit_suite.py
│       │   ├── test_general_spend_tips_refactor.py
│       │   ├── test_import_error.py
│       │   ├── test_mcp_pagination_bug.py
│       │   └── test_mcp_surface_pagination.py
│       ├── s3
│       │   └── live
│       │       ├── test_bucket_listing.py
│       │       ├── test_s3_governance_bucket_discovery.py
│       │       └── test_top_buckets.py
│       ├── services
│       │   ├── __init__.py
│       │   ├── test_cloudwatch_cost_controller.py
│       │   ├── test_cloudwatch_query_service.py
│       │   ├── test_cloudwatch_service.py
│       │   ├── test_cost_control_routing.py
│       │   └── test_s3_service.py
│       └── test_unit_suite.py
└── utils
    ├── __init__.py
    ├── aws_client_factory.py
    ├── cache_decorator.py
    ├── cleanup_manager.py
    ├── cloudwatch_cache.py
    ├── documentation_links.py
    ├── error_handler.py
    ├── intelligent_cache.py
    ├── logging_config.py
    ├── memory_manager.py
    ├── parallel_executor.py
    ├── performance_monitor.py
    ├── progressive_timeout.py
    ├── service_orchestrator.py
    └── session_manager.py
```

# Files

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
 1 | #UNCOMMENT THE BELOW LINE IN PUBLIC GITLAB VERSION TO AVOID SHARING THE KIRO SETTINGS
 2 | #.kiro/
 3 | 
 4 | # Virtual environments
 5 | venv/
 6 | env/
 7 | .env/
 8 | .venv/
 9 | ENV/
10 | env.bak/
11 | venv.bak/
12 | 
13 | #Temporary folders
14 | __pycache__/
15 | .pytest_cache/
16 | .amazonq/
17 | sessions/
18 | tests/.pytest_cache/
19 | 
20 | #Log folders
21 | logs/
22 | 
23 | # IDE
24 | .vscode/
25 | .idea/
26 | *.swp
27 | *.swo
28 | 
29 | # OS
30 | .DS_Store
31 | .DS_Store?
32 | ._*
33 | .Spotlight-V100
34 | .Trashes
35 | ehthumbs.db
36 | Thumbs.db
37 | 
38 | #Template files
39 | mcp_runbooks.json
```

--------------------------------------------------------------------------------
/tests/README.md:
--------------------------------------------------------------------------------

```markdown
  1 | # S3 Optimization System - Testing Suite
  2 | 
  3 | This directory contains a comprehensive testing suite for the S3 optimization system, designed to ensure reliability, performance, and most importantly, **cost constraint compliance**.
  4 | 
  5 | ## 🚨 Critical: No-Cost Constraint Testing
  6 | 
  7 | The most important aspect of this testing suite is validating that the system **NEVER** performs cost-incurring S3 operations. The `no_cost_validation/` tests are critical for customer billing protection.
  8 | 
  9 | ## Test Structure
 10 | 
 11 | ```
 12 | tests/
 13 | ├── conftest.py                    # Shared fixtures and configuration
 14 | ├── pytest.ini                    # Pytest configuration
 15 | ├── requirements-test.txt          # Testing dependencies
 16 | ├── run_tests.py                  # Test runner script
 17 | ├── README.md                     # This file
 18 | ├── unit/                         # Unit tests with mocked dependencies
 19 | │   ├── analyzers/               # Analyzer unit tests
 20 | │   │   ├── test_base_analyzer.py
 21 | │   │   └── test_general_spend_analyzer.py
 22 | │   └── services/                # Service unit tests
 23 | │       └── test_s3_service.py
 24 | ├── integration/                  # Integration tests
 25 | │   └── test_orchestrator_integration.py
 26 | ├── performance/                  # Performance and load tests
 27 | │   └── test_parallel_execution.py
 28 | └── no_cost_validation/          # 🚨 CRITICAL: Cost constraint tests
 29 |     └── test_cost_constraints.py
 30 | ```
 31 | 
 32 | ## Test Categories
 33 | 
 34 | ### 1. Unit Tests (`unit/`)
 35 | - **Purpose**: Test individual components in isolation
 36 | - **Scope**: Analyzers, services, utilities
 37 | - **Dependencies**: Fully mocked AWS services
 38 | - **Speed**: Fast (< 1 second per test)
 39 | - **Coverage**: High code coverage with edge cases
 40 | 
 41 | **Key Features:**
 42 | - Comprehensive mocking of AWS services
 43 | - Parameter validation testing
 44 | - Error handling verification
 45 | - Performance monitoring integration testing
 46 | 
 47 | ### 2. Integration Tests (`integration/`)
 48 | - **Purpose**: Test component interactions and data flow
 49 | - **Scope**: Orchestrator + analyzers + services
 50 | - **Dependencies**: Mocked AWS APIs with realistic responses
 51 | - **Speed**: Medium (1-10 seconds per test)
 52 | - **Coverage**: End-to-end workflows
 53 | 
 54 | **Key Features:**
 55 | - Complete analysis workflow testing
 56 | - Service fallback chain validation
 57 | - Session management integration
 58 | - Error propagation testing
 59 | 
 60 | ### 3. Performance Tests (`performance/`)
 61 | - **Purpose**: Validate performance characteristics and resource usage
 62 | - **Scope**: Parallel execution, timeout handling, memory usage
 63 | - **Dependencies**: Controlled mock delays and resource simulation
 64 | - **Speed**: Slow (10-60 seconds per test)
 65 | - **Coverage**: Performance benchmarks and limits
 66 | 
 67 | **Key Features:**
 68 | - Parallel vs sequential execution comparison
 69 | - Timeout handling validation
 70 | - Memory usage monitoring
 71 | - Concurrent request handling
 72 | - Cache effectiveness testing
 73 | 
 74 | ### 4. No-Cost Constraint Validation (`no_cost_validation/`) 🚨
 75 | - **Purpose**: **CRITICAL** - Ensure no cost-incurring operations are performed
 76 | - **Scope**: All S3 operations across the entire system
 77 | - **Dependencies**: Cost constraint validation framework
 78 | - **Speed**: Fast (< 1 second per test)
 79 | - **Coverage**: 100% of S3 operations
 80 | 
 81 | **Key Features:**
 82 | - Forbidden operation detection
 83 | - Cost constraint system validation
 84 | - Data source cost verification
 85 | - End-to-end cost compliance testing
 86 | - Bypass attempt prevention
 87 | 
 88 | ## Running Tests
 89 | 
 90 | ### Quick Start
 91 | 
 92 | ```bash
 93 | # Check test environment
 94 | python tests/run_tests.py --check
 95 | 
 96 | # Run all tests
 97 | python tests/run_tests.py --all
 98 | 
 99 | # Run specific test suites
100 | python tests/run_tests.py --unit
101 | python tests/run_tests.py --integration
102 | python tests/run_tests.py --performance
103 | python tests/run_tests.py --cost-validation  # 🚨 CRITICAL
104 | ```
105 | 
106 | ### Using pytest directly
107 | 
108 | ```bash
109 | # Install test dependencies
110 | pip install -r tests/requirements-test.txt
111 | 
112 | # Run unit tests with coverage
113 | pytest tests/unit/ --cov=core --cov=services --cov-report=html
114 | 
115 | # Run integration tests
116 | pytest tests/integration/ -v
117 | 
118 | # Run performance tests
119 | pytest tests/performance/ -m performance
120 | 
121 | # Run cost validation tests (CRITICAL)
122 | pytest tests/no_cost_validation/ -m no_cost_validation -v
123 | ```
124 | 
125 | ### Test Markers
126 | 
127 | Tests are organized using pytest markers:
128 | 
129 | - `@pytest.mark.unit` - Unit tests
130 | - `@pytest.mark.integration` - Integration tests  
131 | - `@pytest.mark.performance` - Performance tests
132 | - `@pytest.mark.no_cost_validation` - 🚨 Cost constraint tests
133 | - `@pytest.mark.slow` - Tests that take longer to run
134 | - `@pytest.mark.aws` - Tests requiring real AWS credentials (skipped by default)
135 | 
136 | ## Test Configuration
137 | 
138 | ### Environment Variables
139 | 
140 | ```bash
141 | # AWS credentials for testing (use test account only)
142 | export AWS_ACCESS_KEY_ID=testing
143 | export AWS_SECRET_ACCESS_KEY=testing
144 | export AWS_DEFAULT_REGION=us-east-1
145 | 
146 | # Test configuration
147 | export PYTEST_TIMEOUT=300
148 | export PYTEST_WORKERS=auto
149 | ```
150 | 
151 | ### Coverage Requirements
152 | 
153 | - **Minimum Coverage**: 80%
154 | - **Target Coverage**: 90%+
155 | - **Critical Paths**: 100% (cost constraint validation)
156 | 
157 | ### Performance Benchmarks
158 | 
159 | - **Unit Tests**: < 1 second each
160 | - **Integration Tests**: < 10 seconds each
161 | - **Performance Tests**: < 60 seconds each
162 | - **Full Suite**: < 5 minutes
163 | 
164 | ## Key Testing Patterns
165 | 
166 | ### 1. AWS Service Mocking
167 | 
168 | ```python
169 | @pytest.fixture
170 | def mock_s3_service():
171 |     service = Mock()
172 |     service.list_buckets = AsyncMock(return_value={
173 |         "status": "success",
174 |         "data": {"Buckets": [...]}
175 |     })
176 |     return service
177 | ```
178 | 
179 | ### 2. Cost Constraint Validation
180 | 
181 | ```python
182 | def test_no_forbidden_operations(cost_constraint_validator):
183 |     # Test code that should not call forbidden operations
184 |     analyzer.analyze()
185 |     
186 |     summary = cost_constraint_validator.get_operation_summary()
187 |     assert len(summary["forbidden_called"]) == 0
188 | ```
189 | 
190 | ### 3. Performance Testing
191 | 
192 | ```python
193 | @pytest.mark.performance
194 | async def test_parallel_execution_performance(performance_tracker):
195 |     performance_tracker.start_timer("test")
196 |     await run_parallel_analysis()
197 |     execution_time = performance_tracker.end_timer("test")
198 |     
199 |     performance_tracker.assert_performance("test", max_time=30.0)
200 | ```
201 | 
202 | ### 4. Error Handling Testing
203 | 
204 | ```python
205 | async def test_service_failure_handling():
206 |     with patch('service.api_call', side_effect=Exception("API Error")):
207 |         result = await analyzer.analyze()
208 |     
209 |     assert result["status"] == "error"
210 |     assert "API Error" in result["message"]
211 | ```
212 | 
213 | ## Critical Test Requirements
214 | 
215 | ### 🚨 Cost Constraint Tests MUST Pass
216 | 
217 | The no-cost constraint validation tests are **mandatory** and **must pass** before any deployment:
218 | 
219 | 1. **Forbidden Operation Detection**: Verify all cost-incurring S3 operations are blocked
220 | 2. **Data Source Validation**: Confirm all data sources are genuinely no-cost
221 | 3. **End-to-End Compliance**: Validate entire system respects cost constraints
222 | 4. **Bypass Prevention**: Ensure cost constraints cannot be circumvented
223 | 
224 | ### Test Data Management
225 | 
226 | - **No Real AWS Resources**: All tests use mocked AWS services
227 | - **Deterministic Data**: Test data is predictable and repeatable
228 | - **Edge Cases**: Include boundary conditions and error scenarios
229 | - **Realistic Scenarios**: Mock data reflects real AWS API responses
230 | 
231 | ## Continuous Integration
232 | 
233 | ### Pre-commit Hooks
234 | 
235 | ```bash
236 | # Install pre-commit hooks
237 | pip install pre-commit
238 | pre-commit install
239 | 
240 | # Run manually
241 | pre-commit run --all-files
242 | ```
243 | 
244 | ### CI Pipeline Requirements
245 | 
246 | 1. **All test suites must pass**
247 | 2. **Coverage threshold must be met**
248 | 3. **No-cost constraint tests are mandatory**
249 | 4. **Performance benchmarks must be within limits**
250 | 5. **No security vulnerabilities in dependencies**
251 | 
252 | ## Troubleshooting
253 | 
254 | ### Common Issues
255 | 
256 | 1. **Import Errors**
257 |    ```bash
258 |    # Ensure PYTHONPATH includes project root
259 |    export PYTHONPATH="${PYTHONPATH}:$(pwd)"
260 |    ```
261 | 
262 | 2. **AWS Credential Errors**
263 |    ```bash
264 |    # Use test credentials
265 |    export AWS_ACCESS_KEY_ID=testing
266 |    export AWS_SECRET_ACCESS_KEY=testing
267 |    ```
268 | 
269 | 3. **Timeout Issues**
270 |    ```bash
271 |    # Increase timeout for slow tests
272 |    pytest --timeout=600
273 |    ```
274 | 
275 | 4. **Memory Issues**
276 |    ```bash
277 |    # Run tests with memory profiling
278 |    pytest --memprof
279 |    ```
280 | 
281 | ### Debug Mode
282 | 
283 | ```bash
284 | # Run with debug output
285 | pytest -v --tb=long --log-cli-level=DEBUG
286 | 
287 | # Run single test with debugging
288 | pytest tests/unit/test_specific.py::TestClass::test_method -v -s
289 | ```
290 | 
291 | ## Contributing to Tests
292 | 
293 | ### Adding New Tests
294 | 
295 | 1. **Choose appropriate test category** (unit/integration/performance/cost-validation)
296 | 2. **Follow naming conventions** (`test_*.py`, `Test*` classes, `test_*` methods)
297 | 3. **Use appropriate fixtures** from `conftest.py`
298 | 4. **Add proper markers** (`@pytest.mark.unit`, etc.)
299 | 5. **Include docstrings** explaining test purpose
300 | 6. **Validate cost constraints** if testing S3 operations
301 | 
302 | ### Test Quality Guidelines
303 | 
304 | - **One assertion per test** (when possible)
305 | - **Clear test names** that describe what is being tested
306 | - **Arrange-Act-Assert** pattern
307 | - **Mock external dependencies** completely
308 | - **Test both success and failure paths**
309 | - **Include edge cases and boundary conditions**
310 | 
311 | ## Security Considerations
312 | 
313 | - **No real AWS credentials** in test code
314 | - **No sensitive data** in test fixtures
315 | - **Secure mock data** that doesn't expose patterns
316 | - **Cost constraint validation** is mandatory
317 | - **Regular dependency updates** for security patches
318 | 
319 | ## Reporting and Metrics
320 | 
321 | ### Coverage Reports
322 | 
323 | ```bash
324 | # Generate HTML coverage report
325 | pytest --cov=core --cov=services --cov-report=html
326 | 
327 | # View report
328 | open htmlcov/index.html
329 | ```
330 | 
331 | ### Performance Reports
332 | 
333 | ```bash
334 | # Generate performance benchmark report
335 | pytest tests/performance/ --benchmark-only --benchmark-json=benchmark.json
336 | ```
337 | 
338 | ### Test Reports
339 | 
340 | ```bash
341 | # Generate comprehensive test report
342 | python tests/run_tests.py --report
343 | 
344 | # View reports
345 | open test_report.html
346 | ```
347 | 
348 | ---
349 | 
350 | ## 🚨 Remember: Cost Constraint Compliance is Critical
351 | 
352 | The primary purpose of this testing suite is to ensure that the S3 optimization system **never incurs costs** for customers. The no-cost constraint validation tests are the most important tests in this suite and must always pass.
353 | 
354 | **Before any deployment or release:**
355 | 1. Run `python tests/run_tests.py --cost-validation`
356 | 2. Verify all cost constraint tests pass
357 | 3. Review any new S3 operations for cost implications
358 | 4. Update forbidden operations list if needed
359 | 
360 | **Customer billing protection is our top priority.**
```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
  1 | # CFM Tips - Cost Optimization MCP Server
  2 | 
  3 | A comprehensive Model Context Protocol (MCP) server for AWS cost analysis and optimization recommendations, designed to work seamlessly with Amazon Q CLI and other MCP-compatible clients.
  4 | 
  5 | ## ✅ Features
  6 | 
  7 | ### Core AWS Services Integration
  8 | - **Cost Explorer** - Retrieve cost data and usage metrics
  9 | - **Cost Optimization Hub** - Get AWS cost optimization recommendations
 10 | - **Compute Optimizer** - Right-sizing recommendations for compute resources
 11 | - **Trusted Advisor** - Cost optimization checks and recommendations
 12 | - **Performance Insights** - RDS performance metrics and analysis
 13 | 
 14 | ### Cost Optimization Playbooks
 15 | - 🔧 **EC2 Right Sizing** - Identify underutilized EC2 instances
 16 | - 💾 **EBS Optimization** - Find unused and underutilized volumes
 17 | - 🗄️ **RDS Optimization** - Identify idle and underutilized databases
 18 | - ⚡ **Lambda Optimization** - Find overprovisioned and unused functions
 19 | - 🪣 **S3 Optimization** - Comprehensive S3 cost analysis and storage class optimization
 20 | - 📊 **CloudWatch Optimization** - Analyze logs, metrics, alarms, and dashboards for cost efficiency
 21 | - 📋 **CloudTrail Optimization** - Analyze and optimize CloudTrail configurations
 22 | - 📊 **Comprehensive Analysis** - Multi-service cost analysis
 23 | 
 24 | ### Advanced Features
 25 | - **Real CloudWatch Metrics** - Uses actual AWS metrics for analysis
 26 | - **Multiple Output Formats** - JSON and Markdown report generation
 27 | - **Cost Calculations** - Estimated savings and cost breakdowns
 28 | - **Actionable Recommendations** - Priority-based optimization suggestions
 29 | 
 30 | ## 📁 Project Structure
 31 | 
 32 | ```
 33 | sample-cfm-tips-mcp/
 34 | ├── playbooks/                            # CFM Tips optimization playbooks engine
 35 | │   ├── s3_optimization.py               # S3 cost optimization playbook
 36 | │   ├── ec2_optimization.py              # EC2 right-sizing playbook
 37 | │   ├── ebs_optimization.py              # EBS volume optimization playbook
 38 | │   ├── rds_optimization.py              # RDS database optimization playbook
 39 | │   ├── lambda_optimization.py           # Lambda function optimization playbook
 40 | │   ├── cloudwatch_optimization.py       # CloudWatch optimization playbook
 41 | │   └── cloudtrail_optimization.py       # CloudTrail optimization playbook
 42 | ├── services/                             # AWS Services as datasources for the cost optimization
 43 | │   ├── s3_service.py                    # S3 API interactions and metrics
 44 | │   ├── s3_pricing.py                    # S3 pricing calculations and cost modeling
 45 | │   ├── cost_explorer.py                 # Cost Explorer API integration
 46 | │   ├── compute_optimizer.py             # Compute Optimizer API integration
 47 | │   └── optimization_hub.py              # Cost Optimization Hub integration
 48 | ├── mcp_server_with_runbooks.py           # Main MCP server
 49 | ├── mcp_runbooks.json                     # Template file for MCP configuration file
 50 | ├── requirements.txt                      # Python dependencies
 51 | ├── tests/                                # Test suites (unit, integration, performance)
 52 | ├── diagnose_cost_optimization_hub_v2.py  # Diagnostic utilities
 53 | ├── RUNBOOKS_GUIDE.md                     # Detailed usage guide
 54 | └── README.md                             # Project ReadMe
 55 | ```
 56 | 
 57 | ## 🔐 Security and Permissions - Least Privileges
 58 | 
 59 | The MCP tools require specific AWS permissions to function. 
 60 | -  **Create a read-only IAM role** - Restricts LLM agents from modifying AWS resources. This prevents unintended create, update, or delete actions. 
 61 | -  **Enable CloudTrail** - Tracks API activity across your AWS account for security monitoring. 
 62 | -  **Follow least-privilege principles** - Grant only essential read permissions (Describe*, List*, Get*) for required services.
 63 | 
 64 | The policy below grants only list, read, and describe actions:
 65 | 
 66 | ```json
 67 | {
 68 |   "Version": "2012-10-17",
 69 |   "Statement": [
 70 |     {
 71 |       "Effect": "Allow",
 72 |       "Action": [
 73 |         "cost-optimization-hub:ListEnrollmentStatuses",
 74 |         "cost-optimization-hub:ListRecommendations",
 75 |         "cost-optimization-hub:GetRecommendation", 
 76 |         "cost-optimization-hub:ListRecommendationSummaries",
 77 |         "ce:GetCostAndUsage",
 78 |         "ce:GetCostForecast",
 79 |         "compute-optimizer:GetEC2InstanceRecommendations",
 80 |         "compute-optimizer:GetEBSVolumeRecommendations",
 81 |         "compute-optimizer:GetLambdaFunctionRecommendations",
 82 |         "ec2:DescribeInstances",
 83 |         "ec2:DescribeVolumes",
 84 |         "rds:DescribeDBInstances",
 85 |         "lambda:ListFunctions",
 86 |         "cloudwatch:GetMetricStatistics",
 87 |         "s3:ListBucket",
 88 |         "s3:ListObjectsV2",
 89 |         "s3:GetBucketLocation",
 90 |         "s3:GetBucketVersioning",
 91 |         "s3:GetBucketLifecycleConfiguration",
 92 |         "s3:GetBucketNotification",
 93 |         "s3:GetBucketTagging",
 94 |         "s3:ListMultipartUploads",
 95 |         "s3:GetStorageLensConfiguration",
 96 |         "support:DescribeTrustedAdvisorChecks",
 97 |         "support:DescribeTrustedAdvisorCheckResult",
 98 |         "pi:GetResourceMetrics",
 99 |         "cloudtrail:DescribeTrails",
100 |         "cloudtrail:GetTrailStatus",
101 |         "cloudtrail:GetEventSelectors",
102 |         "pricing:GetProducts",
103 |         "pricing:DescribeServices",
104 |         "pricing:GetAttributeValues"
105 |       ],
106 |       "Resource": "*"
107 |     }
108 |   ]
109 | }
110 | ```
111 | 
112 | ## 🛠️ Installation
113 | 
114 | ### Prerequisites
115 | - **Python 3.11** or higher
116 | - AWS CLI configured with appropriate credentials
117 | - Amazon Kiro CLI (for MCP integration) - https://docs.aws.amazon.com/amazonq/latest/qdeveloper-ug/command-line-installing.html
118 | 
119 | ### Setup Steps
120 | 
121 | 1. **Clone the Repository**
122 |    ```bash
123 |    git clone https://github.com/aws-samples/sample-cfm-tips-mcp.git
124 |    cd sample-cfm-tips-mcp
125 |    ```
126 | 
127 | 2. **Install Dependencies**
128 |    ```bash
129 |    pip install -r requirements.txt
130 |    ```
131 | 
132 | 3. **Configure AWS Credentials**
133 |    ```bash
134 |    aws configure
135 |    # Or set environment variables:
136 |    # export AWS_ACCESS_KEY_ID=your_access_key
137 |    # export AWS_SECRET_ACCESS_KEY=your_secret_key
138 |    # export AWS_DEFAULT_REGION=us-east-1
139 |    ```
140 | 
141 | 4. **Apply IAM Permissions**
142 |    - Create an IAM policy with the permissions listed above
143 |    - Attach the policy to your IAM user or role
144 | 
145 | 5. **Install the MCP Configurations**
146 |    ```bash
147 |    python3 setup.py
148 |    ```
149 | 
150 | 6. **Usage Option 1: Using the Kiro CLI Chat**
151 |    ```bash
152 |    kiro-cli
153 |    Show me cost optimization recommendations
154 |    ```
155 | 
156 | 7. **Usage Option 2: Integrate with Amazon Q Developer Plugin or Kiro**
157 |    - Open Amazon Q Developer Plugin on your IDE
158 |    - Click on Chat -> 🛠️ Configure MCP Servers -> ➕ Add new MCP
159 |    - Use the following configuration
160 |    ```bash
161 |    - Scope: Global
162 |    - Name: cfm-tips
163 |    - Transport: stdio
164 |    - Command: python3
165 |    - Arguments: <replace-with-path-to-folder>/mcp_server_with_runbooks.py
166 |    - Timeout: 60
167 |    ```
168 | 
169 | ## 🔧 Available Tools
170 | 
171 | ### Cost Analysis Tools
172 | - `get_cost_explorer_data` - Retrieve AWS cost and usage data
173 | - `list_coh_enrollment` - Check Cost Optimization Hub enrollment
174 | - `get_coh_recommendations` - Get cost optimization recommendations
175 | - `get_coh_summaries` - Get recommendation summaries
176 | - `get_compute_optimizer_recommendations` - Get compute optimization recommendations
177 | 
178 | ### EC2 Optimization
179 | - `ec2_rightsizing` - Analyze EC2 instances for right-sizing opportunities
180 | - `ec2_report` - Generate detailed EC2 optimization reports
181 | 
182 | ### EBS Optimization
183 | - `ebs_optimization` - Analyze EBS volumes for optimization
184 | - `ebs_unused` - Identify unused EBS volumes
185 | - `ebs_report` - Generate EBS optimization reports
186 | 
187 | ### RDS Optimization
188 | - `rds_optimization` - Analyze RDS instances for optimization
189 | - `rds_idle` - Identify idle RDS instances
190 | - `rds_report` - Generate RDS optimization reports
191 | 
192 | ### Lambda Optimization
193 | - `lambda_optimization` - Analyze Lambda functions for optimization
194 | - `lambda_unused` - Identify unused Lambda functions
195 | - `lambda_report` - Generate Lambda optimization reports
196 | 
197 | ### S3 Optimization
198 | - `s3_general_spend_analysis` - Analyze overall S3 spending patterns and usage
199 | - `s3_storage_class_selection` - Get guidance on choosing cost-effective storage classes
200 | - `s3_storage_class_validation` - Validate existing data storage class appropriateness
201 | - `s3_archive_optimization` - Identify and optimize long-term archive data storage
202 | - `s3_api_cost_minimization` - Minimize S3 API request charges through optimization
203 | - `s3_multipart_cleanup` - Identify and clean up incomplete multipart uploads
204 | - `s3_governance_check` - Implement S3 cost controls and governance compliance
205 | - `s3_comprehensive_analysis` - Run comprehensive S3 cost optimization analysis
206 | 
207 | ### CloudWatch Optimization
208 | - `cloudwatch_general_spend_analysis` - Analyze CloudWatch cost breakdown across logs, metrics, alarms, and dashboards
209 | - `cloudwatch_metrics_optimization` - Identify custom metrics cost optimization opportunities
210 | - `cloudwatch_logs_optimization` - Analyze log retention and ingestion cost optimization
211 | - `cloudwatch_alarms_and_dashboards_optimization` - Identify monitoring efficiency improvements
212 | - `cloudwatch_comprehensive_optimization_tool` - Run comprehensive CloudWatch optimization with intelligent orchestration
213 | - `get_cloudwatch_cost_estimate` - Get detailed cost estimate for CloudWatch optimization analysis
214 | 
215 | ### CloudTrail Optimization
216 | - `get_management_trails` - Get CloudTrail management trails
217 | - `run_cloudtrail_trails_analysis` - Run CloudTrail trails analysis for optimization
218 | - `generate_cloudtrail_report` - Generate CloudTrail optimization reports
219 | 
220 | ### Comprehensive Analysis
221 | - `comprehensive_analysis` - Multi-service cost analysis
222 | 
223 | ### Additional Tools
224 | - `get_trusted_advisor_checks` - Get Trusted Advisor recommendations
225 | - `get_performance_insights_metrics` - Get RDS Performance Insights data
226 | 
227 | ## 📊 Example Usage
228 | 
229 | ### Basic Cost Analysis
230 | ```
231 | "Get my AWS costs for the last month"
232 | "Show me cost optimization recommendations"
233 | "What are my biggest cost drivers?"
234 | ```
235 | 
236 | ### Resource Optimization
237 | ```
238 | "Find underutilized EC2 instances in us-east-1"
239 | "Show me unused EBS volumes that I can delete"
240 | "Identify idle RDS databases"
241 | "Find unused Lambda functions"
242 | "Analyze my S3 storage costs and recommend optimizations"
243 | "Find incomplete multipart uploads in my S3 buckets"
244 | "Recommend the best S3 storage class for my data"
245 | "Analyze my CloudWatch logs and metrics for cost optimization"
246 | "Show me CloudWatch alarms that can be optimized"
247 | ```
248 | 
249 | ### Report Generation
250 | ```
251 | "Generate a comprehensive cost optimization report"
252 | "Create an EC2 right-sizing report in markdown format"
253 | "Generate an EBS optimization report with cost savings"
254 | ```
255 | 
256 | ### Multi-Service Analysis
257 | ```
258 | "Run comprehensive cost analysis for all services in us-east-1"
259 | "Analyze my AWS infrastructure for cost optimization opportunities"
260 | "Show me immediate cost savings opportunities"
261 | "Generate a comprehensive S3 optimization report"
262 | "Analyze my S3 spending patterns and storage class efficiency"
263 | ```
264 | 
265 | ## 🔍 Troubleshooting
266 | 
267 | ### Common Issues
268 | 
269 | 1. **Cost Optimization Hub Not Working**
270 |    ```bash
271 |    python3 diagnose_cost_optimization_hub_v2.py
272 |    ```
273 | 
274 | 2. **No Metrics Found**
275 |    - Ensure resources have been running for at least 14 days
276 |    - Verify CloudWatch metrics are enabled
277 |    - Check that you're analyzing the correct region
278 | 
279 | 3. **Permission Errors**
280 |    - Verify IAM permissions are correctly applied
281 |    - Check AWS credentials configuration
282 |    - Ensure Cost Optimization Hub is enabled in the AWS Console
283 | 
284 | 4. **Import Errors**
285 |    ```bash
286 |    # Check Python path and dependencies
287 |    python3 -c "import boto3, mcp; print('Dependencies OK')"
288 |    ```
289 | 
290 | ### Getting Help
291 | 
292 | - Check the [RUNBOOKS_GUIDE.md](RUNBOOKS_GUIDE.md) for detailed usage instructions
293 | - Run the diagnostic script: `python3 diagnose_cost_optimization_hub_v2.py`
294 | - Run integration tests: `python3 test_runbooks.py`
295 | 
296 | ## 🧩 Add-on MCPs
297 | The add-on AWS Pricing MCP Server provides access to real-time AWS pricing information and cost analysis capabilities:
298 | https://github.com/awslabs/mcp/tree/main/src/aws-pricing-mcp-server
299 | 
300 | ```bash
301 | # Example usage with Add-on AWS Pricing MCP Server:
302 | "Review the CDK by comparing it to the actual spend from my AWS account's stackset. Suggest cost optimization opportunities for the app accordingly"
303 | ```
304 | 
305 | ## 🪣 S3 Optimization Features
306 | 
307 | The S3 optimization module provides comprehensive cost analysis and optimization recommendations:
308 | 
309 | ### Storage Class Optimization
310 | - **Intelligent Storage Class Selection** - Get recommendations for the most cost-effective storage class based on access patterns
311 | - **Storage Class Validation** - Analyze existing data to ensure optimal storage class usage
312 | - **Cost Breakeven Analysis** - Calculate when to transition between storage classes
313 | - **Archive Optimization** - Identify long-term data suitable for Glacier or Deep Archive
314 | 
315 | ### Cost Analysis & Monitoring
316 | - **General Spend Analysis** - Comprehensive S3 spending pattern analysis over 12 months
317 | - **Bucket-Level Cost Ranking** - Identify highest-cost buckets and optimization opportunities
318 | - **Usage Type Breakdown** - Analyze costs by storage, requests, and data transfer
319 | - **Regional Cost Distribution** - Understand spending across AWS regions
320 | 
321 | ### Operational Optimization
322 | - **Multipart Upload Cleanup** - Identify and eliminate incomplete multipart uploads
323 | - **API Cost Minimization** - Optimize request patterns to reduce API charges
324 | - **Governance Compliance** - Implement cost controls and policy compliance checking
325 | - **Lifecycle Policy Recommendations** - Automated suggestions for lifecycle transitions
326 | 
327 | ### Advanced Analytics
328 | - **Real-Time Pricing Integration** - Uses AWS Price List API for accurate cost calculations
329 | - **Trend Analysis** - Identify spending growth patterns and anomalies
330 | - **Efficiency Metrics** - Calculate cost per GB and storage efficiency ratios
331 | - **Comprehensive Reporting** - Generate detailed optimization reports in JSON or Markdown
332 | 
333 | ## 🎯 Key Benefits
334 | 
335 | - **Immediate Cost Savings** - Identify unused resources for deletion
336 | - **Right-Sizing Opportunities** - Optimize overprovisioned resources
337 | - **Real Metrics Analysis** - Uses actual CloudWatch data
338 | - **Actionable Reports** - Clear recommendations with cost estimates
339 | - **Comprehensive Coverage** - Analyze EC2, EBS, RDS, Lambda, S3, and more
340 | - **Easy Integration** - Works seamlessly with Amazon Q CLI
341 | 
342 | ## 📈 Expected Results
343 | 
344 | The CFM Tips cost optimization server can help you:
345 | 
346 | - **Identify cost savings** across all AWS services
347 | - **Find unused resources** costing hundreds of dollars monthly
348 | - **Right-size overprovisioned instances** for optimal performance/cost ratio
349 | - **Optimize storage costs** through volume type and storage class recommendations
350 | - **Eliminate idle resources** that provide no business value
351 | - **Reduce S3 costs by 30-60%** through intelligent storage class transitions
352 | - **Clean up storage waste** from incomplete multipart uploads and orphaned data
353 | - **Optimize API request patterns** to minimize S3 request charges
354 | - **Reduce CloudWatch costs** through log retention and metrics optimization
355 | - **Eliminate unused alarms and dashboards** reducing monitoring overhead
356 | 
357 | ## 🤝 Contributing
358 | 
359 | We welcome contributions! Please see our contributing guidelines:
360 | 
361 | 1. Fork the repository
362 | 2. Create a feature branch
363 | 3. Make your changes
364 | 4. Add tests for new functionality
365 | 5. Submit a pull request
366 | 
367 | ## 📄 License
368 | 
369 | This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.
370 | 
371 | ---
372 | 
```

--------------------------------------------------------------------------------
/CODE_OF_CONDUCT.md:
--------------------------------------------------------------------------------

```markdown
1 | ## Code of Conduct
2 | This project has adopted the [Amazon Open Source Code of Conduct](https://aws.github.io/code-of-conduct).
3 | For more information see the [Code of Conduct FAQ](https://aws.github.io/code-of-conduct-faq) or contact
4 | opensource-codeofconduct@amazon.com with any additional questions or comments.
5 | 
```

--------------------------------------------------------------------------------
/CONTRIBUTING.md:
--------------------------------------------------------------------------------

```markdown
  1 | # Contributing to CFM Tips AWS Cost Optimization MCP Server
  2 | 
  3 | We welcome contributions to the CFM Tips AWS Cost Optimization MCP Server! This document provides guidelines for contributing to the project.
  4 | 
  5 | ## 🤝 How to Contribute
  6 | 
  7 | ### Reporting Issues
  8 | 
  9 | 1. **Search existing issues** first to avoid duplicates
 10 | 2. **Use the issue template** when creating new issues
 11 | 3. **Provide detailed information** including:
 12 |    - AWS region and services affected
 13 |    - Error messages and logs
 14 |    - Steps to reproduce
 15 |    - Expected vs actual behavior
 16 | 
 17 | ### Suggesting Features
 18 | 
 19 | 1. **Check existing feature requests** to avoid duplicates
 20 | 2. **Describe the use case** and business value
 21 | 3. **Provide implementation ideas** if you have them
 22 | 4. **Consider backward compatibility**
 23 | 
 24 | ### Code Contributions
 25 | 
 26 | #### Prerequisites
 27 | 
 28 | - Python 3.11 or higher
 29 | - AWS CLI configured with test credentials
 30 | - Familiarity with MCP (Model Context Protocol)
 31 | - Understanding of AWS cost optimization concepts
 32 | 
 33 | #### Development Setup
 34 | 
 35 | 1. **Fork the repository**
 36 |    ```bash
 37 |    git clone https://github.com/aws-samples/sample-cfm-tips-mcp.git
 38 |    cd sample-cfm-tips-mcp
 39 |    ```
 40 | 
 41 | 2. **Create a virtual environment**
 42 |    ```bash
 43 |    python3 -m venv venv
 44 |    source venv/bin/activate  # On Windows: venv\Scripts\activate
 45 |    ```
 46 | 
 47 | 3. **Install dependencies**
 48 |    ```bash
 49 |    pip install -r requirements.txt
 50 |    pip install -r tests/requirements-test.txt  # Test dependencies
 51 |    ```
 52 | 
 53 | 4. **Run tests**
 54 |    ```bash
 55 |    python3 test_runbooks.py
 56 |    ```
 57 | 
 58 | #### Making Changes
 59 | 
 60 | 1. **Create a feature branch**
 61 |    ```bash
 62 |    git checkout -b feature/your-feature-name
 63 |    ```
 64 | 
 65 | 2. **Make your changes**
 66 |    - Follow existing code style and patterns
 67 |    - Add docstrings to new functions
 68 |    - Include error handling
 69 |    - Update documentation as needed
 70 | 
 71 | 3. **Test your changes**
 72 |    ```bash
 73 |    # Run integration tests
 74 |    python3 test_runbooks.py
 75 |    
 76 |    # Test specific functionality
 77 |    python3 -c "from runbook_functions import your_function; print('OK')"
 78 |    
 79 |    # Test with Amazon Q (if possible)
 80 |    q chat
 81 |    ```
 82 | 
 83 | 4. **Update documentation**
 84 |    - Update README.md if needed
 85 |    - Update RUNBOOKS_GUIDE.md for new features
 86 |    - Add examples for new tools
 87 | 
 88 | #### Code Style Guidelines
 89 | 
 90 | - **Follow PEP 8** Python style guidelines
 91 | - **Use descriptive variable names**
 92 | - **Add type hints** where appropriate
 93 | - **Include docstrings** for all functions
 94 | - **Handle errors gracefully** with informative messages
 95 | - **Use async/await** for MCP tool functions
 96 | 
 97 | #### Example Code Structure
 98 | 
 99 | ```python
100 | async def your_new_tool(arguments: Dict[str, Any]) -> List[TextContent]:
101 |     """
102 |     Brief description of what the tool does.
103 |     
104 |     Args:
105 |         arguments: Dictionary containing tool parameters
106 |         
107 |     Returns:
108 |         List of TextContent with results
109 |     """
110 |     try:
111 |         # Validate inputs
112 |         region = arguments.get("region")
113 |         if not region:
114 |             return [TextContent(type="text", text="Error: region parameter is required")]
115 |         
116 |         # Create AWS client
117 |         client = boto3.client('service-name', region_name=region)
118 |         
119 |         # Make API calls
120 |         response = client.some_api_call()
121 |         
122 |         # Process results
123 |         result = {
124 |             "status": "success",
125 |             "data": response,
126 |             "message": "Operation completed successfully"
127 |         }
128 |         
129 |         return [TextContent(type="text", text=json.dumps(result, indent=2, default=str))]
130 |         
131 |     except ClientError as e:
132 |         error_msg = f"AWS API Error: {e.response['Error']['Code']} - {e.response['Error']['Message']}"
133 |         return [TextContent(type="text", text=f"Error: {error_msg}")]
134 |     except Exception as e:
135 |         return [TextContent(type="text", text=f"Error: {str(e)}")]
136 | ```
137 | 
138 | #### Adding New Tools
139 | 
140 | 1. **Add tool definition** to `list_tools()` in `mcp_server_with_runbooks.py`
141 | 2. **Add tool handler** to `call_tool()` function
142 | 3. **Implement tool function** in `runbook_functions.py`
143 | 4. **Add tests** for the new functionality
144 | 5. **Update documentation**
145 | 
146 | #### Submitting Changes
147 | 
148 | 1. **Commit your changes**
149 |    ```bash
150 |    git add .
151 |    git commit -m "feat: add new cost optimization tool for XYZ"
152 |    ```
153 | 
154 | 2. **Push to your fork**
155 |    ```bash
156 |    git push origin feature/your-feature-name
157 |    ```
158 | 
159 | 3. **Create a Pull Request**
160 |    - Use a descriptive title
161 |    - Explain what the PR does and why
162 |    - Reference any related issues
163 |    - Include testing instructions
164 | 
165 | ## 📋 Pull Request Guidelines
166 | 
167 | ### PR Title Format
168 | - `feat:` for new features
169 | - `fix:` for bug fixes
170 | - `docs:` for documentation changes
171 | - `refactor:` for code refactoring
172 | - `test:` for test additions/changes
173 | 
174 | ### PR Description Should Include
175 | - **What** the PR does
176 | - **Why** the change is needed
177 | - **How** to test the changes
178 | - **Screenshots** if UI changes are involved
179 | - **Breaking changes** if any
180 | 
181 | ### Review Process
182 | 1. **Automated checks** must pass
183 | 2. **Code review** by maintainers
184 | 3. **Testing** in different environments
185 | 4. **Documentation** review if applicable
186 | 
187 | ## 🧪 Testing Guidelines
188 | 
189 | ### Integration Tests
190 | - All existing tests must pass
191 | - Add tests for new functionality
192 | - Test with real AWS resources when possible
193 | - Include error case testing
194 | 
195 | ### Manual Testing
196 | - Test with Amazon Q CLI
197 | - Verify tool responses are properly formatted
198 | - Check error handling with invalid inputs
199 | - Test in different AWS regions
200 | 
201 | ## 📚 Documentation Standards
202 | 
203 | ### Code Documentation
204 | - **Docstrings** for all public functions
205 | - **Inline comments** for complex logic
206 | - **Type hints** for function parameters and returns
207 | 
208 | ### User Documentation
209 | - **Clear examples** for new tools
210 | - **Parameter descriptions** with types and defaults
211 | - **Error scenarios** and troubleshooting tips
212 | - **Use cases** and expected outcomes
213 | 
214 | ## 🏷️ Release Process
215 | 
216 | 1. **Version bumping** follows semantic versioning
217 | 2. **Changelog** is updated with new features and fixes
218 | 3. **Documentation** is updated for new releases
219 | 4. **Testing** is performed across different environments
220 | 
221 | ## 🆘 Getting Help
222 | 
223 | - **GitHub Issues** for bugs and feature requests
224 | - **GitHub Discussions** for questions and ideas
225 | - **Documentation** for usage guidelines
226 | - **Code comments** for implementation details
227 | 
228 | ## 📜 Code of Conduct
229 | 
230 | - Be respectful and inclusive
231 | - Focus on constructive feedback
232 | - Help others learn and grow
233 | - Follow the project's coding standards
234 | 
235 | ## 🙏 Recognition
236 | 
237 | Contributors will be recognized in:
238 | - README.md contributors section
239 | - Release notes for significant contributions
240 | - GitHub contributor graphs
241 | 
242 | Thank you for contributing to CFM Tips AWS Cost Optimization MCP Server!
243 | 
```

--------------------------------------------------------------------------------
/tests/legacy/test_metrics_pagination_count.py:
--------------------------------------------------------------------------------

```python
1 | 
```

--------------------------------------------------------------------------------
/tests/legacy/test_pricing_cache_fix_moved.py:
--------------------------------------------------------------------------------

```python
1 | 
```

--------------------------------------------------------------------------------
/tests/unit/__init__.py:
--------------------------------------------------------------------------------

```python
1 | # Unit tests package
```

--------------------------------------------------------------------------------
/tests/integration/__init__.py:
--------------------------------------------------------------------------------

```python
1 | # Integration tests package
```

--------------------------------------------------------------------------------
/tests/performance/__init__.py:
--------------------------------------------------------------------------------

```python
1 | # Performance tests package
```

--------------------------------------------------------------------------------
/tests/unit/services/__init__.py:
--------------------------------------------------------------------------------

```python
1 | # Service unit tests package
```

--------------------------------------------------------------------------------
/tests/unit/analyzers/__init__.py:
--------------------------------------------------------------------------------

```python
1 | # Analyzer unit tests package
```

--------------------------------------------------------------------------------
/tests/__init__.py:
--------------------------------------------------------------------------------

```python
1 | # Test package for S3 Optimization System
```

--------------------------------------------------------------------------------
/playbooks/__init__.py:
--------------------------------------------------------------------------------

```python
1 | # AWS Cost Optimization Playbooks package
2 | 
```

--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------

```
1 | mcp>=1.0.0
2 | boto3>=1.28.0
3 | botocore>=1.31.0
4 | psutil>=5.8.0
5 | 
```

--------------------------------------------------------------------------------
/services/__init__.py:
--------------------------------------------------------------------------------

```python
1 | # AWS Cost Analysis MCP Server services package
2 | 
3 | from .storage_lens_service import StorageLensService
4 | 
5 | __all__ = ['StorageLensService']
6 | 
```

--------------------------------------------------------------------------------
/playbooks/cloudtrail/__init__.py:
--------------------------------------------------------------------------------

```python
1 | """
2 | CloudTrail optimization playbooks
3 | 
4 | This package contains CloudTrail-specific cost optimization playbooks including
5 | trail analysis and logging cost optimization.
6 | """
```

--------------------------------------------------------------------------------
/playbooks/aws_lambda/__init__.py:
--------------------------------------------------------------------------------

```python
1 | """
2 | Lambda optimization playbooks
3 | 
4 | This package contains Lambda-specific cost optimization playbooks including
5 | memory optimization and unused function identification.
6 | """
```

--------------------------------------------------------------------------------
/playbooks/rds/__init__.py:
--------------------------------------------------------------------------------

```python
1 | """
2 | RDS optimization playbooks
3 | 
4 | This package contains RDS-specific cost optimization playbooks including
5 | database right-sizing and performance optimization recommendations.
6 | """
```

--------------------------------------------------------------------------------
/playbooks/ebs/__init__.py:
--------------------------------------------------------------------------------

```python
1 | """
2 | EBS optimization playbooks
3 | 
4 | This package contains EBS-specific cost optimization playbooks including
5 | volume utilization analysis and storage optimization recommendations.
6 | """
```

--------------------------------------------------------------------------------
/playbooks/ec2/__init__.py:
--------------------------------------------------------------------------------

```python
1 | """
2 | EC2 optimization playbooks
3 | 
4 | This package contains EC2-specific cost optimization playbooks including
5 | right-sizing, instance type recommendations, and utilization analysis.
6 | """
```

--------------------------------------------------------------------------------
/utils/__init__.py:
--------------------------------------------------------------------------------

```python
1 | """
2 | Utilities package for CFM Tips MCP Server
3 | 
4 | This package contains utility modules for logging, performance monitoring,
5 | session management, and other cross-cutting concerns.
6 | """
```

--------------------------------------------------------------------------------
/playbooks/s3/__init__.py:
--------------------------------------------------------------------------------

```python
1 | """
2 | S3 optimization playbooks
3 | 
4 | This package contains S3-specific cost optimization playbooks including
5 | storage class optimization, lifecycle policies, and unused resource cleanup.
6 | """
```

--------------------------------------------------------------------------------
/tests/requirements-test.txt:
--------------------------------------------------------------------------------

```
 1 | # Testing dependencies for S3 optimization system
 2 | 
 3 | # Core testing framework
 4 | pytest>=7.0.0
 5 | pytest-asyncio>=0.21.0
 6 | pytest-cov>=4.0.0
 7 | pytest-mock>=3.10.0
 8 | pytest-timeout>=2.1.0
 9 | 
10 | # AWS mocking
11 | moto[s3,cloudwatch,ce,s3control]>=4.2.0
12 | boto3>=1.28.0
13 | botocore>=1.31.0
14 | 
15 | # Performance testing
16 | pytest-benchmark>=4.0.0
17 | memory-profiler>=0.60.0
18 | 
19 | # Parallel testing (optional)
20 | pytest-xdist>=3.0.0
21 | 
22 | # Test reporting
23 | pytest-html>=3.1.0
24 | pytest-json-report>=1.5.0
25 | 
26 | # Code quality
27 | flake8>=5.0.0
28 | black>=22.0.0
29 | isort>=5.10.0
30 | 
31 | # Type checking
32 | mypy>=1.0.0
33 | types-boto3>=1.0.0
34 | 
35 | # Additional utilities
36 | freezegun>=1.2.0  # For time-based testing
37 | responses>=0.23.0  # For HTTP mocking
38 | factory-boy>=3.2.0  # For test data generation
```

--------------------------------------------------------------------------------
/playbooks/s3/analyzers/__init__.py:
--------------------------------------------------------------------------------

```python
 1 | """
 2 | S3 Optimization Analyzers Package
 3 | 
 4 | This package contains all S3 optimization analyzers that extend BaseAnalyzer.
 5 | Each analyzer focuses on a specific aspect of S3 cost optimization.
 6 | """
 7 | 
 8 | from .general_spend_analyzer import GeneralSpendAnalyzer
 9 | from .storage_class_analyzer import StorageClassAnalyzer
10 | from .archive_optimization_analyzer import ArchiveOptimizationAnalyzer
11 | from .api_cost_analyzer import ApiCostAnalyzer
12 | from .multipart_cleanup_analyzer import MultipartCleanupAnalyzer
13 | from .governance_analyzer import GovernanceAnalyzer
14 | 
15 | __all__ = [
16 |     'GeneralSpendAnalyzer',
17 |     'StorageClassAnalyzer', 
18 |     'ArchiveOptimizationAnalyzer',
19 |     'ApiCostAnalyzer',
20 |     'MultipartCleanupAnalyzer',
21 |     'GovernanceAnalyzer'
22 | ]
```

--------------------------------------------------------------------------------
/mcp_runbooks.json:
--------------------------------------------------------------------------------

```json
 1 | {
 2 |   "mcpServers": {
 3 |     "cfm-tips": {
 4 |       "command": "python3",
 5 |       "args": [
 6 |         "<replace-with-project-folder-path>/mcp_server_with_runbooks.py"
 7 |       ],
 8 |       "env": {
 9 |         "AWS_DEFAULT_REGION": "us-east-1",
10 |         "AWS_PROFILE": "<replace-with-your-aws-profile>",
11 |         "PYTHONPATH": "<replace-with-project-folder-path>"
12 |       },
13 |       "disabled": false,
14 |       "autoApprove": []
15 |     },
16 |     "awslabs.aws-pricing-mcp-server": {
17 |       "command": "uvx",
18 |       "args": [
19 |         "awslabs.aws-pricing-mcp-server@latest"
20 |       ],
21 |       "env": {
22 |         "FASTMCP_LOG_LEVEL": "ERROR",
23 |         "AWS_PROFILE": "<replace-with-your-aws-profile>",
24 |         "AWS_REGION": "us-east-1"
25 |       },
26 |       "disabled": false,
27 |       "autoApprove": []
28 |     }
29 |   }
30 | }
```

--------------------------------------------------------------------------------
/playbooks/cloudwatch/__init__.py:
--------------------------------------------------------------------------------

```python
 1 | """
 2 | CloudWatch Optimization Playbook for CFM Tips MCP Server
 3 | 
 4 | Provides comprehensive CloudWatch cost analysis and optimization recommendations.
 5 | """
 6 | 
 7 | from .optimization_orchestrator import CloudWatchOptimizationOrchestrator
 8 | from .base_analyzer import BaseAnalyzer
 9 | from .cloudwatch_optimization import (
10 |     run_cloudwatch_general_spend_analysis_mcp,
11 |     run_cloudwatch_metrics_optimization_mcp,
12 |     run_cloudwatch_logs_optimization_mcp,
13 |     run_cloudwatch_alarms_and_dashboards_optimization_mcp,
14 |     run_cloudwatch_comprehensive_optimization_tool_mcp,
15 |     query_cloudwatch_analysis_results_mcp,
16 |     validate_cloudwatch_cost_preferences_mcp,
17 |     get_cloudwatch_cost_estimate_mcp
18 | )
19 | 
20 | __all__ = [
21 |     'CloudWatchOptimizationOrchestrator',
22 |     'BaseAnalyzer',
23 |     'run_cloudwatch_general_spend_analysis_mcp',
24 |     'run_cloudwatch_metrics_optimization_mcp',
25 |     'run_cloudwatch_logs_optimization_mcp',
26 |     'run_cloudwatch_alarms_and_dashboards_optimization_mcp',
27 |     'run_cloudwatch_comprehensive_optimization_tool_mcp',
28 |     'query_cloudwatch_analysis_results_mcp',
29 |     'validate_cloudwatch_cost_preferences_mcp',
30 |     'get_cloudwatch_cost_estimate_mcp'
31 | ]
```

--------------------------------------------------------------------------------
/tests/unit/s3/live/test_bucket_listing.py:
--------------------------------------------------------------------------------

```python
 1 | """
 2 | Simple test to verify S3 bucket listing works.
 3 | """
 4 | 
 5 | import asyncio
 6 | import boto3
 7 | 
 8 | 
 9 | async def test_list_buckets():
10 |     """Test basic S3 bucket listing."""
11 |     
12 |     s3_client = boto3.client('s3')
13 |     
14 |     try:
15 |         response = s3_client.list_buckets()
16 |         buckets = response.get('Buckets', [])
17 |         
18 |         print(f"\n=== Found {len(buckets)} S3 Buckets ===")
19 |         
20 |         for bucket in buckets[:10]:  # Show first 10
21 |             bucket_name = bucket['Name']
22 |             creation_date = bucket['CreationDate']
23 |             
24 |             # Try to get bucket location
25 |             try:
26 |                 location_response = s3_client.get_bucket_location(Bucket=bucket_name)
27 |                 region = location_response.get('LocationConstraint') or 'us-east-1'
28 |             except Exception as e:
29 |                 region = f"Error: {str(e)}"
30 |             
31 |             print(f"\nBucket: {bucket_name}")
32 |             print(f"  Region: {region}")
33 |             print(f"  Created: {creation_date}")
34 |         
35 |         return len(buckets)
36 |         
37 |     except Exception as e:
38 |         print(f"\nError listing buckets: {str(e)}")
39 |         return 0
40 | 
41 | 
42 | if __name__ == "__main__":
43 |     count = asyncio.run(test_list_buckets())
44 |     print(f"\n\nTotal buckets: {count}")
45 | 
```

--------------------------------------------------------------------------------
/tests/unit/cloudwatch/test_cloudwatch_unit_suite.py:
--------------------------------------------------------------------------------

```python
 1 | #!/usr/bin/env python3
 2 | """
 3 | CloudWatch Unit Test Suite Runner
 4 | 
 5 | Runs all CloudWatch unit tests including pagination tests.
 6 | """
 7 | 
 8 | import pytest
 9 | import sys
10 | import os
11 | 
12 | # Add the project root to the path
13 | sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../../..'))
14 | 
15 | 
16 | class TestCloudWatchUnitSuite:
17 |     """CloudWatch unit test suite runner."""
18 |     
19 |     def test_run_all_cloudwatch_unit_tests(self):
20 |         """Run all CloudWatch unit tests."""
21 |         # Get the directory containing this file
22 |         test_dir = os.path.dirname(__file__)
23 |         
24 |         # Run all test files in the cloudwatch unit test directory, excluding this suite runner
25 |         exit_code = pytest.main([
26 |             test_dir,
27 |             '-v',
28 |             '--tb=short',
29 |             '--disable-warnings',
30 |             '--ignore=' + __file__  # Exclude this suite runner to prevent recursion
31 |         ])
32 |         
33 |         assert exit_code == 0, "CloudWatch unit tests failed"
34 | 
35 | 
36 | if __name__ == '__main__':
37 |     # Run the CloudWatch unit test suite
38 |     test_dir = os.path.dirname(__file__)
39 |     exit_code = pytest.main([
40 |         test_dir,
41 |         '-v',
42 |         '--tb=short',
43 |         '--ignore=' + __file__  # Exclude this suite runner to prevent recursion
44 |     ])
45 |     
46 |     sys.exit(exit_code)
```

--------------------------------------------------------------------------------
/tests/pytest.ini:
--------------------------------------------------------------------------------

```
 1 | [pytest]
 2 | # Pytest configuration for S3 optimization testing
 3 | 
 4 | # Test discovery
 5 | testpaths = tests
 6 | python_files = test_*.py
 7 | python_classes = Test*
 8 | python_functions = test_*
 9 | 
10 | # Markers
11 | markers =
12 |     unit: Unit tests with mocked dependencies
13 |     integration: Integration tests with multiple components
14 |     performance: Performance and load tests
15 |     no_cost_validation: Critical tests for cost constraint validation
16 |     slow: Tests that take longer to run
17 |     aws: Tests that require AWS credentials (skipped by default)
18 |     cloudwatch: CloudWatch-specific tests
19 | 
20 | # Output and reporting
21 | addopts = 
22 |     --verbose
23 |     --tb=short
24 |     --strict-markers
25 |     --strict-config
26 |     --disable-warnings
27 |     --color=yes
28 |     --durations=10
29 |     --cov=core
30 |     --cov=services
31 |     --cov-report=term-missing
32 |     --cov-report=html:htmlcov
33 |     --cov-fail-under=80
34 | 
35 | # Async support
36 | asyncio_mode = auto
37 | 
38 | # Logging
39 | log_cli = true
40 | log_cli_level = INFO
41 | log_cli_format = %(asctime)s [%(levelname)8s] %(name)s: %(message)s
42 | log_cli_date_format = %Y-%m-%d %H:%M:%S
43 | 
44 | # Warnings
45 | filterwarnings =
46 |     ignore::DeprecationWarning
47 |     ignore::PendingDeprecationWarning
48 |     ignore::UserWarning:boto3.*
49 |     ignore::UserWarning:botocore.*
50 | 
51 | # Minimum required pytest version
52 | minversion = 3.8
53 | 
54 | # Test timeout (in seconds)
55 | timeout = 300
56 | 
57 | # Parallel execution
58 | # To enable parallel execution with pytest-xdist, add "-n auto" to the addopts list above
```

--------------------------------------------------------------------------------
/tests/unit/cloudwatch/test_import_error.py:
--------------------------------------------------------------------------------

```python
 1 | """
 2 | Test to replicate the CloudWatchServiceFactory import error.
 3 | 
 4 | This test verifies that the import error occurs when trying to import
 5 | CloudWatchServiceFactory from services.cloudwatch_service.
 6 | """
 7 | 
 8 | import pytest
 9 | 
10 | 
11 | def test_cloudwatch_service_factory_import_error():
12 |     """Test that CloudWatchServiceFactory import fails as expected."""
13 |     with pytest.raises(ImportError, match="cannot import name 'CloudWatchServiceFactory'"):
14 |         from services.cloudwatch_service import CloudWatchServiceFactory
15 | 
16 | 
17 | def test_cloudwatch_optimization_analyzer_import_success():
18 |     """Test that CloudWatchOptimizationAnalyzer import now works after fix."""
19 |     from playbooks.cloudwatch.cloudwatch_optimization_analyzer import CloudWatchOptimizationAnalyzer
20 |     assert CloudWatchOptimizationAnalyzer is not None
21 | 
22 | 
23 | def test_correct_imports_work():
24 |     """Test that correct imports from cloudwatch_service work."""
25 |     from services.cloudwatch_service import (
26 |         CWGeneralSpendTips,
27 |         CWMetricsTips,
28 |         CWLogsTips,
29 |         CWAlarmsTips,
30 |         CWDashboardTips,
31 |         CloudWatchService,
32 |         create_cloudwatch_service
33 |     )
34 |     
35 |     # Verify all imports are classes/functions
36 |     assert CWGeneralSpendTips is not None
37 |     assert CWMetricsTips is not None
38 |     assert CWLogsTips is not None
39 |     assert CWAlarmsTips is not None
40 |     assert CWDashboardTips is not None
41 |     assert CloudWatchService is not None
42 |     assert create_cloudwatch_service is not None
43 | 
```

--------------------------------------------------------------------------------
/tests/legacy/test_cloudwatch_timeout_issue.py:
--------------------------------------------------------------------------------

```python
 1 | #!/usr/bin/env python3
 2 | """
 3 | Test to replicate the CloudWatch timeout issue and verify stack trace reporting.
 4 | """
 5 | 
 6 | import asyncio
 7 | import json
 8 | import traceback
 9 | import sys
10 | import os
11 | 
12 | # Add the project root to the path
13 | sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
14 | 
15 | from runbook_functions import run_cloudwatch_general_spend_analysis
16 | 
17 | 
18 | async def test_cloudwatch_timeout():
19 |     """Test CloudWatch general spend analysis to replicate timeout issue."""
20 |     print("Testing CloudWatch general spend analysis timeout issue...")
21 |     
22 |     try:
23 |         # Test with minimal parameters that should trigger the timeout
24 |         arguments = {
25 |             "region": "us-east-1",
26 |             "lookback_days": 7,
27 |             "page": 1
28 |         }
29 |         
30 |         print(f"Calling run_cloudwatch_general_spend_analysis with: {arguments}")
31 |         
32 |         # This should timeout and we should get a full stack trace
33 |         result = await run_cloudwatch_general_spend_analysis(arguments)
34 |         
35 |         print("Result received:")
36 |         for content in result:
37 |             print(content.text)
38 |             
39 |         return True
40 |         
41 |     except Exception as e:
42 |         print(f"Exception caught in test: {str(e)}")
43 |         print("Full stack trace:")
44 |         traceback.print_exc()
45 |         return False
46 | 
47 | 
48 | if __name__ == "__main__":
49 |     success = asyncio.run(test_cloudwatch_timeout())
50 |     if success:
51 |         print("✅ Test completed successfully")
52 |     else:
53 |         print("❌ Test failed")
54 |         sys.exit(1)
```

--------------------------------------------------------------------------------
/tests/unit/test_unit_suite.py:
--------------------------------------------------------------------------------

```python
 1 | #!/usr/bin/env python3
 2 | """
 3 | Unit Test Suite Runner - Second Level Suite
 4 | Runs all unit tests across all playbooks.
 5 | """
 6 | 
 7 | import pytest
 8 | import sys
 9 | import os
10 | 
11 | # Add the project root to the path
12 | sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../..'))
13 | 
14 | 
15 | def run_unit_tests():
16 |     """Run all unit tests across all playbooks."""
17 |     print("🧪 Running Unit Test Suite")
18 |     print("=" * 50)
19 |     
20 |     # Define test directories for each playbook (relative to tests directory)
21 |     base_dir = os.path.dirname(os.path.dirname(__file__))  # Go up to tests directory
22 |     test_dirs = [
23 |         os.path.join(base_dir, "unit/cloudwatch/"),
24 |         os.path.join(base_dir, "unit/ec2/"),
25 |         os.path.join(base_dir, "unit/s3/"),
26 |         # Add other playbooks as they are organized
27 |     ]
28 |     
29 |     # Filter to only existing directories
30 |     existing_dirs = [d for d in test_dirs if os.path.exists(d)]
31 |     
32 |     if not existing_dirs:
33 |         print("❌ No unit test directories found")
34 |         return False
35 |     
36 |     print(f"Running unit tests from: {existing_dirs}")
37 |     
38 |     # Run pytest on all unit test directories
39 |     exit_code = pytest.main([
40 |         "-v",
41 |         "--tb=short",
42 |         "--color=yes",
43 |         *existing_dirs
44 |     ])
45 |     
46 |     success = exit_code == 0
47 |     
48 |     if success:
49 |         print("\n🎉 ALL UNIT TESTS PASSED!")
50 |     else:
51 |         print(f"\n❌ UNIT TESTS FAILED (exit code: {exit_code})")
52 |     
53 |     return success
54 | 
55 | 
56 | if __name__ == "__main__":
57 |     success = run_unit_tests()
58 |     sys.exit(0 if success else 1)
```

--------------------------------------------------------------------------------
/tests/pytest-cloudwatch.ini:
--------------------------------------------------------------------------------

```
 1 | [pytest]
 2 | # CloudWatch optimization testing configuration (.ini files are read from [pytest]; [tool:pytest] is the setup.cfg spelling)
 3 | 
 4 | # Test discovery
 5 | testpaths = .
 6 | python_files = test_*.py *_test.py
 7 | python_classes = Test*
 8 | python_functions = test_*
 9 | 
10 | # Markers
11 | markers =
12 |     unit: Unit tests
13 |     integration: Integration tests
14 |     performance: Performance tests
15 |     no_cost_validation: Tests that validate no unexpected costs
16 |     cloudwatch: CloudWatch-specific tests
17 |     slow: Slow running tests
18 |     asyncio: Async tests
19 | 
20 | # Async support
21 | asyncio_mode = auto
22 | 
23 | # Output options
24 | addopts = 
25 |     --strict-markers
26 |     --strict-config
27 |     --tb=short
28 |     --maxfail=10
29 |     --durations=10
30 |     -ra
31 | 
32 | # Logging
33 | log_cli = true
34 | log_cli_level = INFO
35 | log_cli_format = %(asctime)s [%(levelname)8s] %(name)s: %(message)s
36 | log_cli_date_format = %Y-%m-%d %H:%M:%S
37 | 
38 | # Warnings
39 | filterwarnings =
40 |     ignore::DeprecationWarning
41 |     ignore::PendingDeprecationWarning
42 |     ignore::UserWarning:moto.*
43 |     error::pytest.PytestUnraisableExceptionWarning
44 | 
45 | # Minimum version
46 | minversion = 6.0
47 | 
48 | # Test timeout (for performance tests)
49 | timeout = 300
50 | 
51 | # Coverage options (when --cov is used)
52 | # These are applied when pytest-cov is installed and --cov is used
53 | [coverage:run]
54 | source = 
55 |     playbooks/cloudwatch
56 |     services/cloudwatch_service.py
57 |     services/cloudwatch_pricing.py
58 | 
59 | omit =
60 |     */tests/*
61 |     */test_*
62 |     */__pycache__/*
63 |     */venv/*
64 |     */.venv/*
65 | 
66 | [coverage:report]
67 | exclude_lines =
68 |     pragma: no cover
69 |     def __repr__
70 |     if self.debug:
71 |     if settings.DEBUG
72 |     raise AssertionError
73 |     raise NotImplementedError
74 |     if 0:
75 |     if __name__ == .__main__.:
76 |     class .*\bProtocol\):
77 |     @(abc\.)?abstractmethod
78 | 
79 | show_missing = true
80 | precision = 2
81 | skip_covered = false
82 | 
83 | [coverage:html]
84 | directory = htmlcov
85 | title = CloudWatch Optimization Test Coverage
```

--------------------------------------------------------------------------------
/tests/legacy/test_stack_trace_fix.py:
--------------------------------------------------------------------------------

```python
 1 | #!/usr/bin/env python3
 2 | """
 3 | Test to verify stack traces are now properly captured in CloudWatch functions.
 4 | """
 5 | 
 6 | import asyncio
 7 | import json
 8 | import sys
 9 | import os
10 | 
11 | # Add the project root to the path
12 | sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
13 | 
14 | from runbook_functions import run_cloudwatch_general_spend_analysis
15 | 
16 | 
17 | async def test_stack_trace_capture():
18 |     """Test that CloudWatch functions now capture full stack traces."""
19 |     print("Testing CloudWatch stack trace capture...")
20 |     
21 |     # Test with invalid region to trigger an error
22 |     arguments = {
23 |         "region": "invalid-region-12345",  # This should cause an error
24 |         "lookback_days": 1,
25 |         "page": 1
26 |     }
27 |     
28 |     print(f"Calling run_cloudwatch_general_spend_analysis with invalid region: {arguments}")
29 |     
30 |     try:
31 |         result = await run_cloudwatch_general_spend_analysis(arguments)
32 |         
33 |         print("Result received:")
34 |         for content in result:
35 |             result_text = content.text
36 |             print(result_text)
37 |             
38 |             # Check if the result contains a full stack trace
39 |             if "Full stack trace:" in result_text:
40 |                 print("✅ SUCCESS: Full stack trace found in error response")
41 |                 return True
42 |             else:
43 |                 print("❌ FAILURE: No full stack trace found in error response")
44 |                 return False
45 |                 
46 |     except Exception as e:
47 |         print(f"❌ FAILURE: Exception not handled properly: {str(e)}")
48 |         return False
49 | 
50 | 
51 | if __name__ == "__main__":
52 |     success = asyncio.run(test_stack_trace_capture())
53 |     if success:
54 |         print("\n✅ Stack trace fix verification PASSED")
55 |     else:
56 |         print("\n❌ Stack trace fix verification FAILED")
57 |         sys.exit(1)
```

--------------------------------------------------------------------------------
/tests/legacy/test_pricing_cache_fix.py:
--------------------------------------------------------------------------------

```python
 1 | #!/usr/bin/env python3
 2 | """
 3 | Test to verify the CloudWatch pricing cache fix works.
 4 | """
 5 | 
 6 | import sys
 7 | import os
 8 | import time
 9 | 
10 | # Add the project root to the path
11 | sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
12 | 
13 | from services.cloudwatch_pricing import CloudWatchPricing
14 | 
15 | 
16 | def test_pricing_cache():
17 |     """Test that pricing calls are cached and don't block."""
18 |     print("Testing CloudWatch pricing cache fix...")
19 |     
20 |     # Initialize pricing service
21 |     pricing = CloudWatchPricing(region='us-east-1')
22 |     
23 |     # First call - should use fallback pricing and cache it
24 |     print("Making first pricing call...")
25 |     start_time = time.time()
26 |     result1 = pricing.get_metrics_pricing()
27 |     first_call_time = time.time() - start_time
28 |     
29 |     print(f"First call took {first_call_time:.3f} seconds")
30 |     print(f"Status: {result1.get('status')}")
31 |     print(f"Source: {result1.get('source')}")
32 |     
33 |     # Second call - should use cache and be instant
34 |     print("\nMaking second pricing call...")
35 |     start_time = time.time()
36 |     result2 = pricing.get_metrics_pricing()
37 |     second_call_time = time.time() - start_time
38 |     
39 |     print(f"Second call took {second_call_time:.3f} seconds")
40 |     print(f"Status: {result2.get('status')}")
41 |     print(f"Source: {result2.get('source')}")
42 |     
43 |     # Verify caching worked
44 |     if second_call_time < 0.001:  # Should be nearly instant
45 |         print("✅ SUCCESS: Caching is working - second call was instant")
46 |         return True
47 |     else:
48 |         print("❌ FAILURE: Caching not working - second call took too long")
49 |         return False
50 | 
51 | 
52 | if __name__ == "__main__":
53 |     success = test_pricing_cache()
54 |     if success:
55 |         print("\n✅ Pricing cache fix verification PASSED")
56 |     else:
57 |         print("\n❌ Pricing cache fix verification FAILED")
58 |         sys.exit(1)
```

--------------------------------------------------------------------------------
/tests/legacy/test_documentation_links.py:
--------------------------------------------------------------------------------

```python
 1 | #!/usr/bin/env python3
 2 | """
 3 | Test script for documentation links functionality
 4 | """
 5 | 
 6 | import json
 7 | from utils.documentation_links import add_documentation_links, get_service_documentation, format_documentation_section
 8 | 
 9 | def test_documentation_links():
10 |     """Test the documentation links functionality"""
11 |     
12 |     print("Testing documentation links functionality...\n")
13 |     
14 |     # Test 1: Basic result with EC2 service
15 |     print("1. Testing EC2 service documentation links:")
16 |     ec2_result = {
17 |         "status": "success",
18 |         "data": {
19 |             "underutilized_instances": [],
20 |             "count": 0,
21 |             "total_monthly_savings": 0
22 |         },
23 |         "message": "Found 0 underutilized EC2 instances"
24 |     }
25 |     
26 |     enhanced_result = add_documentation_links(ec2_result, "ec2")
27 |     print(json.dumps(enhanced_result, indent=2))
28 |     print()
29 |     
30 |     # Test 2: S3 service documentation
31 |     print("2. Testing S3 service documentation links:")
32 |     s3_result = {
33 |         "status": "success",
34 |         "data": {
35 |             "buckets_analyzed": 5,
36 |             "total_savings": 150.50
37 |         }
38 |     }
39 |     
40 |     enhanced_s3_result = add_documentation_links(s3_result, "s3")
41 |     print(json.dumps(enhanced_s3_result, indent=2))
42 |     print()
43 |     
44 |     # Test 3: General documentation (no specific service)
45 |     print("3. Testing general documentation links:")
46 |     general_result = {
47 |         "status": "success",
48 |         "message": "Cost analysis completed"
49 |     }
50 |     
51 |     enhanced_general_result = add_documentation_links(general_result)
52 |     print(json.dumps(enhanced_general_result, indent=2))
53 |     print()
54 |     
55 |     # Test 4: Get service-specific documentation
56 |     print("4. Testing service-specific documentation retrieval:")
57 |     rds_docs = get_service_documentation("rds")
58 |     print("RDS Documentation:")
59 |     for title, url in rds_docs.items():
60 |         print(f"  - {title}: {url}")
61 |     print()
62 |     
63 |     # Test 5: Format standalone documentation section
64 |     print("5. Testing standalone documentation section:")
65 |     lambda_docs = format_documentation_section("lambda")
66 |     print(json.dumps(lambda_docs, indent=2))
67 |     print()
68 |     
69 |     print("All tests completed successfully!")
70 | 
71 | if __name__ == "__main__":
72 |     test_documentation_links()  # run the printed walkthrough when executed as a script
```

--------------------------------------------------------------------------------
/services/cloudwatch_pricing.py:
--------------------------------------------------------------------------------

```python
 1 | """
 2 | CloudWatch Pricing - Backward compatibility wrapper.
 3 | 
 4 | This module provides backward compatibility for code that expects CloudWatchPricing.
 5 | The actual pricing logic is now internal to cloudwatch_service.py via AWSPricingDAO.
 6 | """
 7 | 
 8 | import logging
 9 | from typing import Dict, Any, Optional
10 | 
11 | logger = logging.getLogger(__name__)
12 | 
13 | 
14 | class CloudWatchPricing:
15 |     """
16 |     Backward compatibility wrapper for CloudWatch pricing.
17 |     
18 |     This class provides the same interface as before but delegates to the internal
19 |     AWSPricingDAO class in cloudwatch_service.py.
20 |     """
21 |     
22 |     def __init__(self, region: str = 'us-east-1'):
23 |         """Initialize pricing service."""
24 |         self.region = region
25 |         
26 |         # Import here to avoid circular dependency
27 |         from services.cloudwatch_service import AWSPricingDAO
28 |         self._pricing_dao = AWSPricingDAO(region=region)
29 |         
30 |         logger.debug(f"CloudWatchPricing initialized for region: {region}")
31 |     
32 |     def get_pricing_data(self, component: str) -> Dict[str, Any]:
33 |         """Get pricing data for CloudWatch components."""
34 |         return self._pricing_dao.get_pricing_data(component)
35 |     
36 |     def get_free_tier_limits(self) -> Dict[str, Any]:
37 |         """Get free tier limits for CloudWatch services."""
38 |         return self._pricing_dao.get_free_tier_limits()
39 |     
40 |     def calculate_cost(self, component: str, usage: Dict[str, Any]) -> Dict[str, Any]:
41 |         """Calculate costs for CloudWatch components."""
42 |         return self._pricing_dao.calculate_cost(component, usage)
43 |     
44 |     def calculate_logs_cost(self, usage: Dict[str, Any]) -> Dict[str, Any]:
45 |         """Calculate CloudWatch Logs costs."""
46 |         pricing = self.get_pricing_data('logs')
47 |         return self._pricing_dao._calculate_logs_cost(usage, pricing)
48 |     
49 |     def calculate_metrics_cost(self, usage: Dict[str, Any]) -> Dict[str, Any]:
50 |         """Calculate CloudWatch Metrics costs."""
51 |         pricing = self.get_pricing_data('metrics')
52 |         return self._pricing_dao._calculate_metrics_cost(usage, pricing)
53 |     
54 |     def calculate_alarms_cost(self, usage: Dict[str, Any]) -> Dict[str, Any]:
55 |         """Calculate CloudWatch Alarms costs."""
56 |         pricing = self.get_pricing_data('alarms')
57 |         return self._pricing_dao._calculate_alarms_cost(usage, pricing)
58 |     
59 |     def calculate_dashboards_cost(self, usage: Dict[str, Any]) -> Dict[str, Any]:
60 |         """Calculate CloudWatch Dashboards costs."""
61 |         pricing = self.get_pricing_data('dashboards')
62 |         return self._pricing_dao._calculate_dashboards_cost(usage, pricing)
63 | 
```

--------------------------------------------------------------------------------
/tests/performance/test_performance_suite.py:
--------------------------------------------------------------------------------

```python
 1 | #!/usr/bin/env python3
 2 | """
 3 | Performance Test Suite Runner - Second Level Suite
 4 | Runs all performance tests across all playbooks.
 5 | """
 6 | 
 7 | import sys
 8 | import os
 9 | import importlib.util
10 | 
11 | # Add the project root to the path
12 | sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../..'))
13 | 
14 | 
15 | def run_performance_tests():
16 |     """Run all performance tests across all playbooks."""
17 |     print("⚡ Running Performance Test Suite")
18 |     print("=" * 50)
19 |     
20 |     # Define performance test modules for each playbook (relative to tests directory)
21 |     base_dir = os.path.dirname(os.path.dirname(__file__))  # Go up to tests directory
22 |     test_modules = [
23 |         ("CloudWatch Performance", os.path.join(base_dir, "performance/cloudwatch/test_cloudwatch_performance.py")),
24 |         # Add other playbooks as they are organized
25 |         # ("EC2 Performance", os.path.join(base_dir, "performance/ec2/test_ec2_performance.py")),
26 |         # ("S3 Performance", os.path.join(base_dir, "performance/s3/test_s3_performance.py")),
27 |     ]
28 |     
29 |     total_passed = 0
30 |     total_failed = 0
31 |     
32 |     for test_name, test_path in test_modules:
33 |         if not os.path.exists(test_path):
34 |             print(f"⚠️  Skipping {test_name}: {test_path} not found")
35 |             continue
36 |             
37 |         print(f"\n🔄 Running {test_name}...")
38 |         
39 |         try:
40 |             # Load and run the test module
41 |             spec = importlib.util.spec_from_file_location("test_module", test_path)
42 |             test_module = importlib.util.module_from_spec(spec)
43 |             spec.loader.exec_module(test_module)
44 |             
45 |             # Run the main function if it exists
46 |             if hasattr(test_module, 'main'):
47 |                 success = test_module.main()
48 |                 if success:
49 |                     total_passed += 1
50 |                     print(f"✅ {test_name} PASSED")
51 |                 else:
52 |                     total_failed += 1
53 |                     print(f"❌ {test_name} FAILED")
54 |             else:
55 |                 print(f"⚠️  {test_name}: No main() function found")
56 |                 
57 |         except Exception as e:
58 |             total_failed += 1
59 |             print(f"❌ {test_name} FAILED with exception: {e}")
60 |     
61 |     print("\n" + "=" * 50)
62 |     print(f"Performance Test Results: {total_passed + total_failed} total, {total_passed} passed, {total_failed} failed")
63 |     
64 |     success = total_failed == 0
65 |     
66 |     if success:
67 |         print("🎉 ALL PERFORMANCE TESTS PASSED!")
68 |     else:
69 |         print(f"❌ {total_failed} PERFORMANCE TESTS FAILED")
70 |     
71 |     return success
72 | 
73 | 
74 | if __name__ == "__main__":
75 |     success = run_performance_tests()
76 |     sys.exit(0 if success else 1)
```

--------------------------------------------------------------------------------
/tests/integration/test_integration_suite.py:
--------------------------------------------------------------------------------

```python
 1 | #!/usr/bin/env python3
 2 | """
 3 | Integration Test Suite Runner - Second Level Suite
 4 | Runs all integration tests across all playbooks.
 5 | """
 6 | 
 7 | import asyncio
 8 | import sys
 9 | import os
10 | import importlib.util
11 | 
12 | # Add the project root to the path
13 | sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../..'))
14 | 
15 | 
16 | async def run_integration_tests():
17 |     """Run all integration tests across all playbooks."""
18 |     print("🔗 Running Integration Test Suite")
19 |     print("=" * 50)
20 |     
21 |     # Define integration test modules for each playbook (relative to tests directory)
22 |     base_dir = os.path.dirname(os.path.dirname(__file__))  # Go up to tests directory
23 |     test_modules = [
24 |         ("CloudWatch Integration", os.path.join(base_dir, "integration/cloudwatch/test_cloudwatch_integration.py")),
25 |         # Add other playbooks as they are organized
26 |         # ("EC2 Integration", os.path.join(base_dir, "integration/ec2/test_ec2_integration.py")),
27 |         # ("S3 Integration", os.path.join(base_dir, "integration/s3/test_s3_integration.py")),
28 |     ]
29 |     
30 |     total_passed = 0
31 |     total_failed = 0
32 |     
33 |     for test_name, test_path in test_modules:
34 |         if not os.path.exists(test_path):
35 |             print(f"⚠️  Skipping {test_name}: {test_path} not found")
36 |             continue
37 |             
38 |         print(f"\n🔄 Running {test_name}...")
39 |         
40 |         try:
41 |             # Load and run the test module
42 |             spec = importlib.util.spec_from_file_location("test_module", test_path)
43 |             test_module = importlib.util.module_from_spec(spec)
44 |             spec.loader.exec_module(test_module)
45 |             
46 |             # Run the main function if it exists
47 |             if hasattr(test_module, 'run_cloudwatch_integration_tests'):
48 |                 success = await test_module.run_cloudwatch_integration_tests()
49 |                 if success:
50 |                     total_passed += 1
51 |                     print(f"✅ {test_name} PASSED")
52 |                 else:
53 |                     total_failed += 1
54 |                     print(f"❌ {test_name} FAILED")
55 |             elif hasattr(test_module, 'main'):
56 |                 # Handle sync main functions
57 |                 success = test_module.main()
58 |                 if success:
59 |                     total_passed += 1
60 |                     print(f"✅ {test_name} PASSED")
61 |                 else:
62 |                     total_failed += 1
63 |                     print(f"❌ {test_name} FAILED")
64 |             else:
65 |                 print(f"⚠️  {test_name}: No main() or run_*_integration_tests() function found")
66 |                 
67 |         except Exception as e:
68 |             total_failed += 1
69 |             print(f"❌ {test_name} FAILED with exception: {e}")
70 |     
71 |     print("\n" + "=" * 50)
72 |     print(f"Integration Test Results: {total_passed + total_failed} total, {total_passed} passed, {total_failed} failed")
73 |     
74 |     success = total_failed == 0
75 |     
76 |     if success:
77 |         print("🎉 ALL INTEGRATION TESTS PASSED!")
78 |     else:
79 |         print(f"❌ {total_failed} INTEGRATION TESTS FAILED")
80 |     
81 |     return success
82 | 
83 | 
84 | if __name__ == "__main__":
85 |     success = asyncio.run(run_integration_tests())
86 |     sys.exit(0 if success else 1)
```

--------------------------------------------------------------------------------
/services/trusted_advisor.py:
--------------------------------------------------------------------------------

```python
 1 | """
 2 | AWS Trusted Advisor service module.
 3 | 
 4 | This module provides functions for interacting with the AWS Trusted Advisor API.
 5 | """
 6 | 
 7 | import logging
 8 | from typing import Dict, List, Optional, Any
 9 | import boto3
10 | from botocore.exceptions import ClientError
11 | 
12 | from utils.error_handler import AWSErrorHandler, ResponseFormatter
13 | from utils.aws_client_factory import get_trusted_advisor_client
14 | 
15 | logger = logging.getLogger(__name__)
16 | 
17 | def get_trusted_advisor_checks(
18 |     check_categories: Optional[List[str]] = None,
19 |     region: Optional[str] = None
20 | ) -> Dict[str, Any]:
21 |     """
22 |     Get AWS Trusted Advisor check results.
23 |     
24 |     Args:
25 |         check_categories: List of check categories to filter
26 |         region: AWS region (optional)
27 |         
28 |     Returns:
29 |         Dictionary containing the Trusted Advisor check results
30 |     """
31 |     try:
32 |         # Trusted Advisor is only available in us-east-1
33 |         support_client = get_trusted_advisor_client()
34 |         
35 |         # Get available checks
36 |         checks_response = support_client.describe_trusted_advisor_checks(language='en')
37 |         checks = checks_response['checks']
38 |         
39 |         # Filter by categories if specified
40 |         if check_categories:
41 |             checks = [check for check in checks if check['category'] in check_categories]
42 |             
43 |         # Get results for each check
44 |         results = []
45 |         for check in checks:
46 |             # Ensure check is a dictionary
47 |             if not isinstance(check, dict):
48 |                 logger.warning(f"Unexpected check format in Trusted Advisor response: {type(check)}")
49 |                 continue
50 |                 
51 |             check_id = check.get('id')
52 |             check_name = check.get('name', 'Unknown')
53 |             
54 |             if not check_id:
55 |                 logger.warning(f"Check missing ID: {check_name}")
56 |                 continue
57 |                 
58 |             try:
59 |                 result = support_client.describe_trusted_advisor_check_result(
60 |                     checkId=check_id,
61 |                     language='en'
62 |                 )
63 |                 
64 |                 # Validate result structure
65 |                 if 'result' in result and isinstance(result['result'], dict):
66 |                     results.append({
67 |                         'check_id': check_id,
68 |                         'name': check_name,
69 |                         'category': check.get('category', 'unknown'),
70 |                         'result': result['result']
71 |                     })
72 |                 else:
73 |                     logger.warning(f"Invalid result structure for check {check_name}")
74 |                     
75 |             except Exception as check_error:
76 |                 logger.warning(f"Error getting result for check {check_name}: {str(check_error)}")
77 |                 
78 |         return ResponseFormatter.success_response(
79 |             data={"checks": results, "count": len(results)},
80 |             message=f"Retrieved {len(results)} Trusted Advisor check results",
81 |             analysis_type="trusted_advisor_checks"
82 |         )
83 |         
84 |     except ClientError as e:
85 |         return AWSErrorHandler.format_client_error(
86 |             e, 
87 |             "get_trusted_advisor_checks",
88 |             ["support:DescribeTrustedAdvisorChecks", "support:DescribeTrustedAdvisorCheckResult"]
89 |         )
90 |         
91 |     except Exception as e:
92 |         return AWSErrorHandler.format_general_error(e, "get_trusted_advisor_checks")
```

--------------------------------------------------------------------------------
/runbook_functions_extended.py:
--------------------------------------------------------------------------------

```python
 1 | # Extended EC2 runbook functions
 2 | import json
 3 | from typing import Dict, List, Any
 4 | from mcp.types import TextContent
 5 | 
 6 | from playbooks.ec2_optimization import (
 7 |     get_graviton_compatible_instances, get_burstable_instances_analysis,
 8 |     get_spot_instance_opportunities, get_unused_capacity_reservations,
 9 |     get_scheduling_opportunities, get_commitment_plan_recommendations,
10 |     get_governance_violations, generate_comprehensive_ec2_report
11 | )
12 | 
13 | async def identify_graviton_compatible_instances(arguments: Dict[str, Any]) -> List[TextContent]:
14 |     try:
15 |         result = get_graviton_compatible_instances(region=arguments.get("region"))
16 |         return [TextContent(type="text", text=json.dumps(result, indent=2, default=str))]
17 |     except Exception as e:
18 |         return [TextContent(type="text", text=f"Error: {str(e)}")]
19 | 
20 | async def analyze_burstable_instances(arguments: Dict[str, Any]) -> List[TextContent]:
21 |     try:
22 |         result = get_burstable_instances_analysis(
23 |             region=arguments.get("region"),
24 |             lookback_period_days=arguments.get("lookback_period_days", 14)
25 |         )
26 |         return [TextContent(type="text", text=json.dumps(result, indent=2, default=str))]
27 |     except Exception as e:
28 |         return [TextContent(type="text", text=f"Error: {str(e)}")]
29 | 
30 | async def identify_spot_opportunities(arguments: Dict[str, Any]) -> List[TextContent]:
31 |     try:
32 |         result = get_spot_instance_opportunities(region=arguments.get("region"))
33 |         return [TextContent(type="text", text=json.dumps(result, indent=2, default=str))]
34 |     except Exception as e:
35 |         return [TextContent(type="text", text=f"Error: {str(e)}")]
36 | 
37 | async def identify_unused_reservations(arguments: Dict[str, Any]) -> List[TextContent]:
38 |     try:
39 |         result = get_unused_capacity_reservations(region=arguments.get("region"))
40 |         return [TextContent(type="text", text=json.dumps(result, indent=2, default=str))]
41 |     except Exception as e:
42 |         return [TextContent(type="text", text=f"Error: {str(e)}")]
43 | 
44 | async def identify_scheduling_opportunities(arguments: Dict[str, Any]) -> List[TextContent]:
45 |     try:
46 |         result = get_scheduling_opportunities(region=arguments.get("region"))
47 |         return [TextContent(type="text", text=json.dumps(result, indent=2, default=str))]
48 |     except Exception as e:
49 |         return [TextContent(type="text", text=f"Error: {str(e)}")]
50 | 
51 | async def analyze_commitment_plans(arguments: Dict[str, Any]) -> List[TextContent]:
52 |     try:
53 |         result = get_commitment_plan_recommendations(region=arguments.get("region"))
54 |         return [TextContent(type="text", text=json.dumps(result, indent=2, default=str))]
55 |     except Exception as e:
56 |         return [TextContent(type="text", text=f"Error: {str(e)}")]
57 | 
58 | async def identify_governance_violations(arguments: Dict[str, Any]) -> List[TextContent]:
59 |     try:
60 |         result = get_governance_violations(region=arguments.get("region"))
61 |         return [TextContent(type="text", text=json.dumps(result, indent=2, default=str))]
62 |     except Exception as e:
63 |         return [TextContent(type="text", text=f"Error: {str(e)}")]
64 | 
65 | async def generate_comprehensive_report(arguments: Dict[str, Any]) -> List[TextContent]:
66 |     try:
67 |         result = generate_comprehensive_ec2_report(region=arguments.get("region"))
68 |         return [TextContent(type="text", text=json.dumps(result, indent=2, default=str))]
69 |     except Exception as e:
70 |         return [TextContent(type="text", text=f"Error: {str(e)}")]
```

--------------------------------------------------------------------------------
/services/performance_insights.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | AWS Performance Insights service module.
  3 | 
  4 | This module provides functions for interacting with the AWS Performance Insights API.
  5 | """
  6 | 
  7 | import logging
  8 | from typing import Dict, List, Optional, Any
  9 | import boto3
 10 | from datetime import datetime, timedelta
 11 | from botocore.exceptions import ClientError
 12 | 
 13 | from utils.error_handler import AWSErrorHandler, ResponseFormatter
 14 | from utils.aws_client_factory import get_performance_insights_client
 15 | 
 16 | logger = logging.getLogger(__name__)
 17 | 
 18 | def get_performance_insights_metrics(
 19 |     db_instance_identifier: str,
 20 |     start_time: Optional[str] = None,
 21 |     end_time: Optional[str] = None,
 22 |     region: Optional[str] = None
 23 | ) -> Dict[str, Any]:
 24 |     """
 25 |     Get Performance Insights metrics for an RDS instance.
 26 |     
 27 |     Args:
 28 |         db_instance_identifier: RDS instance identifier
 29 |         start_time: Start time for metrics (ISO format)
 30 |         end_time: End time for metrics (ISO format)
 31 |         region: AWS region (optional)
 32 |         
 33 |     Returns:
 34 |         Dictionary containing the Performance Insights metrics
 35 |     """
 36 |     try:
 37 |         # Create Performance Insights client
 38 |         pi_client = get_performance_insights_client(region)
 39 |             
 40 |         # Set default time range if not provided
 41 |         if not start_time:
 42 |             end_datetime = datetime.utcnow()
 43 |             start_datetime = end_datetime - timedelta(hours=1)
 44 |             start_time = start_datetime.isoformat() + 'Z'
 45 |             end_time = end_datetime.isoformat() + 'Z'
 46 |         elif not end_time:
 47 |             end_time = datetime.utcnow().isoformat() + 'Z'
 48 |             
 49 |         # Define metrics to retrieve
 50 |         metrics = [
 51 |             {'Metric': 'db.load.avg'},
 52 |             {'Metric': 'db.sampledload.avg'}
 53 |         ]
 54 |             
 55 |         # Make the API call
 56 |         response = pi_client.get_resource_metrics(
 57 |             ServiceType='RDS',
 58 |             Identifier=db_instance_identifier,
 59 |             StartTime=start_time,
 60 |             EndTime=end_time,
 61 |             MetricQueries=metrics,
 62 |             PeriodInSeconds=60
 63 |         )
 64 |             
 65 |         return {
 66 |             "status": "success",
 67 |             "data": response,
 68 |             "message": f"Retrieved Performance Insights metrics for {db_instance_identifier}"
 69 |         }
 70 |         
 71 |     except ClientError as e:
 72 |         error_code = e.response['Error']['Code'] if 'Error' in e.response else "Unknown"
 73 |         
 74 |         # Handle specific authorization errors gracefully
 75 |         if error_code in ['NotAuthorizedException', 'AccessDenied', 'UnauthorizedOperation']:
 76 |             logger.warning(f"Performance Insights not authorized for {db_instance_identifier}: {str(e)}")
 77 |             return {
 78 |                 "status": "success",
 79 |                 "data": {
 80 |                     "MetricList": [],
 81 |                     "AlignedStartTime": start_time,
 82 |                     "AlignedEndTime": end_time,
 83 |                     "Identifier": db_instance_identifier
 84 |                 },
 85 |                 "message": f"Performance Insights not enabled or authorized for {db_instance_identifier}",
 86 |                 "warning": "Performance Insights requires explicit enablement and permissions"
 87 |             }
 88 |         else:
 89 |             logger.error(f"Error in Performance Insights API: {str(e)}")
 90 |             return {
 91 |                 "status": "error",
 92 |                 "message": f"Performance Insights API error: {str(e)}",
 93 |                 "error_code": error_code
 94 |             }
 95 |         
 96 |     except Exception as e:
 97 |         logger.error(f"Unexpected error in Performance Insights service: {str(e)}")
 98 |         return {
 99 |             "status": "error",
100 |             "message": f"Unexpected error: {str(e)}"
101 |         }
```

--------------------------------------------------------------------------------
/playbooks/s3/s3_optimization.py:
--------------------------------------------------------------------------------

```python
 1 | """
 2 | S3 Cost Optimization Playbook - Consolidated Module
 3 | 
 4 | This module has been consolidated and cleaned up. The main S3 optimization functionality
 5 | has been moved to the new architecture:
 6 | - core/s3_optimization_orchestrator.py (main orchestrator)
 7 | - core/s3_analysis_engine.py (analysis engine)
 8 | - core/analyzers/ (individual analyzer implementations)
 9 | 
10 | This file now contains only essential imports and references for backward compatibility.
11 | All new development should use the S3OptimizationOrchestrator from playbooks.s3.s3_optimization_orchestrator.
12 | """
13 | 
14 | import logging
15 | from typing import Dict, Any, Optional
16 | 
logger = logging.getLogger(__name__)


class _FallbackS3Optimization:
    """
    Stand-in used when S3OptimizationOrchestrator cannot be imported or created.

    Every ordinary method call on an instance returns an error response that
    points the caller at the new architecture.
    """

    def __init__(self, region: Optional[str] = None, timeout_seconds: int = 45):
        """
        Initialize fallback S3 optimization.

        Args:
            region: AWS region (optional)
            timeout_seconds: Maximum execution time per analysis (default: 45)
        """
        self.region = region
        self.timeout_seconds = timeout_seconds
        logger.warning("Using fallback S3Optimization class. Please use S3OptimizationOrchestrator instead.")

    def __getattr__(self, name: str) -> Any:
        """
        Handle any method calls by providing guidance to use the new architecture.

        Args:
            name: Method name being called

        Returns:
            A callable that accepts any arguments and returns the error response

        Raises:
            AttributeError: For special (dunder) names, so protocol probes such
                as hasattr(obj, "__iter__") or copy's __setstate__ lookup see a
                missing attribute instead of a stub callable.
        """
        if name.startswith("__") and name.endswith("__"):
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {name!r}"
            )

        logger.error(f"Method '{name}' called on fallback S3Optimization class")
        return lambda *args, **kwargs: {
            "status": "error",
            "message": f"S3Optimization.{name}() is deprecated. Use S3OptimizationOrchestrator instead.",
            "guidance": {
                "new_class": "S3OptimizationOrchestrator",
                "import_path": "from playbooks.s3.s3_optimization_orchestrator import S3OptimizationOrchestrator",
                "migration_note": "The new orchestrator provides all S3 optimization functionality with improved performance and session integration."
            },
            "data": {}
        }


# Import the new orchestrator for any legacy compatibility needs
try:
    from .s3_optimization_orchestrator import S3OptimizationOrchestrator

    # Provide a compatibility alias for any remaining legacy code
    S3Optimization = S3OptimizationOrchestrator

    logger.info("S3 optimization functionality available via S3OptimizationOrchestrator")

except ImportError as e:
    logger.error(f"Failed to import S3OptimizationOrchestrator: {e}")

    # Expose the fallback under the legacy name
    S3Optimization = _FallbackS3Optimization


# Utility functions for backward compatibility
def get_s3_optimization_instance(region: Optional[str] = None, timeout_seconds: int = 45) -> S3Optimization:
    """
    Get an S3 optimization instance (preferably the new orchestrator).

    Args:
        region: AWS region (optional)
        timeout_seconds: Maximum execution time per analysis (default: 45);
            only used by the fallback object

    Returns:
        S3Optimization instance (either orchestrator or fallback)
    """
    # Only attempt the orchestrator when its import actually succeeded.
    if S3Optimization is not _FallbackS3Optimization:
        try:
            return S3Optimization(region=region)
        except Exception as e:
            logger.warning(f"Could not create S3OptimizationOrchestrator, using fallback: {e}")

    # Build the real fallback, never the orchestrator alias: retrying the
    # class that just failed would only raise again.
    return _FallbackS3Optimization(region=region, timeout_seconds=timeout_seconds)


# Export the main class for backward compatibility
__all__ = ['S3Optimization', 'get_s3_optimization_instance']
```

--------------------------------------------------------------------------------
/utils/documentation_links.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Documentation Links Utility
  3 | 
  4 | This module provides centralized documentation links for AWS cost optimization tools.
  5 | It adds relevant documentation and best practices links to tool outputs, including
  6 | AWS Well-Architected Framework recommendations.
  7 | """
  8 | 
  9 | from typing import Dict, Any, List
 10 | # Removed wellarchitected_recommendations - let LLMs provide recommendations based on MCP output
 11 | 
 12 | # Documentation links mapping
 13 | DOCUMENTATION_LINKS = {
 14 |     "general": {
 15 |         "CFM-TIPs Guidance": "https://catalog.workshops.aws/awscff/en-US/introduction",
 16 |         "Cost Optimization Pillar of AWS Well Architected": "https://docs.aws.amazon.com/wellarchitected/latest/framework/cost-optimization.html"
 17 |     },
 18 |     "ec2": {
 19 |         "Best Practices Playbooks for EC2": "https://catalog.workshops.aws/awscff/en-US/playbooks/compute/ec2"
 20 |     },
 21 |     "ebs": {
 22 |         "Best Practices Playbooks for EBS": "https://catalog.workshops.aws/awscff/en-US/playbooks/storage/ebs"
 23 |     },
 24 |     "rds": {
 25 |         "Best Practices Playbooks for RDS": "https://catalog.workshops.aws/awscff/en-US/playbooks/databases/rds"
 26 |     },
 27 |     "lambda": {
 28 |         "Best Practices Playbooks for AWS Lambda": "https://catalog.workshops.aws/awscff/en-US/playbooks/compute/lambda"
 29 |     },
 30 |     "s3": {
 31 |         "Best Practices Playbooks for S3": "https://catalog.workshops.aws/awscff/en-US/playbooks/storage/s3"
 32 |     },
 33 |     "cloudtrail": {
 34 |         "Best Practices Playbooks for CloudTrail": "https://catalog.workshops.aws/awscff/en-US/playbooks/management-and-governance/cloudtrail"
 35 |     }
 36 | }
 37 | 
 38 | def add_documentation_links(result: Dict[str, Any], service_type: str = None, finding_type: str = None) -> Dict[str, Any]:
 39 |     """
 40 |     Add relevant documentation links and Well-Architected recommendations to a result dictionary.
 41 |     
 42 |     Args:
 43 |         result: The result dictionary from a cost optimization function
 44 |         service_type: The AWS service type (ec2, ebs, rds, lambda, s3, cloudtrail)
 45 |         finding_type: Type of optimization finding (underutilized, unused, overprovisioned, etc.)
 46 |     
 47 |     Returns:
 48 |         Enhanced result dictionary with documentation links and Well-Architected recommendations
 49 |     """
 50 |     if not isinstance(result, dict):
 51 |         return result
 52 |     
 53 |     # Create a copy to avoid modifying the original
 54 |     enhanced_result = result.copy()
 55 |     
 56 |     # Build documentation links
 57 |     docs = {}
 58 |     
 59 |     # Always include general documentation
 60 |     docs.update(DOCUMENTATION_LINKS["general"])
 61 |     
 62 |     # Add service-specific documentation if specified
 63 |     if service_type and service_type.lower() in DOCUMENTATION_LINKS:
 64 |         docs.update(DOCUMENTATION_LINKS[service_type.lower()])
 65 |     
 66 |     # Add documentation section to the result
 67 |     enhanced_result["documentation"] = {
 68 |         "description": "Suggested documentation and further reading",
 69 |         "links": docs
 70 |     }
 71 |     
 72 |     # Well-Architected recommendations now provided by LLMs analyzing MCP output
 73 |     
 74 |     return enhanced_result
 75 | 
 76 | def get_service_documentation(service_type: str) -> Dict[str, str]:
 77 |     """
 78 |     Get documentation links for a specific service.
 79 |     
 80 |     Args:
 81 |         service_type: The AWS service type
 82 |     
 83 |     Returns:
 84 |         Dictionary of documentation links
 85 |     """
 86 |     docs = DOCUMENTATION_LINKS["general"].copy()
 87 |     
 88 |     if service_type.lower() in DOCUMENTATION_LINKS:
 89 |         docs.update(DOCUMENTATION_LINKS[service_type.lower()])
 90 |     
 91 |     return docs
 92 | 
 93 | def format_documentation_section(service_type: str = None) -> Dict[str, Any]:
 94 |     """
 95 |     Format a standalone documentation section.
 96 |     
 97 |     Args:
 98 |         service_type: Optional service type for service-specific links
 99 |     
100 |     Returns:
101 |         Formatted documentation section
102 |     """
103 |     docs = DOCUMENTATION_LINKS["general"].copy()
104 |     
105 |     if service_type and service_type.lower() in DOCUMENTATION_LINKS:
106 |         docs.update(DOCUMENTATION_LINKS[service_type.lower()])
107 |     
108 |     return {
109 |         "documentation": {
110 |             "description": "Suggested documentation and further reading",
111 |             "links": docs
112 |         }
113 |     }
```

--------------------------------------------------------------------------------
/logging_config.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Centralized logging configuration for CFM Tips MCP Server
  3 | """
  4 | 
  5 | import logging
  6 | import sys
  7 | import os
  8 | import tempfile
  9 | from datetime import datetime
 10 | 
 11 | def setup_logging():
 12 |     """Configure comprehensive logging for the application."""
 13 |     
 14 |     # Create formatter
 15 |     formatter = logging.Formatter(
 16 |         '%(asctime)s - %(name)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s'
 17 |     )
 18 |     
 19 |     # Configure root logger
 20 |     root_logger = logging.getLogger()
 21 |     root_logger.setLevel(logging.INFO)
 22 |     
 23 |     # Remove existing handlers
 24 |     for handler in root_logger.handlers[:]:
 25 |         root_logger.removeHandler(handler)
 26 |     
 27 |     # Add file handlers
 28 |     try:
 29 |         # Try to create logs directory if it doesn't exist
 30 |         log_dir = 'logs'
 31 |         if not os.path.exists(log_dir):
 32 |             os.makedirs(log_dir, exist_ok=True)
 33 |         
 34 |         # Try main log file in logs directory first
 35 |         log_file = os.path.join(log_dir, 'cfm_tips_mcp.log')
 36 |         file_handler = logging.FileHandler(log_file)
 37 |         file_handler.setLevel(logging.INFO)
 38 |         file_handler.setFormatter(formatter)
 39 |         root_logger.addHandler(file_handler)
 40 |         
 41 |         # Try error log file
 42 |         error_file = os.path.join(log_dir, 'cfm_tips_mcp_errors.log')
 43 |         error_handler = logging.FileHandler(error_file)
 44 |         error_handler.setLevel(logging.ERROR)
 45 |         error_handler.setFormatter(formatter)
 46 |         root_logger.addHandler(error_handler)
 47 |         
 48 |     except (OSError, PermissionError) as e:
 49 |         # If we can't write to logs directory, try current directory
 50 |         try:
 51 |             file_handler = logging.FileHandler('cfm_tips_mcp.log')
 52 |             file_handler.setLevel(logging.INFO)
 53 |             file_handler.setFormatter(formatter)
 54 |             root_logger.addHandler(file_handler)
 55 |             
 56 |             error_handler = logging.FileHandler('cfm_tips_mcp_errors.log')
 57 |             error_handler.setLevel(logging.ERROR)
 58 |             error_handler.setFormatter(formatter)
 59 |             root_logger.addHandler(error_handler)
 60 |             
 61 |         except (OSError, PermissionError):
 62 |             # If we can't write anywhere, try temp directory
 63 |             try:
 64 |                 temp_dir = tempfile.gettempdir()
 65 |                 temp_log = os.path.join(temp_dir, 'cfm_tips_mcp.log')
 66 |                 file_handler = logging.FileHandler(temp_log)
 67 |                 file_handler.setLevel(logging.INFO)
 68 |                 file_handler.setFormatter(formatter)
 69 |                 root_logger.addHandler(file_handler)
 70 |                 
 71 |                 temp_error = os.path.join(temp_dir, 'cfm_tips_mcp_errors.log')
 72 |                 error_handler = logging.FileHandler(temp_error)
 73 |                 error_handler.setLevel(logging.ERROR)
 74 |                 error_handler.setFormatter(formatter)
 75 |                 root_logger.addHandler(error_handler)
 76 |                 
 77 |                 # Log where we're writing files
 78 |                 print(f"Warning: Using temp directory for logs: {temp_dir}")
 79 |                 
 80 |             except (OSError, PermissionError):
 81 |                 # If all else fails, raise error since we need file logging
 82 |                 raise RuntimeError("Could not create log files in any location")
 83 |     
 84 |     return logging.getLogger(__name__)
 85 | 
 86 | def log_function_entry(logger, func_name, **kwargs):
 87 |     """Log function entry with parameters."""
 88 |     logger.info(f"Entering {func_name} with params: {kwargs}")
 89 | 
 90 | def log_function_exit(logger, func_name, result_status=None, execution_time=None):
 91 |     """Log function exit with results."""
 92 |     msg = f"Exiting {func_name}"
 93 |     if result_status:
 94 |         msg += f" - Status: {result_status}"
 95 |     if execution_time:
 96 |         msg += f" - Time: {execution_time:.2f}s"
 97 |     logger.info(msg)
 98 | 
 99 | def log_aws_api_call(logger, service, operation, **params):
100 |     """Log AWS API calls."""
101 |     logger.info(f"AWS API Call: {service}.{operation} with params: {params}")
102 | 
def log_aws_api_error(logger, service, operation, error):
    """Emit an ERROR record naming the failed AWS service operation and the error text."""
    error_message = f"AWS API Error: {service}.{operation} - {str(error)}"
    logger.error(error_message)
```

--------------------------------------------------------------------------------
/tests/test_suite_main.py:
--------------------------------------------------------------------------------

```python
  1 | #!/usr/bin/env python3
  2 | """
  3 | Main Test Suite Runner - Top Level Suite
  4 | Orchestrates all second-level test suites (unit, performance, integration).
  5 | """
  6 | 
  7 | import asyncio
  8 | import sys
  9 | import os
 10 | import importlib.util
 11 | from typing import Dict, Any
 12 | 
 13 | # Add the project root to the path
 14 | sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
 15 | 
 16 | 
 17 | def run_suite(suite_name: str, suite_path: str) -> Dict[str, Any]:
 18 |     """Run a test suite and return results."""
 19 |     print(f"\n{'='*60}")
 20 |     print(f"🚀 STARTING {suite_name.upper()} SUITE")
 21 |     print(f"{'='*60}")
 22 |     
 23 |     if not os.path.exists(suite_path):
 24 |         return {
 25 |             'name': suite_name,
 26 |             'status': 'skipped',
 27 |             'reason': f'Suite file not found: {suite_path}'
 28 |         }
 29 |     
 30 |     try:
 31 |         # Load the suite module
 32 |         spec = importlib.util.spec_from_file_location("suite_module", suite_path)
 33 |         suite_module = importlib.util.module_from_spec(spec)
 34 |         spec.loader.exec_module(suite_module)
 35 |         
 36 |         # Determine the appropriate function to call
 37 |         if suite_name == 'Integration' and hasattr(suite_module, 'run_integration_tests'):
 38 |             # Integration tests are async
 39 |             success = asyncio.run(suite_module.run_integration_tests())
 40 |         elif hasattr(suite_module, f'run_{suite_name.lower()}_tests'):
 41 |             # Standard naming convention
 42 |             func = getattr(suite_module, f'run_{suite_name.lower()}_tests')
 43 |             success = func()
 44 |         elif hasattr(suite_module, 'main'):
 45 |             # Fallback to main function
 46 |             success = suite_module.main()
 47 |         else:
 48 |             return {
 49 |                 'name': suite_name,
 50 |                 'status': 'error',
 51 |                 'reason': f'No suitable entry point found in {suite_path}'
 52 |             }
 53 |         
 54 |         return {
 55 |             'name': suite_name,
 56 |             'status': 'passed' if success else 'failed',
 57 |             'success': success
 58 |         }
 59 |         
 60 |     except Exception as e:
 61 |         return {
 62 |             'name': suite_name,
 63 |             'status': 'error',
 64 |             'reason': str(e)
 65 |         }
 66 | 
 67 | 
 68 | def main():
 69 |     """Run all test suites in order."""
 70 |     print("🎯 CFM Tips - Main Test Suite Runner")
 71 |     print("=" * 60)
 72 |     print("Running hierarchical test suite:")
 73 |     print("  📁 Top Level: Main Suite")
 74 |     print("  📁 Second Level: Unit, Performance, Integration")
 75 |     print("  📁 Third Level: Playbook-specific (CloudWatch, EC2, S3, etc.)")
 76 |     print("=" * 60)
 77 |     
 78 |     # Define the test suites in execution order
 79 |     suites = [
 80 |         ("Unit", "tests/unit/test_unit_suite.py"),
 81 |         ("Performance", "tests/performance/test_performance_suite.py"),
 82 |         ("Integration", "tests/integration/test_integration_suite.py"),
 83 |     ]
 84 |     
 85 |     results = []
 86 |     
 87 |     # Run each suite
 88 |     for suite_name, suite_path in suites:
 89 |         result = run_suite(suite_name, suite_path)
 90 |         results.append(result)
 91 |     
 92 |     # Print summary
 93 |     print(f"\n{'='*60}")
 94 |     print("📊 MAIN TEST SUITE SUMMARY")
 95 |     print(f"{'='*60}")
 96 |     
 97 |     passed = 0
 98 |     failed = 0
 99 |     skipped = 0
100 |     errors = 0
101 |     
102 |     for result in results:
103 |         status = result['status']
104 |         name = result['name']
105 |         
106 |         if status == 'passed':
107 |             print(f"✅ {name} Suite: PASSED")
108 |             passed += 1
109 |         elif status == 'failed':
110 |             print(f"❌ {name} Suite: FAILED")
111 |             failed += 1
112 |         elif status == 'skipped':
113 |             print(f"⏭️  {name} Suite: SKIPPED - {result['reason']}")
114 |             skipped += 1
115 |         elif status == 'error':
116 |             print(f"💥 {name} Suite: ERROR - {result['reason']}")
117 |             errors += 1
118 |     
119 |     total = len(results)
120 |     print(f"\n📈 Results: {total} suites total")
121 |     print(f"   ✅ Passed: {passed}")
122 |     print(f"   ❌ Failed: {failed}")
123 |     print(f"   ⏭️  Skipped: {skipped}")
124 |     print(f"   💥 Errors: {errors}")
125 |     
126 |     success_rate = (passed / total * 100) if total > 0 else 0
127 |     print(f"   📊 Success Rate: {success_rate:.1f}%")
128 |     
129 |     overall_success = failed == 0 and errors == 0
130 |     
131 |     if overall_success:
132 |         print(f"\n🎉 ALL TEST SUITES COMPLETED SUCCESSFULLY!")
133 |         print("   🚀 CFM Tips is ready for deployment!")
134 |     else:
135 |         print(f"\n⚠️  SOME TEST SUITES FAILED")
136 |         print("   🔧 Please review failed tests before deployment")
137 |     
138 |     return overall_success
139 | 
140 | 
if __name__ == "__main__":
    # Script entry point: exit status 0 when every suite succeeded, 1 otherwise.
    success = main()
    exit_code = 0 if success else 1
    sys.exit(exit_code)
```

--------------------------------------------------------------------------------
/tests/unit/s3/live/test_top_buckets.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Live test for S3 top buckets listing functionality.
  3 | 
  4 | This test validates that the quick analysis properly returns top 10 buckets
  5 | with their cost estimates.
  6 | """
  7 | 
  8 | import pytest
  9 | import asyncio
 10 | import json
 11 | from playbooks.s3.s3_optimization_orchestrator import run_s3_quick_analysis
 12 | 
 13 | 
 14 | @pytest.mark.live
 15 | @pytest.mark.asyncio
 16 | async def test_list_top_10_buckets():
 17 |     """Test that quick analysis returns top 10 buckets with cost estimates."""
 18 |     
 19 |     # Run quick analysis
 20 |     arguments = {
 21 |         'region': 'us-east-1'  # You can change this to your preferred region
 22 |     }
 23 |     
 24 |     result = await run_s3_quick_analysis(arguments)
 25 |     
 26 |     # Parse the result
 27 |     assert len(result) > 0
 28 |     assert result[0]["type"] == "text"
 29 |     
 30 |     data = json.loads(result[0]["text"])
 31 |     
 32 |     # Verify structure
 33 |     assert data["status"] == "success"
 34 |     assert "results" in data
 35 |     assert "general_spend" in data["results"]
 36 |     
 37 |     # Check general_spend results
 38 |     general_spend = data["results"]["general_spend"]
 39 |     
 40 |     print(f"\n=== General Spend Status: {general_spend.get('status')} ===")
 41 |     
 42 |     if general_spend.get("status") == "success":
 43 |         assert "data" in general_spend
 44 |         
 45 |         # Print full data structure for debugging
 46 |         print(f"\nData keys: {list(general_spend['data'].keys())}")
 47 |         
 48 |         if "bucket_costs" in general_spend["data"]:
 49 |             bucket_costs = general_spend["data"]["bucket_costs"]
 50 |             print(f"\nBucket costs keys: {list(bucket_costs.keys())}")
 51 |             print(f"Total buckets analyzed: {bucket_costs.get('total_buckets_analyzed', 'N/A')}")
 52 |             
 53 |             # Verify top_10_buckets exists
 54 |             assert "top_10_buckets" in bucket_costs
 55 |             
 56 |             top_buckets = bucket_costs["top_10_buckets"]
 57 |             
 58 |             # Print results for manual verification
 59 |             print("\n=== Top 10 S3 Buckets by Estimated Cost ===")
 60 |             if len(top_buckets) == 0:
 61 |                 print("No buckets found or analyzed.")
 62 |             else:
 63 |                 for i, bucket in enumerate(top_buckets, 1):
 64 |                     print(f"{i}. {bucket['bucket_name']}")
 65 |                     print(f"   Estimated Monthly Cost: ${bucket['estimated_monthly_cost']:.2f}")
 66 |                     print(f"   Size: {bucket['size_gb']:.2f} GB")
 67 |                     print(f"   Objects: {bucket['object_count']:,}")
 68 |                     print(f"   Storage Class: {bucket['primary_storage_class']}")
 69 |                     print()
 70 |         else:
 71 |             print("\nWARNING: bucket_costs not found in general_spend data")
 72 |             print(f"Available data: {json.dumps(general_spend['data'], indent=2, default=str)}")
 73 |         
 74 |         # Verify bucket data structure
 75 |         if len(top_buckets) > 0:
 76 |             first_bucket = top_buckets[0]
 77 |             assert "bucket_name" in first_bucket
 78 |             assert "estimated_monthly_cost" in first_bucket
 79 |             assert "size_gb" in first_bucket
 80 |             assert "object_count" in first_bucket
 81 |             assert "primary_storage_class" in first_bucket
 82 |             
 83 |             # Verify costs are sorted (highest first)
 84 |             if len(top_buckets) > 1:
 85 |                 for i in range(len(top_buckets) - 1):
 86 |                     assert top_buckets[i]["estimated_monthly_cost"] >= top_buckets[i + 1]["estimated_monthly_cost"], \
 87 |                         "Buckets should be sorted by cost (highest first)"
 88 |     else:
 89 |         print(f"\nGeneral spend analysis failed: {general_spend.get('message')}")
 90 |         pytest.skip(f"General spend analysis failed: {general_spend.get('message')}")
 91 | 
 92 | 
 93 | @pytest.mark.live
 94 | @pytest.mark.asyncio
 95 | async def test_bucket_cost_estimation():
 96 |     """Test that bucket cost estimation is working correctly."""
 97 |     
 98 |     arguments = {'region': 'us-east-1'}
 99 |     result = await run_s3_quick_analysis(arguments)
100 |     
101 |     data = json.loads(result[0]["text"])
102 |     
103 |     if data["status"] == "success":
104 |         general_spend = data["results"].get("general_spend", {})
105 |         
106 |         if general_spend.get("status") == "success":
107 |             bucket_costs = general_spend["data"].get("bucket_costs", {})
108 |             
109 |             # Check that we have bucket analysis data
110 |             assert "by_bucket" in bucket_costs or "top_10_buckets" in bucket_costs
111 |             
112 |             # Verify total buckets analyzed
113 |             if "total_buckets_analyzed" in bucket_costs:
114 |                 print(f"\nTotal buckets analyzed: {bucket_costs['total_buckets_analyzed']}")
115 |             
116 |             # Verify cost estimation method
117 |             if "cost_estimation_method" in bucket_costs:
118 |                 print(f"Cost estimation method: {bucket_costs['cost_estimation_method']}")
119 |                 assert bucket_costs["cost_estimation_method"] in ["size_based", "cost_explorer"]
120 | 
121 | 
if __name__ == "__main__":
    # Direct invocation: drive the top-buckets test coroutine on a fresh event loop.
    top_buckets_run = test_list_top_10_buckets()
    asyncio.run(top_buckets_run)
125 | 
```

--------------------------------------------------------------------------------
/tests/legacy/example_output_with_docs.py:
--------------------------------------------------------------------------------

```python
  1 | #!/usr/bin/env python3
  2 | """
  3 | Example showing how documentation links appear in tool outputs
  4 | """
  5 | 
  6 | import json
  7 | from utils.documentation_links import add_documentation_links
  8 | 
  9 | def show_example_outputs():
 10 |     """Show examples of how documentation links appear in different tool outputs"""
 11 |     
 12 |     print("CFM Tips - Documentation Links Feature Examples")
 13 |     print("=" * 60)
 14 |     print()
 15 |     
 16 |     # Example 1: EC2 Right-sizing Analysis
 17 |     print("1. EC2 Right-sizing Analysis Output:")
 18 |     print("-" * 40)
 19 |     ec2_result = {
 20 |         "status": "success",
 21 |         "data": {
 22 |             "underutilized_instances": [
 23 |                 {
 24 |                     "instance_id": "i-1234567890abcdef0",
 25 |                     "instance_type": "m5.large",
 26 |                     "finding": "Overprovisioned",
 27 |                     "recommendation": {
 28 |                         "recommended_instance_type": "m5.medium",
 29 |                         "estimated_monthly_savings": 45.50
 30 |                     }
 31 |                 }
 32 |             ],
 33 |             "count": 1,
 34 |             "total_monthly_savings": 45.50
 35 |         },
 36 |         "message": "Found 1 underutilized EC2 instances via Compute Optimizer"
 37 |     }
 38 |     
 39 |     enhanced_ec2 = add_documentation_links(ec2_result, "ec2")
 40 |     print(json.dumps(enhanced_ec2, indent=2))
 41 |     print("\n" + "=" * 60 + "\n")
 42 |     
 43 |     # Example 2: S3 Optimization Analysis
 44 |     print("2. S3 Optimization Analysis Output:")
 45 |     print("-" * 40)
 46 |     s3_result = {
 47 |         "status": "success",
 48 |         "comprehensive_s3_optimization": {
 49 |             "overview": {
 50 |                 "total_potential_savings": "$1,250.75",
 51 |                 "analyses_completed": "6/6",
 52 |                 "failed_analyses": 0,
 53 |                 "execution_time": "45.2s"
 54 |             },
 55 |             "key_findings": [
 56 |                 "Found 15 buckets with suboptimal storage classes",
 57 |                 "Identified $800 in potential lifecycle savings",
 58 |                 "Discovered 25 incomplete multipart uploads"
 59 |             ],
 60 |             "top_recommendations": [
 61 |                 {
 62 |                     "type": "storage_class_optimization",
 63 |                     "bucket": "my-data-bucket",
 64 |                     "potential_savings": "$450.25/month",
 65 |                     "action": "Transition to IA after 30 days"
 66 |                 }
 67 |             ]
 68 |         }
 69 |     }
 70 |     
 71 |     enhanced_s3 = add_documentation_links(s3_result, "s3")
 72 |     print(json.dumps(enhanced_s3, indent=2))
 73 |     print("\n" + "=" * 60 + "\n")
 74 |     
 75 |     # Example 3: RDS Optimization Analysis
 76 |     print("3. RDS Optimization Analysis Output:")
 77 |     print("-" * 40)
 78 |     rds_result = {
 79 |         "status": "success",
 80 |         "data": {
 81 |             "underutilized_instances": [
 82 |                 {
 83 |                     "db_instance_identifier": "prod-database-1",
 84 |                     "db_instance_class": "db.r5.xlarge",
 85 |                     "finding": "Underutilized",
 86 |                     "avg_cpu_utilization": 15.5,
 87 |                     "recommendation": {
 88 |                         "recommended_instance_class": "db.r5.large",
 89 |                         "estimated_monthly_savings": 180.00
 90 |                     }
 91 |                 }
 92 |             ],
 93 |             "count": 1,
 94 |             "total_monthly_savings": 180.00
 95 |         },
 96 |         "message": "Found 1 underutilized RDS instances"
 97 |     }
 98 |     
 99 |     enhanced_rds = add_documentation_links(rds_result, "rds")
100 |     print(json.dumps(enhanced_rds, indent=2))
101 |     print("\n" + "=" * 60 + "\n")
102 |     
103 |     # Example 4: Lambda Optimization Analysis
104 |     print("4. Lambda Optimization Analysis Output:")
105 |     print("-" * 40)
106 |     lambda_result = {
107 |         "status": "success",
108 |         "data": {
109 |             "overprovisioned_functions": [
110 |                 {
111 |                     "function_name": "data-processor",
112 |                     "current_memory": 1024,
113 |                     "avg_memory_utilization": 35.2,
114 |                     "recommendation": {
115 |                         "recommended_memory": 512,
116 |                         "estimated_monthly_savings": 25.75
117 |                     }
118 |                 }
119 |             ],
120 |             "count": 1,
121 |             "total_monthly_savings": 25.75
122 |         },
123 |         "message": "Found 1 overprovisioned Lambda functions"
124 |     }
125 |     
126 |     enhanced_lambda = add_documentation_links(lambda_result, "lambda")
127 |     print(json.dumps(enhanced_lambda, indent=2))
128 |     print("\n" + "=" * 60 + "\n")
129 |     
130 |     # Example 5: General Cost Analysis (no specific service)
131 |     print("5. General Cost Analysis Output:")
132 |     print("-" * 40)
133 |     general_result = {
134 |         "status": "success",
135 |         "data": {
136 |             "total_monthly_cost": 5420.75,
137 |             "potential_savings": 1250.50,
138 |             "services_analyzed": ["EC2", "EBS", "RDS", "Lambda", "S3"],
139 |             "optimization_opportunities": 47
140 |         },
141 |         "message": "Comprehensive cost analysis completed"
142 |     }
143 |     
144 |     enhanced_general = add_documentation_links(general_result)
145 |     print(json.dumps(enhanced_general, indent=2))
146 |     print("\n" + "=" * 60 + "\n")
147 |     
148 |     print("Key Benefits of Documentation Links:")
149 |     print("• Provides immediate access to AWS best practices")
150 |     print("• Links to CFM-TIPs guidance and workshops")
151 |     print("• References AWS Well-Architected Framework")
152 |     print("• Service-specific playbooks for detailed guidance")
153 |     print("• Consistent across all tool outputs")
154 |     print("• Helps users understand optimization recommendations")
155 | 
156 | if __name__ == "__main__":
157 |     show_example_outputs()
```

--------------------------------------------------------------------------------
/services/optimization_hub.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | AWS Cost Optimization Hub service module.
  3 | 
  4 | This module provides functions for interacting with the AWS Cost Optimization Hub API.
  5 | """
  6 | 
  7 | import logging
  8 | from typing import Dict, List, Optional, Any
  9 | import boto3
 10 | from botocore.exceptions import ClientError
 11 | 
 12 | logger = logging.getLogger(__name__)
 13 | 
 14 | def get_recommendations(
 15 |     resource_type: Optional[str] = None,
 16 |     region: Optional[str] = None,
 17 |     account_id: Optional[str] = None,
 18 |     client_region: Optional[str] = None
 19 | ) -> Dict[str, Any]:
 20 |     """
 21 |     Get cost optimization recommendations from AWS Cost Optimization Hub.
 22 |     
 23 |     Args:
 24 |         resource_type: Resource type to analyze (e.g., EC2, RDS)
 25 |         region: AWS region to filter recommendations
 26 |         account_id: AWS account ID to filter recommendations
 27 |         client_region: Region for the boto3 client (optional)
 28 |         
 29 |     Returns:
 30 |         Dictionary containing the optimization recommendations
 31 |     """
 32 |     try:
 33 |         # Create Cost Optimization Hub client
 34 |         if client_region:
 35 |             client = boto3.client('cost-optimization-hub', region_name=client_region)
 36 |         else:
 37 |             client = boto3.client('cost-optimization-hub')
 38 |         
 39 |         # Prepare filters based on parameters
 40 |         filters = {}
 41 |         if resource_type:
 42 |             filters['resourceType'] = {'values': [resource_type]}
 43 |         if region:
 44 |             filters['region'] = {'values': [region]}
 45 |         if account_id:
 46 |             filters['accountId'] = {'values': [account_id]}
 47 |             
 48 |         # Make the API call
 49 |         if filters:
 50 |             response = client.get_recommendations(filters=filters)
 51 |         else:
 52 |             response = client.get_recommendations()
 53 |             
 54 |         # Extract recommendation count
 55 |         recommendation_count = len(response.get('recommendations', []))
 56 |             
 57 |         return {
 58 |             "status": "success",
 59 |             "data": response,
 60 |             "message": f"Retrieved {recommendation_count} cost optimization recommendations"
 61 |         }
 62 |         
 63 |     except ClientError as e:
 64 |         logger.error(f"Error in Cost Optimization Hub API: {str(e)}")
 65 |         return {
 66 |             "status": "error",
 67 |             "message": f"Cost Optimization Hub API error: {str(e)}",
 68 |             "error_code": e.response['Error']['Code'] if 'Error' in e.response else "Unknown"
 69 |         }
 70 |         
 71 |     except Exception as e:
 72 |         logger.error(f"Unexpected error in Cost Optimization Hub service: {str(e)}")
 73 |         return {
 74 |             "status": "error",
 75 |             "message": f"Unexpected error: {str(e)}"
 76 |         }
 77 | 
 78 | def get_recommendation_summary(
 79 |     client_region: Optional[str] = None
 80 | ) -> Dict[str, Any]:
 81 |     """
 82 |     Get a summary of cost optimization recommendations.
 83 |     
 84 |     Args:
 85 |         client_region: Region for the boto3 client (optional)
 86 |         
 87 |     Returns:
 88 |         Dictionary containing the recommendation summary
 89 |     """
 90 |     try:
 91 |         # Create Cost Optimization Hub client
 92 |         if client_region:
 93 |             client = boto3.client('cost-optimization-hub', region_name=client_region)
 94 |         else:
 95 |             client = boto3.client('cost-optimization-hub')
 96 |             
 97 |         # Make the API call
 98 |         response = client.get_recommendation_summary()
 99 |             
100 |         return {
101 |             "status": "success",
102 |             "data": response,
103 |             "message": "Retrieved cost optimization recommendation summary"
104 |         }
105 |         
106 |     except ClientError as e:
107 |         logger.error(f"Error getting recommendation summary: {str(e)}")
108 |         return {
109 |             "status": "error",
110 |             "message": f"Error getting recommendation summary: {str(e)}",
111 |             "error_code": e.response['Error']['Code'] if 'Error' in e.response else "Unknown"
112 |         }
113 |         
114 |     except Exception as e:
115 |         logger.error(f"Unexpected error getting recommendation summary: {str(e)}")
116 |         return {
117 |             "status": "error",
118 |             "message": f"Unexpected error: {str(e)}"
119 |         }
120 | 
def get_savings_plans_recommendations(
    lookback_period: str = "SIXTY_DAYS",
    payment_option: str = "NO_UPFRONT",
    term: str = "ONE_YEAR",
    client_region: Optional[str] = None,
    savings_plans_type: str = "COMPUTE_SP"
) -> Dict[str, Any]:
    """
    Get Savings Plans purchase recommendations.

    The Cost Optimization Hub client exposes no Savings Plans operation; the
    lookback/payment/term values accepted here are the enums of Cost
    Explorer's ``GetSavingsPlansPurchaseRecommendation``, so that operation is
    called on the ``ce`` client.

    Args:
        lookback_period: Historical data period to analyze (e.g. "SIXTY_DAYS")
        payment_option: Payment option for Savings Plans (e.g. "NO_UPFRONT")
        term: Term length for Savings Plans (e.g. "ONE_YEAR")
        client_region: Region for the boto3 client (optional)
        savings_plans_type: Savings Plans type to recommend (defaults to
            "COMPUTE_SP")

    Returns:
        Dictionary with ``status`` "success" and the raw API response in
        ``data``; or ``status`` "error" with a ``message`` (plus
        ``error_code`` for ``ClientError``)
    """
    try:
        # Create Cost Explorer client (the service that owns this operation)
        client_kwargs = {'region_name': client_region} if client_region else {}
        client = boto3.client('ce', **client_kwargs)

        # Make the API call
        response = client.get_savings_plans_purchase_recommendation(
            SavingsPlansType=savings_plans_type,
            TermInYears=term,
            PaymentOption=payment_option,
            LookbackPeriodInDays=lookback_period
        )

        return {
            "status": "success",
            "data": response,
            "message": "Retrieved Savings Plans recommendations"
        }

    except ClientError as e:
        logger.error(f"Error getting Savings Plans recommendations: {str(e)}")
        return {
            "status": "error",
            "message": f"Error getting Savings Plans recommendations: {str(e)}",
            # Tolerate a response without Error/Code instead of raising here
            "error_code": e.response.get('Error', {}).get('Code', "Unknown")
        }

    except Exception as e:
        logger.error(f"Unexpected error getting Savings Plans recommendations: {str(e)}")
        return {
            "status": "error",
            "message": f"Unexpected error: {str(e)}"
        }
173 | 
```

--------------------------------------------------------------------------------
/tests/run_cloudwatch_tests.py:
--------------------------------------------------------------------------------

```python
  1 | #!/usr/bin/env python3
  2 | """
  3 | Test runner for CloudWatch optimization comprehensive testing suite.
  4 | 
  5 | Runs all CloudWatch-related tests including unit tests, integration tests,
  6 | performance tests, and cost constraint validation tests.
  7 | """
  8 | 
  9 | import sys
 10 | import os
 11 | import subprocess
 12 | import argparse
 13 | from pathlib import Path
 14 | 
 15 | 
 16 | def run_command(cmd, description):
 17 |     """Run a command and return success status."""
 18 |     print(f"\n{'='*60}")
 19 |     print(f"Running: {description}")
 20 |     print(f"Command: {' '.join(cmd)}")
 21 |     print(f"{'='*60}")
 22 |     
 23 |     try:
 24 |         result = subprocess.run(cmd, check=True, capture_output=True, text=True)
 25 |         print("✅ PASSED")
 26 |         if result.stdout:
 27 |             print("STDOUT:", result.stdout)
 28 |         return True
 29 |     except subprocess.CalledProcessError as e:
 30 |         print("❌ FAILED")
 31 |         print("STDERR:", e.stderr)
 32 |         if e.stdout:
 33 |             print("STDOUT:", e.stdout)
 34 |         return False
 35 | 
 36 | 
 37 | def main():
 38 |     parser = argparse.ArgumentParser(description="Run CloudWatch optimization tests")
 39 |     parser.add_argument("--unit", action="store_true", help="Run only unit tests")
 40 |     parser.add_argument("--integration", action="store_true", help="Run only integration tests")
 41 |     parser.add_argument("--performance", action="store_true", help="Run only performance tests")
 42 |     parser.add_argument("--cost-validation", action="store_true", help="Run only cost validation tests")
 43 |     parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
 44 |     parser.add_argument("--coverage", action="store_true", help="Run with coverage reporting")
 45 |     parser.add_argument("--parallel", "-n", type=int, help="Number of parallel workers")
 46 |     
 47 |     args = parser.parse_args()
 48 |     
 49 |     # Change to tests directory
 50 |     test_dir = Path(__file__).parent
 51 |     os.chdir(test_dir)
 52 |     
 53 |     # Base pytest command
 54 |     base_cmd = ["python", "-m", "pytest"]
 55 |     
 56 |     if args.verbose:
 57 |         base_cmd.append("-v")
 58 |     
 59 |     if args.parallel:
 60 |         base_cmd.extend(["-n", str(args.parallel)])
 61 |     
 62 |     if args.coverage:
 63 |         base_cmd.extend([
 64 |             "--cov=playbooks.cloudwatch",
 65 |             "--cov=services.cloudwatch_service",
 66 |             "--cov=services.cloudwatch_pricing",
 67 |             "--cov-report=html",
 68 |             "--cov-report=term-missing"
 69 |         ])
 70 |     
 71 |     # Test categories
 72 |     test_categories = []
 73 |     
 74 |     if args.unit or not any([args.unit, args.integration, args.performance, args.cost_validation]):
 75 |         test_categories.append(("Unit Tests", [
 76 |             "unit/analyzers/test_cloudwatch_base_analyzer.py",
 77 |             "unit/analyzers/test_cloudwatch_general_spend_analyzer.py",
 78 |             "unit/analyzers/test_metrics_optimization_analyzer.py",
 79 |             "unit/analyzers/test_logs_optimization_analyzer.py",
 80 |             "unit/analyzers/test_alarms_and_dashboards_analyzer.py",
 81 |             "unit/services/test_cloudwatch_service.py",
 82 |             "unit/services/test_cloudwatch_cost_controller.py",
 83 |             "unit/services/test_cloudwatch_query_service.py"
 84 |         ]))
 85 |     
 86 |     if args.integration or not any([args.unit, args.integration, args.performance, args.cost_validation]):
 87 |         test_categories.append(("Integration Tests", [
 88 |             "integration/test_cloudwatch_orchestrator_integration.py",
 89 |             "integration/test_cloudwatch_comprehensive_tool_integration.py"
 90 |         ]))
 91 |     
 92 |     if args.performance or not any([args.unit, args.integration, args.performance, args.cost_validation]):
 93 |         test_categories.append(("Performance Tests", [
 94 |             "performance/test_cloudwatch_parallel_execution.py"
 95 |         ]))
 96 |     
 97 |     if args.cost_validation or not any([args.unit, args.integration, args.performance, args.cost_validation]):
 98 |         test_categories.append(("Cost Constraint Validation Tests", [
 99 |             "unit/test_cloudwatch_cost_constraints.py"
100 |         ]))
101 |     
102 |     # Run test categories
103 |     all_passed = True
104 |     results = {}
105 |     
106 |     for category_name, test_files in test_categories:
107 |         print(f"\n🧪 Running {category_name}")
108 |         print("=" * 80)
109 |         
110 |         category_passed = True
111 |         for test_file in test_files:
112 |             if os.path.exists(test_file):
113 |                 cmd = base_cmd + [test_file]
114 |                 success = run_command(cmd, f"{category_name}: {test_file}")
115 |                 if not success:
116 |                     category_passed = False
117 |                     all_passed = False
118 |             else:
119 |                 print(f"⚠️  Test file not found: {test_file}")
120 |                 category_passed = False
121 |                 all_passed = False
122 |         
123 |         results[category_name] = category_passed
124 |     
125 |     # Run specific CloudWatch marker tests
126 |     print(f"\n🧪 Running CloudWatch-specific marker tests")
127 |     print("=" * 80)
128 |     
129 |     marker_tests = [
130 |         ("No-Cost Validation", ["-m", "no_cost_validation"]),
131 |         ("CloudWatch Unit Tests", ["-m", "unit and cloudwatch"]),
132 |         ("CloudWatch Integration Tests", ["-m", "integration and cloudwatch"]),
133 |         ("CloudWatch Performance Tests", ["-m", "performance and cloudwatch"])
134 |     ]
135 |     
136 |     for test_name, marker_args in marker_tests:
137 |         cmd = base_cmd + marker_args
138 |         success = run_command(cmd, test_name)
139 |         if not success:
140 |             all_passed = False
141 |         results[test_name] = success
142 |     
143 |     # Summary
144 |     print(f"\n{'='*80}")
145 |     print("TEST SUMMARY")
146 |     print(f"{'='*80}")
147 |     
148 |     for category, passed in results.items():
149 |         status = "✅ PASSED" if passed else "❌ FAILED"
150 |         print(f"{category:<40} {status}")
151 |     
152 |     overall_status = "✅ ALL TESTS PASSED" if all_passed else "❌ SOME TESTS FAILED"
153 |     print(f"\nOverall Result: {overall_status}")
154 |     
155 |     if args.coverage and all_passed:
156 |         print(f"\n📊 Coverage report generated in htmlcov/index.html")
157 |     
158 |     return 0 if all_passed else 1
159 | 
160 | 
161 | if __name__ == "__main__":
162 |     sys.exit(main())
```

--------------------------------------------------------------------------------
/services/cost_explorer.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | AWS Cost Explorer service module.
  3 | 
  4 | This module provides functions for interacting with the AWS Cost Explorer API.
  5 | """
  6 | 
  7 | import logging
  8 | from typing import Dict, List, Optional, Any
  9 | import boto3
 10 | from botocore.exceptions import ClientError
 11 | 
 12 | logger = logging.getLogger(__name__)
 13 | 
 14 | def get_cost_and_usage(
 15 |     start_date: str,
 16 |     end_date: str,
 17 |     granularity: str = "MONTHLY",
 18 |     metrics: List[str] = None,
 19 |     group_by: Optional[List[Dict[str, str]]] = None,
 20 |     filter_expr: Optional[Dict[str, Any]] = None,
 21 |     region: Optional[str] = None
 22 | ) -> Dict[str, Any]:
 23 |     """
 24 |     Retrieve cost and usage data from AWS Cost Explorer.
 25 |     
 26 |     Args:
 27 |         start_date: Start date in YYYY-MM-DD format
 28 |         end_date: End date in YYYY-MM-DD format
 29 |         granularity: Time granularity (DAILY, MONTHLY, HOURLY)
 30 |         metrics: List of cost metrics to retrieve
 31 |         group_by: Optional grouping dimensions
 32 |         filter_expr: Optional filters
 33 |         region: AWS region (optional)
 34 |         
 35 |     Returns:
 36 |         Dictionary containing the Cost Explorer API response
 37 |     """
 38 |     try:
 39 |         # Set default metrics if not provided
 40 |         if metrics is None:
 41 |             metrics = ["BlendedCost", "UnblendedCost"]
 42 |             
 43 |         # Create Cost Explorer client
 44 |         if region:
 45 |             ce_client = boto3.client('ce', region_name=region)
 46 |         else:
 47 |             ce_client = boto3.client('ce')
 48 |             
 49 |         # Prepare the request parameters
 50 |         params = {
 51 |             'TimePeriod': {
 52 |                 'Start': start_date,
 53 |                 'End': end_date
 54 |             },
 55 |             'Granularity': granularity,
 56 |             'Metrics': metrics
 57 |         }
 58 |         
 59 |         # Add optional parameters if provided
 60 |         if group_by:
 61 |             params['GroupBy'] = group_by
 62 |             
 63 |         if filter_expr:
 64 |             params['Filter'] = filter_expr
 65 |             
 66 |         # Make the API call
 67 |         response = ce_client.get_cost_and_usage(**params)
 68 |         
 69 |         return {
 70 |             "status": "success",
 71 |             "data": response,
 72 |             "message": f"Retrieved cost data from {start_date} to {end_date}"
 73 |         }
 74 |         
 75 |     except ClientError as e:
 76 |         logger.error(f"Error in Cost Explorer API: {str(e)}")
 77 |         return {
 78 |             "status": "error",
 79 |             "message": f"Cost Explorer API error: {str(e)}",
 80 |             "error_code": e.response['Error']['Code'] if 'Error' in e.response else "Unknown"
 81 |         }
 82 |         
 83 |     except Exception as e:
 84 |         logger.error(f"Unexpected error in Cost Explorer service: {str(e)}")
 85 |         return {
 86 |             "status": "error",
 87 |             "message": f"Unexpected error: {str(e)}"
 88 |         }
 89 | 
 90 | def get_cost_forecast(
 91 |     start_date: str,
 92 |     end_date: str,
 93 |     granularity: str = "MONTHLY",
 94 |     metric: str = "BLENDED_COST",
 95 |     filter_expr: Optional[Dict[str, Any]] = None,
 96 |     region: Optional[str] = None
 97 | ) -> Dict[str, Any]:
 98 |     """
 99 |     Get a cost forecast from AWS Cost Explorer.
100 |     
101 |     Args:
102 |         start_date: Start date in YYYY-MM-DD format
103 |         end_date: End date in YYYY-MM-DD format
104 |         granularity: Time granularity (DAILY, MONTHLY)
105 |         metric: Cost metric to forecast
106 |         filter_expr: Optional filters
107 |         region: AWS region (optional)
108 |         
109 |     Returns:
110 |         Dictionary containing the Cost Explorer forecast response
111 |     """
112 |     try:
113 |         # Create Cost Explorer client
114 |         if region:
115 |             ce_client = boto3.client('ce', region_name=region)
116 |         else:
117 |             ce_client = boto3.client('ce')
118 |             
119 |         # Prepare the request parameters
120 |         params = {
121 |             'TimePeriod': {
122 |                 'Start': start_date,
123 |                 'End': end_date
124 |             },
125 |             'Granularity': granularity,
126 |             'Metric': metric
127 |         }
128 |         
129 |         # Add optional filter if provided
130 |         if filter_expr:
131 |             params['Filter'] = filter_expr
132 |             
133 |         # Make the API call
134 |         response = ce_client.get_cost_forecast(**params)
135 |         
136 |         return {
137 |             "status": "success",
138 |             "data": response,
139 |             "message": f"Retrieved cost forecast from {start_date} to {end_date}"
140 |         }
141 |         
142 |     except ClientError as e:
143 |         logger.error(f"Error in Cost Explorer forecast API: {str(e)}")
144 |         return {
145 |             "status": "error",
146 |             "message": f"Cost Explorer forecast API error: {str(e)}",
147 |             "error_code": e.response['Error']['Code'] if 'Error' in e.response else "Unknown"
148 |         }
149 |         
150 |     except Exception as e:
151 |         logger.error(f"Unexpected error in Cost Explorer forecast service: {str(e)}")
152 |         return {
153 |             "status": "error",
154 |             "message": f"Unexpected error: {str(e)}"
155 |         }
156 | 
def get_cost_categories(
    region: Optional[str] = None
) -> Dict[str, Any]:
    """
    List cost categories from AWS Cost Explorer.

    All pages are fetched: while the service returns ``NextToken`` the request
    is repeated and the ``CostCategoryReferences`` entries are merged into the
    first response, so the reported count covers every category.

    Args:
        region: AWS region (optional)

    Returns:
        Dictionary with ``status`` "success" and the merged API response in
        ``data``; or ``status`` "error" with a ``message`` (plus
        ``error_code`` for ``ClientError``)
    """
    try:
        # Create Cost Explorer client
        client_kwargs = {'region_name': region} if region else {}
        ce_client = boto3.client('ce', **client_kwargs)

        # Make the API call and follow NextToken until the listing is complete
        response = ce_client.list_cost_category_definitions()
        page = response
        while page.get('NextToken'):
            page = ce_client.list_cost_category_definitions(
                NextToken=page['NextToken']
            )
            response.setdefault('CostCategoryReferences', []).extend(
                page.get('CostCategoryReferences', [])
            )
        # The merged response is complete, so a leftover token would mislead
        response.pop('NextToken', None)

        return {
            "status": "success",
            "data": response,
            "message": f"Retrieved {len(response.get('CostCategoryReferences', []))} cost categories"
        }

    except ClientError as e:
        logger.error(f"Error listing cost categories: {str(e)}")
        return {
            "status": "error",
            "message": f"Error listing cost categories: {str(e)}",
            # Tolerate a response without Error/Code instead of raising here
            "error_code": e.response.get('Error', {}).get('Code', "Unknown")
        }

    except Exception as e:
        logger.error(f"Unexpected error listing cost categories: {str(e)}")
        return {
            "status": "error",
            "message": f"Unexpected error: {str(e)}"
        }
199 | 
```

--------------------------------------------------------------------------------
/tests/integration/cloudwatch/test_cloudwatch_integration.py:
--------------------------------------------------------------------------------

```python
  1 | #!/usr/bin/env python3
  2 | """
  3 | Integration tests for CloudWatch functionality.
  4 | """
  5 | 
  6 | import asyncio
  7 | import json
  8 | import sys
  9 | import os
 10 | import pytest
 11 | 
 12 | # Add the project root to the path
 13 | sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../../..'))
 14 | 
 15 | from runbook_functions import run_cloudwatch_general_spend_analysis
 16 | 
 17 | 
 18 | @pytest.mark.asyncio
 19 | async def test_cloudwatch_timeout():
 20 |     """Test CloudWatch general spend analysis to replicate timeout issue."""
 21 |     print("Testing CloudWatch general spend analysis timeout issue...")
 22 |     
 23 |     try:
 24 |         # Test with minimal parameters that should trigger the timeout
 25 |         arguments = {
 26 |             "region": "us-east-1",
 27 |             "lookback_days": 7,
 28 |             "page": 1
 29 |         }
 30 |         
 31 |         print(f"Calling run_cloudwatch_general_spend_analysis with: {arguments}")
 32 |         
 33 |         # This should timeout and we should get a full stack trace
 34 |         result = await run_cloudwatch_general_spend_analysis(arguments)
 35 |         
 36 |         print("Result received:")
 37 |         for content in result:
 38 |             print(content.text)
 39 |             
 40 |         return True
 41 |         
 42 |     except Exception as e:
 43 |         print(f"Exception caught in test: {str(e)}")
 44 |         print("Full stack trace:")
 45 |         import traceback
 46 |         traceback.print_exc()
 47 |         return False
 48 | 
 49 | 
 50 | @pytest.mark.asyncio
 51 | async def test_stack_trace_capture():
 52 |     """Test that CloudWatch functions handle errors gracefully with structured responses."""
 53 |     print("Testing CloudWatch error handling...")
 54 |     
 55 |     # Test with invalid arguments that will cause an error
 56 |     arguments = {
 57 |         "region": "us-east-1",
 58 |         "lookback_days": "invalid_string",  # This should cause a type error
 59 |         "page": 1
 60 |     }
 61 |     
 62 |     print(f"Calling run_cloudwatch_general_spend_analysis with invalid lookback_days: {arguments}")
 63 |     
 64 |     try:
 65 |         result = await run_cloudwatch_general_spend_analysis(arguments)
 66 |         
 67 |         print("Result received:")
 68 |         for content in result:
 69 |             result_text = content.text
 70 |             print(result_text)
 71 |             
 72 |             # Parse the JSON response to check for proper error handling
 73 |             import json
 74 |             try:
 75 |                 response_data = json.loads(result_text)
 76 |                 
 77 |                 # Check if it's a proper error response with structured format
 78 |                 if (response_data.get('status') == 'error' and 
 79 |                     'error_message' in response_data and
 80 |                     'analysis_type' in response_data and
 81 |                     'timestamp' in response_data):
 82 |                     print("✅ SUCCESS: Structured error response found")
 83 |                     return True
 84 |                 else:
 85 |                     print("❌ FAILURE: Invalid error response structure")
 86 |                     return False
 87 |                     
 88 |             except json.JSONDecodeError:
 89 |                 print("❌ FAILURE: Response is not valid JSON")
 90 |                 return False
 91 |                 
 92 |     except Exception as e:
 93 |         print(f"❌ FAILURE: Exception not handled properly: {str(e)}")
 94 |         return False
 95 | 
 96 | 
 97 | def test_pricing_cache():
 98 |     """Test that pricing calls are cached and don't block."""
 99 |     print("Testing CloudWatch pricing cache fix...")
100 |     
101 |     try:
102 |         from services.cloudwatch_pricing import CloudWatchPricing
103 |         import time
104 |         
105 |         # Initialize pricing service
106 |         pricing = CloudWatchPricing(region='us-east-1')
107 |         
108 |         # First call - should use fallback pricing and cache it
109 |         print("Making first pricing call...")
110 |         start_time = time.time()
111 |         result1 = pricing.get_metrics_pricing()
112 |         first_call_time = time.time() - start_time
113 |         
114 |         print(f"First call took {first_call_time:.3f} seconds")
115 |         print(f"Status: {result1.get('status')}")
116 |         print(f"Source: {result1.get('source')}")
117 |         
118 |         # Second call - should use cache and be instant
119 |         print("\nMaking second pricing call...")
120 |         start_time = time.time()
121 |         result2 = pricing.get_metrics_pricing()
122 |         second_call_time = time.time() - start_time
123 |         
124 |         print(f"Second call took {second_call_time:.3f} seconds")
125 |         print(f"Status: {result2.get('status')}")
126 |         print(f"Source: {result2.get('source')}")
127 |         
128 |         # Verify caching worked
129 |         if second_call_time < 0.001:  # Should be nearly instant
130 |             print("✅ SUCCESS: Caching is working - second call was instant")
131 |             return True
132 |         else:
133 |             print("❌ FAILURE: Caching not working - second call took too long")
134 |             return False
135 |             
136 |     except Exception as e:
137 |         print(f"❌ Error in pricing cache test: {str(e)}")
138 |         return False
139 | 
140 | 
async def run_cloudwatch_integration_tests():
    """Execute every CloudWatch integration test in this module and print a tally.

    Coroutine test functions are awaited, plain functions are called directly.
    A truthy return value counts as a pass; a falsy value or any raised
    exception counts as a failure.

    Returns:
        True when no test failed, otherwise False
    """
    divider = "=" * 50
    print("Starting CloudWatch Integration Tests")
    print(divider)

    suite = [
        ("CloudWatch Timeout Handling", test_cloudwatch_timeout),
        ("Error Handling", test_stack_trace_capture),
        ("Pricing Cache", test_pricing_cache),
    ]

    passed = failed = 0

    for label, candidate in suite:
        try:
            is_async = asyncio.iscoroutinefunction(candidate)
            outcome = await candidate() if is_async else candidate()

            if not outcome:
                print(f"✗ FAIL: {label}")
                failed += 1
            else:
                print(f"✓ PASS: {label}")
                passed += 1
        except Exception as e:
            print(f"✗ FAIL: {label} - Exception: {e}")
            failed += 1

    print(divider)
    print(f"CloudWatch Integration Tests: {passed + failed} total, {passed} passed, {failed} failed")

    if failed:
        print(f"❌ {failed} CLOUDWATCH INTEGRATION TESTS FAILED")
        return False

    print("🎉 ALL CLOUDWATCH INTEGRATION TESTS PASSED!")
    return True
181 | 
if __name__ == "__main__":
    # Exit status 0 means every integration test passed; 1 otherwise.
    all_passed = asyncio.run(run_cloudwatch_integration_tests())
    exit_code = 0 if all_passed else 1
    sys.exit(exit_code)
```

--------------------------------------------------------------------------------
/tests/unit/s3/live/test_s3_governance_bucket_discovery.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Live test to debug S3 governance check bucket discovery issue.
  3 | 
  4 | This test will help identify why s3_governance_check returns 0 buckets
  5 | when there are actually 40+ buckets in the account.
  6 | """
  7 | 
  8 | import asyncio
  9 | import logging
 10 | import pytest
 11 | from typing import Dict, Any
 12 | 
 13 | # Set up logging to see debug messages
 14 | logging.basicConfig(level=logging.DEBUG)
 15 | logger = logging.getLogger(__name__)
 16 | 
 17 | @pytest.mark.live
 18 | async def test_s3_bucket_discovery_debug():
 19 |     """
 20 |     Debug the S3 bucket discovery mechanism used by governance check.
 21 |     
 22 |     This test will step through the bucket discovery process to identify
 23 |     where the silent failure is occurring.
 24 |     """
 25 |     
 26 |     # Test 1: Direct S3Service bucket listing
 27 |     logger.info("=== Test 1: Direct S3Service bucket listing ===")
 28 |     try:
 29 |         from services.s3_service import S3Service
 30 |         
 31 |         s3_service = S3Service(region='us-east-1')
 32 |         logger.info(f"S3Service initialized: {s3_service}")
 33 |         
 34 |         # Test the list_buckets method directly
 35 |         buckets_result = await s3_service.list_buckets()
 36 |         logger.info(f"S3Service.list_buckets() result: {buckets_result}")
 37 |         
 38 |         if buckets_result.get("status") == "success":
 39 |             buckets = buckets_result.get("data", {}).get("Buckets", [])
 40 |             logger.info(f"Found {len(buckets)} buckets via S3Service")
 41 |             for i, bucket in enumerate(buckets[:5]):  # Show first 5
 42 |                 logger.info(f"  Bucket {i+1}: {bucket.get('Name')} (Region: {bucket.get('Region', 'unknown')})")
 43 |         else:
 44 |             logger.error(f"S3Service.list_buckets() failed: {buckets_result}")
 45 |             
 46 |     except Exception as e:
 47 |         logger.error(f"Error in S3Service test: {str(e)}")
 48 |     
 49 |     # Test 2: Direct boto3 client call
 50 |     logger.info("\n=== Test 2: Direct boto3 client call ===")
 51 |     try:
 52 |         import boto3
 53 |         from botocore.exceptions import ClientError
 54 |         
 55 |         s3_client = boto3.client('s3', region_name='us-east-1')
 56 |         logger.info(f"Boto3 S3 client created: {s3_client}")
 57 |         
 58 |         # Direct list_buckets call
 59 |         response = s3_client.list_buckets()
 60 |         buckets = response.get('Buckets', [])
 61 |         logger.info(f"Found {len(buckets)} buckets via direct boto3 call")
 62 |         
 63 |         for i, bucket in enumerate(buckets[:5]):  # Show first 5
 64 |             logger.info(f"  Bucket {i+1}: {bucket.get('Name')} (Created: {bucket.get('CreationDate')})")
 65 |             
 66 |     except ClientError as e:
 67 |         logger.error(f"AWS ClientError in direct boto3 test: {e}")
 68 |     except Exception as e:
 69 |         logger.error(f"Error in direct boto3 test: {str(e)}")
 70 |     
 71 |     # Test 3: GovernanceAnalyzer bucket discovery
 72 |     logger.info("\n=== Test 3: GovernanceAnalyzer bucket discovery ===")
 73 |     try:
 74 |         from playbooks.s3.analyzers.governance_analyzer import GovernanceAnalyzer
 75 |         from services.s3_service import S3Service
 76 |         
 77 |         s3_service = S3Service(region='us-east-1')
 78 |         analyzer = GovernanceAnalyzer(s3_service=s3_service)
 79 |         logger.info(f"GovernanceAnalyzer initialized: {analyzer}")
 80 |         
 81 |         # Test the _get_buckets_to_analyze method
 82 |         context = {'region': 'us-east-1'}
 83 |         buckets_to_analyze = await analyzer._get_buckets_to_analyze(context)
 84 |         logger.info(f"GovernanceAnalyzer._get_buckets_to_analyze() returned: {len(buckets_to_analyze)} buckets")
 85 |         
 86 |         for i, bucket_name in enumerate(buckets_to_analyze[:5]):  # Show first 5
 87 |             logger.info(f"  Bucket {i+1}: {bucket_name}")
 88 |             
 89 |     except Exception as e:
 90 |         logger.error(f"Error in GovernanceAnalyzer test: {str(e)}")
 91 |     
 92 |     # Test 4: Full governance analysis
 93 |     logger.info("\n=== Test 4: Full governance analysis ===")
 94 |     try:
 95 |         from playbooks.s3.s3_optimization_orchestrator import S3OptimizationOrchestrator
 96 |         
 97 |         orchestrator = S3OptimizationOrchestrator(region='us-east-1')
 98 |         logger.info(f"S3OptimizationOrchestrator initialized: {orchestrator}")
 99 |         
100 |         # Execute governance analysis
101 |         result = await orchestrator.execute_analysis("governance", region='us-east-1')
102 |         logger.info(f"Governance analysis result status: {result.get('status')}")
103 |         logger.info(f"Total buckets analyzed: {result.get('data', {}).get('total_buckets_analyzed', 0)}")
104 |         
105 |         if result.get('status') == 'error':
106 |             logger.error(f"Governance analysis error: {result.get('message')}")
107 |         
108 |     except Exception as e:
109 |         logger.error(f"Error in full governance analysis test: {str(e)}")
110 |     
111 |     # Test 5: Check AWS credentials and permissions
112 |     logger.info("\n=== Test 5: AWS credentials and permissions check ===")
113 |     try:
114 |         import boto3
115 |         
116 |         # Check STS identity
117 |         sts_client = boto3.client('sts', region_name='us-east-1')
118 |         identity = sts_client.get_caller_identity()
119 |         logger.info(f"AWS Identity: {identity}")
120 |         
121 |         # Test S3 permissions
122 |         s3_client = boto3.client('s3', region_name='us-east-1')
123 |         
124 |         # Test list_buckets permission
125 |         try:
126 |             response = s3_client.list_buckets()
127 |             logger.info(f"list_buckets permission: OK ({len(response.get('Buckets', []))} buckets)")
128 |         except ClientError as e:
129 |             logger.error(f"list_buckets permission: DENIED - {e}")
130 |         
131 |         # Test get_bucket_location permission on first bucket
132 |         try:
133 |             response = s3_client.list_buckets()
134 |             if response.get('Buckets'):
135 |                 first_bucket = response['Buckets'][0]['Name']
136 |                 location = s3_client.get_bucket_location(Bucket=first_bucket)
137 |                 logger.info(f"get_bucket_location permission: OK (tested on {first_bucket})")
138 |             else:
139 |                 logger.warning("No buckets to test get_bucket_location permission")
140 |         except ClientError as e:
141 |             logger.error(f"get_bucket_location permission: DENIED - {e}")
142 |             
143 |     except Exception as e:
144 |         logger.error(f"Error in credentials/permissions test: {str(e)}")
145 | 
if __name__ == "__main__":
    # Allow running this diagnostic as a plain script, outside pytest.
    debug_run = test_s3_bucket_discovery_debug()
    asyncio.run(debug_run)
```

--------------------------------------------------------------------------------
/tests/unit/analyzers/conftest_cloudwatch.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | CloudWatch-specific pytest configuration and fixtures.
  3 | 
  4 | This module provides CloudWatch-specific fixtures for testing analyzers and services.
  5 | """
  6 | 
  7 | import pytest
  8 | import boto3
  9 | from unittest.mock import Mock, AsyncMock
 10 | from datetime import datetime, timedelta
 11 | from moto import mock_aws
 12 | 
 13 | from services.cloudwatch_service import CloudWatchOperationResult
 14 | 
 15 | 
 16 | @pytest.fixture
 17 | def mock_aws_credentials():
 18 |     """Mock AWS credentials for testing."""
 19 |     import os
 20 |     with patch.dict(os.environ, {
 21 |         'AWS_ACCESS_KEY_ID': 'testing',
 22 |         'AWS_SECRET_ACCESS_KEY': 'testing',
 23 |         'AWS_SECURITY_TOKEN': 'testing',
 24 |         'AWS_SESSION_TOKEN': 'testing',
 25 |         'AWS_DEFAULT_REGION': 'us-east-1'
 26 |     }):
 27 |         yield
 28 | 
 29 | 
 30 | @pytest.fixture
 31 | def mock_cloudwatch_client(mock_aws_credentials):
 32 |     """Mock CloudWatch client with moto."""
 33 |     with mock_aws():
 34 |         yield boto3.client('cloudwatch', region_name='us-east-1')
 35 | 
 36 | 
 37 | @pytest.fixture
 38 | def mock_logs_client(mock_aws_credentials):
 39 |     """Mock CloudWatch Logs client with moto."""
 40 |     with mock_aws():
 41 |         yield boto3.client('logs', region_name='us-east-1')
 42 | 
 43 | 
 44 | @pytest.fixture
 45 | def mock_ce_client(mock_aws_credentials):
 46 |     """Mock Cost Explorer client with moto."""
 47 |     with mock_aws():
 48 |         yield boto3.client('ce', region_name='us-east-1')
 49 | 
 50 | 
 51 | @pytest.fixture
 52 | def sample_cloudwatch_cost_data():
 53 |     """Sample CloudWatch Cost Explorer response data."""
 54 |     return {
 55 |         "ResultsByTime": [
 56 |             {
 57 |                 "TimePeriod": {
 58 |                     "Start": "2024-01-01",
 59 |                     "End": "2024-01-02"
 60 |                 },
 61 |                 "Groups": [
 62 |                     {
 63 |                         "Keys": ["DataIngestion-Bytes"],
 64 |                         "Metrics": {
 65 |                             "UnblendedCost": {"Amount": "5.25", "Unit": "USD"},
 66 |                             "UsageQuantity": {"Amount": "10.5", "Unit": "GB"}
 67 |                         }
 68 |                     },
 69 |                     {
 70 |                         "Keys": ["DataStorage-ByteHrs"],
 71 |                         "Metrics": {
 72 |                             "UnblendedCost": {"Amount": "2.10", "Unit": "USD"},
 73 |                             "UsageQuantity": {"Amount": "70.0", "Unit": "GB-Hours"}
 74 |                         }
 75 |                     }
 76 |                 ]
 77 |             }
 78 |         ]
 79 |     }
 80 | 
 81 | 
 82 | @pytest.fixture
 83 | def sample_cloudwatch_alarms():
 84 |     """Sample CloudWatch alarms data."""
 85 |     return [
 86 |         {
 87 |             "AlarmName": "test-alarm-1",
 88 |             "AlarmDescription": "Test alarm with actions",
 89 |             "StateValue": "OK",
 90 |             "AlarmActions": ["arn:aws:sns:us-east-1:123456789012:test-topic"],
 91 |             "Period": 300,
 92 |             "MetricName": "CPUUtilization"
 93 |         },
 94 |         {
 95 |             "AlarmName": "test-alarm-2",
 96 |             "AlarmDescription": "Test alarm without actions",
 97 |             "StateValue": "INSUFFICIENT_DATA",
 98 |             "AlarmActions": [],
 99 |             "Period": 60,  # High resolution
100 |             "MetricName": "NetworkIn"
101 |         }
102 |     ]
103 | 
104 | 
@pytest.fixture
def sample_cloudwatch_log_groups():
    """Return two sample log-group records, one of them without a retention setting."""
    def _created_ms(days_ago):
        # Creation time as epoch milliseconds, ``days_ago`` days before now.
        created = datetime.now() - timedelta(days=days_ago)
        return int(created.timestamp() * 1000)

    lambda_group = {
        "logGroupName": "/aws/lambda/test-function",
        "creationTime": _created_ms(30),
        "retentionInDays": 14,
        "storedBytes": 1024000,
    }
    api_gateway_group = {
        "logGroupName": "/aws/apigateway/test-api",
        "creationTime": _created_ms(400),
        "storedBytes": 2048000,
        # No retention policy
    }
    return [lambda_group, api_gateway_group]
122 | 
123 | 
@pytest.fixture
def mock_cloudwatch_pricing_service():
    """Build a ``Mock`` pricing service with fixed logs pricing.

    ``get_logs_pricing`` returns a static price table and
    ``calculate_logs_cost`` charges only storage, at 0.03 per GiB of
    ``storedBytes`` across the given log groups.
    """
    storage_rate = 0.03
    bytes_per_gib = 1024**3

    def mock_get_logs_pricing():
        logs_pricing = {
            "ingestion_per_gb": 0.50,
            "storage_per_gb_month": 0.03,
            "insights_per_gb_scanned": 0.005,
        }
        return {"status": "success", "logs_pricing": logs_pricing}

    def mock_calculate_logs_cost(log_groups_data):
        # The 0.0 start value keeps the total a float even for an empty list.
        total_cost = sum(
            (group.get('storedBytes', 0) / bytes_per_gib * storage_rate
             for group in log_groups_data),
            0.0,
        )
        breakdown = {
            "storage_cost": total_cost,
            "ingestion_cost": 0.0,
            "insights_cost": 0.0,
        }
        return {
            "status": "success",
            "total_monthly_cost": total_cost,
            "cost_breakdown": breakdown,
        }

    service = Mock()
    service.region = "us-east-1"
    service.get_logs_pricing = mock_get_logs_pricing
    service.calculate_logs_cost = mock_calculate_logs_cost

    return service
160 | 
161 | 
@pytest.fixture
def mock_cloudwatch_service():
    """Build a ``Mock`` standing in for the CloudWatch service.

    The mock carries the region and zeroed operation counters, plus two
    coroutine methods (``list_metrics`` and ``describe_alarms``) that return
    canned successful ``CloudWatchOperationResult`` objects.
    """
    # Mock async methods
    async def mock_list_metrics(namespace=None, metric_name=None, dimensions=None):
        # Filter arguments are accepted but do not affect the canned result.
        metrics = [
            {'Namespace': 'AWS/EC2', 'MetricName': 'CPUUtilization'},
            {'Namespace': 'Custom/App', 'MetricName': 'RequestCount'},
        ]
        return CloudWatchOperationResult(
            success=True,
            data={'metrics': metrics, 'total_count': 2},
            operation_name='list_metrics',
            operation_type='free',
        )

    async def mock_describe_alarms(alarm_names=None):
        alarm = {
            'AlarmName': 'test-alarm',
            'StateValue': 'OK',
            'AlarmActions': ['arn:aws:sns:us-east-1:123456789012:test-topic'],
            'Period': 300,
        }
        analysis = {
            'total_alarms': 1,
            'alarms_by_state': {'OK': 1},
            'alarms_without_actions': [],
        }
        return CloudWatchOperationResult(
            success=True,
            data={'alarms': [alarm], 'total_count': 1, 'analysis': analysis},
            operation_name='describe_alarms',
            operation_type='free',
        )

    service = Mock()
    service.region = "us-east-1"
    service.operation_count = 0
    service.cost_incurring_operations = []
    service.total_execution_time = 0.0
    service.list_metrics = mock_list_metrics
    service.describe_alarms = mock_describe_alarms

    return service
```

--------------------------------------------------------------------------------
/test_runbooks.py:
--------------------------------------------------------------------------------

```python
  1 | #!/usr/bin/env python3
  2 | """
  3 | Test script for CFM Tips AWS Cost Optimization MCP Server
  4 | """
  5 | 
  6 | import sys
  7 | import os
  8 | 
  9 | # Add current directory to path
 10 | sys.path.append(os.path.dirname(os.path.abspath(__file__)))
 11 | 
 12 | def test_imports():
 13 |     """Test that all imports work correctly."""
 14 |     print("Testing imports...")
 15 |     
 16 |     try:
 17 |         # Test MCP server imports
 18 |         from mcp.server import Server
 19 |         from mcp.server.stdio import stdio_server
 20 |         from mcp.types import Tool, TextContent
 21 |         print("✅ MCP imports successful")
 22 |         
 23 |         # Test AWS imports
 24 |         import boto3
 25 |         from botocore.exceptions import ClientError, NoCredentialsError
 26 |         print("✅ AWS imports successful")
 27 |         
 28 |         # Test runbook functions import
 29 |         from runbook_functions import (
 30 |             run_ec2_right_sizing_analysis,
 31 |             generate_ec2_right_sizing_report,
 32 |             run_ebs_optimization_analysis,
 33 |             identify_unused_ebs_volumes,
 34 |             generate_ebs_optimization_report,
 35 |             run_rds_optimization_analysis,
 36 |             identify_idle_rds_instances,
 37 |             generate_rds_optimization_report,
 38 |             run_lambda_optimization_analysis,
 39 |             identify_unused_lambda_functions,
 40 |             generate_lambda_optimization_report,
 41 |             run_comprehensive_cost_analysis,
 42 |             get_management_trails,
 43 |             run_cloudtrail_trails_analysis,
 44 |             generate_cloudtrail_report
 45 |         )
 46 |         print("✅ Runbook functions import successful")
 47 |         
 48 |         return True
 49 |         
 50 |     except ImportError as e:
 51 |         print(f"❌ Import error: {e}")
 52 |         return False
 53 |     except Exception as e:
 54 |         print(f"❌ Unexpected error: {e}")
 55 |         return False
 56 | 
 57 | def test_server_creation():
 58 |     """Test that the MCP server can be created."""
 59 |     print("\nTesting server creation...")
 60 |     
 61 |     try:
 62 |         # Import the server module
 63 |         import mcp_server_with_runbooks
 64 |         print("✅ Server module imported successfully")
 65 |         
 66 |         # Check if server is created
 67 |         if hasattr(mcp_server_with_runbooks, 'server'):
 68 |             print("✅ Server object created successfully")
 69 |             
 70 |             # Check server name
 71 |             if mcp_server_with_runbooks.server.name == "cfm_tips":
 72 |                 print("✅ Server name is correct: cfm_tips")
 73 |             else:
 74 |                 print(f"⚠️  Server name: {mcp_server_with_runbooks.server.name}")
 75 |             
 76 |             return True
 77 |         else:
 78 |             print("❌ Server object not found")
 79 |             return False
 80 |             
 81 |     except Exception as e:
 82 |         print(f"❌ Server creation error: {str(e)}")
 83 |         return False
 84 | 
 85 | def test_cloudtrail_functions():
 86 |     """Test CloudTrail optimization functions."""
 87 |     print("\nTesting CloudTrail functions...")
 88 |     
 89 |     try:
 90 |         from runbook_functions import (
 91 |             get_management_trails,
 92 |             run_cloudtrail_trails_analysis,
 93 |             generate_cloudtrail_report
 94 |         )
 95 |         print("✅ CloudTrail functions imported successfully")
 96 |         
 97 |         # Test function signatures
 98 |         import inspect
 99 |         
100 |         # Check get_management_trails
101 |         sig = inspect.signature(get_management_trails)
102 |         if 'arguments' in sig.parameters:
103 |             print("✅ get_management_trails has correct signature")
104 |         else:
105 |             print("❌ get_management_trails signature incorrect")
106 |             return False
107 |             
108 |         # Check run_cloudtrail_trails_analysis
109 |         sig = inspect.signature(run_cloudtrail_trails_analysis)
110 |         if 'arguments' in sig.parameters:
111 |             print("✅ run_cloudtrail_trails_analysis has correct signature")
112 |         else:
113 |             print("❌ run_cloudtrail_trails_analysis signature incorrect")
114 |             return False
115 |             
116 |         # Check generate_cloudtrail_report
117 |         sig = inspect.signature(generate_cloudtrail_report)
118 |         if 'arguments' in sig.parameters:
119 |             print("✅ generate_cloudtrail_report has correct signature")
120 |         else:
121 |             print("❌ generate_cloudtrail_report signature incorrect")
122 |             return False
123 |             
124 |         return True
125 |         
126 |     except ImportError as e:
127 |         print(f"❌ CloudTrail import error: {e}")
128 |         return False
129 |     except Exception as e:
130 |         print(f"❌ CloudTrail test error: {e}")
131 |         return False
132 | 
def test_tool_names():
    """Check that ``<server>___<tool>`` names stay within 64 characters.

    Returns:
        bool: False at the first sample tool whose combined name exceeds
        64 characters; True when all of them fit.
    """
    print("\nTesting tool name lengths...")

    prefix = "cfm_tips"
    tools = (
        "ec2_rightsizing",
        "ebs_optimization",
        "rds_idle",
        "lambda_unused",
        "comprehensive_analysis",
        "get_coh_recommendations",
        "cloudtrail_optimization",
    )

    longest = 0
    for tool in tools:
        qualified = f"{prefix}___{tool}"
        size = len(qualified)
        if size > longest:
            longest = size

        if size > 64:
            print(f"❌ Tool name too long: {qualified} ({size} chars)")
            return False

    print(f"✅ All tool names within limit (max: {longest} chars)")
    return True
160 | 
def main():
    """Run the four integration checks in order and print the tally.

    Returns:
        bool: True when every check passed, False otherwise.
    """
    banner = "=" * 65
    print("CFM Tips AWS Cost Optimization MCP Server - Integration Test")
    print(banner)

    # Order fixes the printed sequence: imports, server, CloudTrail, names.
    checks = (
        test_imports,
        test_server_creation,
        test_cloudtrail_functions,
        test_tool_names,
    )
    total_tests = len(checks)
    tests_passed = 0
    for check in checks:
        if check():
            tests_passed += 1

    print("\n" + banner)
    print(f"Tests passed: {tests_passed}/{total_tests}")

    if tests_passed != total_tests:
        print("❌ Some tests failed. Check the errors above.")
        return False

    print("✅ All integration tests passed!")
    print("\nNext steps:")
    print("1. Configure AWS credentials: aws configure")
    print("2. Apply the correct IAM permissions (see CORRECTED_PERMISSIONS.md)")
    print('3. Start the server: q chat --mcp-config "$(pwd)/mcp_runbooks.json"')
    print('4. Test with: "Run comprehensive cost analysis for us-east-1"')
    print("\n🎉 CFM Tips is ready to help optimize your AWS costs!")
    return True
200 | 
if __name__ == "__main__":
    # Exit status 0 means every check passed; 1 otherwise.
    all_passed = main()
    exit_code = 0 if all_passed else 1
    sys.exit(exit_code)
204 | 
```

--------------------------------------------------------------------------------
/tests/legacy/test_runbook_integration.py:
--------------------------------------------------------------------------------

```python
  1 | #!/usr/bin/env python3
  2 | """
  3 | 
  4 | This script tests that the runbook functions work correctly with the new S3OptimizationOrchestrator.
  5 | """
  6 | 
  7 | import asyncio
  8 | import json
  9 | import logging
 10 | import sys
 11 | from typing import Dict, Any
 12 | 
 13 | # Configure logging
 14 | logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
 15 | logger = logging.getLogger(__name__)
 16 | 
 17 | async def test_s3_playbook_functions():
 18 |     """Test S3 playbook functions with orchestrator."""
 19 |     logger.info("=== Testing S3 Playbook Functions ===")
 20 |     
 21 |     try:
 22 |         # Import S3 functions directly from orchestrator
 23 |         from playbooks.s3.s3_optimization_orchestrator import (
 24 |             run_s3_quick_analysis,
 25 |             run_s3_general_spend_analysis,
 26 |             run_s3_comprehensive_optimization_tool
 27 |         )
 28 |         
 29 |         # Test arguments for S3 functions
 30 |         test_args = {
 31 |             "region": "us-east-1",
 32 |             "lookback_days": 7,
 33 |             "timeout_seconds": 30,
 34 |             "store_results": True
 35 |         }
 36 |         
 37 |         # Test general spend analysis
 38 |         logger.info("Testing general spend analysis...")
 39 |         result = await run_s3_general_spend_analysis(test_args)
 40 |         
 41 |         if not result or not isinstance(result, list):
 42 |             logger.error("✗ General spend analysis returned invalid result")
 43 |             return False
 44 |         
 45 |         # Parse the result
 46 |         try:
 47 |             result_data = json.loads(result[0].text)
 48 |             if result_data.get("status") not in ["success", "error"]:
 49 |                 logger.error(f"✗ Unexpected status: {result_data.get('status')}")
 50 |                 return False
 51 |             logger.info(f"✓ General spend analysis: {result_data.get('status')}")
 52 |         except Exception as e:
 53 |             logger.error(f"✗ Failed to parse result: {e}")
 54 |             return False
 55 |         
 56 |         # Test comprehensive analysis
 57 |         logger.info("Testing comprehensive analysis...")
 58 |         comprehensive_args = test_args.copy()
 59 |         comprehensive_args["timeout_seconds"] = 60
 60 |         
 61 |         result = await run_s3_comprehensive_optimization_tool(comprehensive_args)
 62 |         
 63 |         if not result or not isinstance(result, list):
 64 |             logger.error("✗ Comprehensive analysis returned invalid result")
 65 |             return False
 66 |         
 67 |         try:
 68 |             result_data = json.loads(result[0].text)
 69 |             if result_data.get("status") not in ["success", "error"]:
 70 |                 logger.error(f"✗ Unexpected comprehensive status: {result_data.get('status')}")
 71 |                 return False
 72 |             logger.info(f"✓ Comprehensive analysis: {result_data.get('status')}")
 73 |         except Exception as e:
 74 |             logger.error(f"✗ Failed to parse comprehensive result: {e}")
 75 |             return False
 76 |         
 77 |         logger.info("✓ All S3 runbook functions working with new orchestrator")
 78 |         return True
 79 |         
 80 |     except Exception as e:
 81 |         logger.error(f"✗ S3 runbook function test failed: {e}")
 82 |         return False
 83 | 
 84 | async def test_session_data_storage():
 85 |     """Test that session data is being stored correctly."""
 86 |     logger.info("=== Testing Session Data Storage ===")
 87 |     
 88 |     try:
 89 |         from playbooks.s3.s3_optimization_orchestrator import S3OptimizationOrchestrator
 90 |         
 91 |         orchestrator = S3OptimizationOrchestrator(region="us-east-1")
 92 |         
 93 |         # Run an analysis that should store data
 94 |         result = await orchestrator.execute_analysis(
 95 |             analysis_type="general_spend",
 96 |             region="us-east-1",
 97 |             lookback_days=7,
 98 |             store_results=True
 99 |         )
100 |         
101 |         if result.get("status") != "success":
102 |             logger.warning(f"Analysis not successful: {result.get('status')}")
103 |             return True  # Still pass if analysis runs but has issues
104 |         
105 |         # Check that tables were created
106 |         tables = orchestrator.get_stored_tables()
107 |         if not tables:
108 |             logger.warning("No tables found after analysis")
109 |             return True  # Still pass - may be expected in test environment
110 |         
111 |         logger.info(f"✓ Session data storage working: {len(tables)} tables created")
112 |         return True
113 |         
114 |     except Exception as e:
115 |         logger.error(f"✗ Session data storage test failed: {e}")
116 |         return False
117 | 
async def test_no_cost_compliance():
    """Check that no object-level S3 operation appears in the service's stats.

    Returns:
        bool: False when any of ``list_objects``, ``list_objects_v2``,
        ``head_object`` or ``get_object`` is a key of the operation stats, or
        when anything raises; True otherwise.
    """
    logger.info("=== Testing No-Cost Compliance ===")

    # NOTE(review): the stats are read from a service constructed just above
    # the read, with no operations issued in between — confirm this is the
    # intended subject of the check.
    try:
        from services.s3_service import S3Service

        s3 = S3Service(region="us-east-1")

        # Check operation stats
        op_stats = s3.get_operation_stats()

        # Verify only allowed operations were called
        forbidden_ops = {'list_objects', 'list_objects_v2', 'head_object', 'get_object'}
        called_forbidden = set(op_stats.keys()) & forbidden_ops

        if not called_forbidden:
            logger.info(f"✓ No-cost compliance verified: {len(op_stats)} allowed operations called")
            return True

        logger.error(f"✗ Forbidden operations called: {called_forbidden}")
        return False

    except Exception as err:
        logger.error(f"✗ No-cost compliance test failed: {err}")
        return False
144 | 
async def run_integration_tests():
    """Await each runbook integration check and log a summary.

    A check counts as passed when it returns a truthy value and as failed
    when it returns a falsy value or raises.

    Returns:
        bool: True when no check failed, False otherwise.
    """
    rule = "=" * 50
    logger.info("Starting Runbook Integration Tests")
    logger.info(rule)

    suite = (
        ("S3 Playbook Functions", test_s3_playbook_functions),
        ("Session Data Storage", test_session_data_storage),
        ("No-Cost Compliance", test_no_cost_compliance),
    )

    ok_count = 0
    bad_count = 0

    for label, check in suite:
        try:
            outcome = await check()
            if not outcome:
                logger.error(f"✗ FAIL: {label}")
                bad_count += 1
            else:
                logger.info(f"✓ PASS: {label}")
                ok_count += 1
        except Exception as err:
            logger.error(f"✗ FAIL: {label} - Exception: {err}")
            bad_count += 1

    logger.info(rule)
    logger.info(f"Integration Tests: {ok_count + bad_count} total, {ok_count} passed, {bad_count} failed")

    if bad_count:
        logger.error(f"❌ {bad_count} INTEGRATION TESTS FAILED")
        return False

    logger.info("🎉 ALL INTEGRATION TESTS PASSED!")
    return True
181 | 
182 | if __name__ == "__main__":
183 |     success = asyncio.run(run_integration_tests())
184 |     sys.exit(0 if success else 1)
```

--------------------------------------------------------------------------------
/tests/unit/cloudwatch/test_cache_control.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Test Cache Control for CloudWatch Optimization
  3 | 
  4 | Demonstrates how to control caching behavior for testing purposes.
  5 | """
  6 | 
  7 | import pytest
  8 | import os
  9 | from utils.cache_decorator import (
 10 |     dao_cache,
 11 |     is_cache_enabled,
 12 |     enable_cache,
 13 |     disable_cache,
 14 |     clear_cache,
 15 |     get_cache_stats
 16 | )
 17 | 
 18 | 
 19 | class TestCacheControl:
 20 |     """Tests for the global cache switch and the ``dao_cache`` decorator."""
 21 |     
 22 |     def test_cache_enabled_by_default(self) -> None:
 23 |         """Check that ``is_cache_enabled()`` reports a boolean-like state."""
 24 |         # NOTE(review): both True and False are accepted, so the default itself is not pinned here
 25 |         assert is_cache_enabled() in (True, False)  # Depends on environment (e.g. CFM_ENABLE_CACHE)
 26 |     
 27 |     def test_disable_cache_programmatically(self) -> None:
 28 |         """Toggle the cache off and on via disable_cache()/enable_cache() and check the reported state."""
 29 |         # Remember the state found on entry so it can be restored afterwards
 30 |         original_state = is_cache_enabled()
 31 |         
 32 |         try:
 33 |             # After disabling, the switch must report exactly False
 34 |             disable_cache()
 35 |             assert is_cache_enabled() is False
 36 |             
 37 |             # After re-enabling, the switch must report exactly True
 38 |             enable_cache()
 39 |             assert is_cache_enabled() is True
 40 |         finally:
 41 |             # Restore the entry state even if an assertion above failed
 42 |             if original_state:
 43 |                 enable_cache()
 44 |             else:
 45 |                 disable_cache()
 46 |     
 47 |     def test_cache_decorator_respects_global_setting(self) -> None:
 48 |         """Check that a ``dao_cache``-decorated function follows the global enable/disable switch."""
 49 |         call_count = 0  # incremented each time the wrapped body actually runs
 50 |         
 51 |         @dao_cache(ttl_seconds=60)
 52 |         def test_function(value):
 53 |             nonlocal call_count
 54 |             call_count += 1
 55 |             return value * 2
 56 |         
 57 |         # Remember the state found on entry so it can be restored afterwards
 58 |         original_state = is_cache_enabled()
 59 |         
 60 |         try:
 61 |             # Phase 1: cache enabled, starting from a cleared cache and a zeroed counter
 62 |             enable_cache()
 63 |             clear_cache()
 64 |             call_count = 0
 65 |             
 66 |             result1 = test_function(5)
 67 |             result2 = test_function(5)
 68 |             
 69 |             assert result1 == 10
 70 |             assert result2 == 10
 71 |             assert call_count == 1  # Second call is expected to be served from the cache
 72 |             
 73 |             # Phase 2: cache disabled; counter reset so only the next two calls are counted
 74 |             disable_cache()
 75 |             clear_cache()
 76 |             call_count = 0
 77 |             
 78 |             result1 = test_function(5)
 79 |             result2 = test_function(5)
 80 |             
 81 |             assert result1 == 10
 82 |             assert result2 == 10
 83 |             assert call_count == 2  # Both calls are expected to run the wrapped function
 84 |         finally:
 85 |             # Restore the entry state even if an assertion above failed
 86 |             if original_state:
 87 |                 enable_cache()
 88 |             else:
 89 |                 disable_cache()
 90 |     
 91 |     def test_cache_decorator_with_enabled_parameter(self) -> None:
 92 |         """Check that ``enabled=False`` on the decorator turns caching off even when it is on globally."""
 93 |         call_count = 0  # incremented each time the wrapped body actually runs
 94 |         
 95 |         @dao_cache(ttl_seconds=60, enabled=False)
 96 |         def always_uncached(value):
 97 |             nonlocal call_count
 98 |             call_count += 1
 99 |             return value * 2
100 |         
101 |         # Remember the state found on entry so it can be restored afterwards
102 |         original_state = is_cache_enabled()
103 |         
104 |         try:
105 |             # Cache is switched on globally; the per-decorator enabled=False is expected to win
106 |             enable_cache()
107 |             clear_cache()
108 |             call_count = 0
109 |             
110 |             result1 = always_uncached(5)
111 |             result2 = always_uncached(5)
112 |             
113 |             assert result1 == 10
114 |             assert result2 == 10
115 |             assert call_count == 2  # Both calls are expected to run the wrapped function
116 |         finally:
117 |             # Restore the entry state even if an assertion above failed
118 |             if original_state:
119 |                 enable_cache()
120 |             else:
121 |                 disable_cache()
122 |     
123 |     def test_cache_stats(self) -> None:
124 |         """Check the hit/miss counters reported by ``get_cache_stats()``.
125 |         
126 |         NOTE: The calls use the ``page`` parameter, which is in the important_params list
127 |         of _generate_cache_key(). Other parameter names may not produce distinct cache
128 |         keys, because the cache decorator only includes selected parameters in the key.
129 |         """
130 |         # Remember the state found on entry so it can be restored afterwards
131 |         original_state = is_cache_enabled()
132 |         
133 |         try:
134 |             enable_cache()
135 |             clear_cache()
136 |             
137 |             # The function is defined only after the cache was cleared, to start from a clean state
138 |             # It takes 'page' because that name is in the cache decorator's important_params list
139 |             @dao_cache(ttl_seconds=60)
140 |             def test_function(page=1):
141 |                 return page * 2
142 |             
143 |             # Two distinct page values, each requested twice: first call misses, repeat hits
144 |             result1 = test_function(page=1)  # MISS
145 |             result2 = test_function(page=1)  # HIT
146 |             result3 = test_function(page=2)  # MISS
147 |             result4 = test_function(page=2)  # HIT
148 |             
149 |             # Cached and uncached calls must return the same values
150 |             assert result1 == 2
151 |             assert result2 == 2
152 |             assert result3 == 4
153 |             assert result4 == 4
154 |             
155 |             stats = get_cache_stats()
156 |             
157 |             assert 'hits' in stats
158 |             assert 'misses' in stats
159 |             assert 'hit_rate' in stats
160 |             assert 'enabled' in stats
161 |             assert stats['enabled'] is True
162 |             # Two first-time calls and two repeats: exactly 2 misses and 2 hits are expected
163 |             assert stats['hits'] == 2, f"Expected 2 hits but got {stats['hits']}"
164 |             assert stats['misses'] == 2, f"Expected 2 misses but got {stats['misses']}"
165 |         finally:
166 |             # Clear the cache, then restore the entry state
167 |             clear_cache()
168 |             if original_state:
169 |                 enable_cache()
170 |             else:
171 |                 disable_cache()
172 | 
173 | 
174 | class TestCacheEnvironmentVariable:
175 |     """Test cache control via environment variable."""
176 |     
177 |     def test_cache_disabled_via_env_var(self, monkeypatch):
178 |         """Test disabling cache via CFM_ENABLE_CACHE environment variable."""
179 |         # This test would need to reload the module to test env var
180 |         # For now, just document the behavior
181 |         pass
182 | 
183 | 
184 | # Example usage in tests
185 | @pytest.fixture
186 | def disable_cache_for_test():
187 |     """Fixture to disable cache for a specific test."""
188 |     original_state = is_cache_enabled()
189 |     disable_cache()
190 |     clear_cache()
191 |     yield
192 |     if original_state:
193 |         enable_cache()
194 |     else:
195 |         disable_cache()
196 | 
197 | 
198 | def test_with_cache_disabled(disable_cache_for_test):
199 |     """Example test that runs with cache disabled."""
200 |     # Your test code here
201 |     # Cache will be disabled for this test
202 |     assert is_cache_enabled() is False
203 | 
204 | 
205 | @pytest.fixture
206 | def enable_cache_for_test():
207 |     """Fixture to enable cache for a specific test."""
208 |     original_state = is_cache_enabled()
209 |     enable_cache()
210 |     clear_cache()
211 |     yield
212 |     if original_state:
213 |         enable_cache()
214 |     else:
215 |         disable_cache()
216 | 
217 | 
218 | def test_with_cache_enabled(enable_cache_for_test):
219 |     """Example test that runs with cache enabled."""
220 |     # Your test code here
221 |     # Cache will be enabled for this test
222 |     assert is_cache_enabled() is True
223 | 
```
Page 1/19 · First · Prev · Next · Last